Source code for deeptools.utilities

import sys
import os
from deeptoolsintervals import GTF
from deeptools.bamHandler import openBam
import matplotlib as mpl
mpl.use('Agg')
from deeptools import cm  # noqa: F401
import numpy as np


debug = 0


[docs] def smartLabel(label): """ Given a file name, likely with a path, return the file name without the path and with the file extension removed. Thus, something like /path/to/some.special.file should return some.special, since only the first extension (if present) should be stripped. """ lab = os.path.splitext(os.path.basename(label))[0] if lab == '': # Maybe we have a dot file? lab = os.path.basename(label) return lab
[docs] def smartLabels(labels): return [smartLabel(x) for x in labels]
[docs] def convertCmap(c, vmin=0, vmax=1): cmap = mpl.cm.get_cmap(c) norm = mpl.colors.Normalize(vmin=vmin, vmax=vmax) cmap_rgb = [] for i in range(255): k = mpl.colors.colorConverter.to_rgb(cmap(norm(i))) cmap_rgb.append(k) h = 1.0 / 254 colorScale = [] for k in range(255): C = list(map(np.uint8, np.array(cmap(k * h)[:3]) * 255)) colorScale.append([k * h, 'rgb' + str((C[0], C[1], C[2]))]) return colorScale
[docs] def getTLen(read, notAbs=False): """ Get the observed template length of a read. For a paired-end read, this is normally just the TLEN field. For SE reads this is the observed coverage of the genome (excluding splicing). """ if abs(read.template_length) > 0: if notAbs: return read.template_length return abs(read.template_length) tlen = 0 try: # the cigartuples property apparently didn't always exist for op, opLen in read.cigartuples: if op == 0: tlen += opLen elif op == 2: tlen += opLen elif op == 7: tlen += opLen elif op == 8: tlen += opLen except: pass return tlen
[docs] def getGC_content(tb, chrom, fragStart, fragEnd, fraction=True): bases = tb.bases(chrom, fragStart, fragEnd, fraction=False) if fragEnd > tb.chroms(chrom): fragEnd = tb.chroms(chrom) if sum(bases.values()) < 0.95 * (fragEnd - fragStart): raise Exception("WARNING: too many NNNs present in {}:{}-{}".format(chrom, fragStart, fragEnd)) return None if fraction: return (bases['G'] + bases['C']) / float(fragEnd - fragStart) return bases['G'] + bases['C']
[docs] def tbitToBamChrName(tbitNames, bamNames): """ checks if the chromosome names from the two-bit and bam file coincide. In case they do not coincide, a fix is tried. If successful, then a mapping table is returned. tbitNames and bamNames should be lists """ chrNameBitToBam = dict((x, x) for x in tbitNames) if set(bamNames) != set(tbitNames): sys.stderr.write("Bam and 2bit do not have matching " "chromosome names:\n2bit:{}\n\nbam:{}" "\n\n".format(tbitNames, bamNames)) if len(set(bamNames).intersection(set(tbitNames))) > 0: sys.stderr.write("Using the following common chromosomes between " "bam chromosome names and 2bit chromosome " "names:\n") for item in set(bamNames).intersection(set(tbitNames)): sys.stderr.write(item + "\n") chrNameBitToBam = dict([(x, x) for x in set(bamNames).intersection(set(tbitNames))]) elif set(["chr" + x if x != 'dmel_mitochondrion_genome' else 'chrM' for x in bamNames]) == set(tbitNames): sys.stderr.write("Adding 'chr' seems to solve the problem. " "Continuing ...") chrNameBitToBam = dict([("chr" + x if x != 'dmel_mitochondrion_genome' else 'chrM', x) for x in bamNames]) elif set([x for x in tbitNames if x.count('random') == 0 and x.count('chrM') == 0]) == set(bamNames): if debug: print("Removing random and mitochondrial chromosomes" "fixes the problem") chrNameBitToBam = dict([(x, x) for x in tbitNames if x.count('random') == 0 and x.count('chrM') == 0]) elif len(set(["chr" + x for x in bamNames if x != 'dmel_mitochondrion_genome']).intersection(set(tbitNames))) > 0: bamNames2 = ["chr" + x for x in bamNames if x != 'dmel_mitochondrion_genome'] sys.stderr.write("Adding 'chr' seems to solve the problem for the following " "chromosomes...") for item in set(bamNames2).intersection(set(tbitNames)): sys.stderr.write(item + "\n") chrNameBitToBam = {"chrM": "MT"} for i in range(len(bamNames)): if bamNames2[i] in tbitNames: chrNameBitToBam.update({bamNames2[i]: bamNames[i]}) elif len(set([x[3:] for x in bamNames if x.startswith("chr")]).intersection(set(tbitNames))) > 0: bamNames = [x for x in bamNames] bamNames2 = [x[3:] for x in bamNames if x.startswith("chr")] if debug: sys.stderr.write("Removing 'chr' seems to solve the problem for the following " "chromosomes...") for item in set(bamNames).intersection(set(tbitNames)): sys.stderr.write(item + "\n") chrNameBitToBam = {"MT": "chrM"} for i in range(len(bamNames)): if bamNames2[i] in tbitNames: chrNameBitToBam.update({bamNames2[i]: bamNames[i]}) else: if debug: print("Index and reference do not have matching ") "chromosome names" exit(0) return chrNameBitToBam
[docs] def getCommonChrNames(bamFileHandles, verbose=True): r""" Compares the names and lengths of a list of bam file handles. The input is list of pysam file handles. The function returns a duple containing the common chromosome names and the common chromome lengths. Hopefully, only _random and chrM are not common. """ def get_chrom_and_size(bam_handler): """ Reads the chromosome/scaffold name and the length from the bam file and returns a list of (chromname, size) tuples :param bam_handler: :return: list of (chrom, size) tuples """ try: # BAM file return [(x, y) for x, y in zip(bam_handler.references, bam_handler.lengths)] except: return [(k, v) for k, v in bam_handler.chroms().items()] def print_chr_names_and_size(chr_set): sys.stderr.write("chromosome\tlength\n") for name, size in chr_set: sys.stderr.write("{0:>15}\t{1:>10}\n".format(name, size)) common_chr = set(get_chrom_and_size(bamFileHandles[0])) non_common_chr = set() for j in range(1, len(bamFileHandles)): _names_and_size = set(get_chrom_and_size(bamFileHandles[j])) if len(common_chr & _names_and_size) == 0: # try to add remove 'chr' from the chromosome name _corr_names_size = set() for chrom_name, size in _names_and_size: if chrom_name.startswith('chr'): _corr_names_size.add((chrom_name[3:], size)) else: _corr_names_size.add(('chr' + chrom_name, size)) if len(common_chr & _corr_names_size) == 0: message = "No common chromosomes found. Are the bam files files " \ "from the same species and same assemblies?\n" sys.stderr.write(message) print_chr_names_and_size(common_chr) sys.stderr.write("\nand the following is the list of the unmatched chromosome and chromosome\n" "lengths from file\n{}\n".format(bamFileHandles.name)) print_chr_names_and_size(_names_and_size) exit(1) else: _names_and_size = _corr_names_size non_common_chr |= common_chr ^ _names_and_size common_chr = common_chr & _names_and_size if len(non_common_chr) > 0: sys.stderr.write("\nThe following chromosome names did not match between the the bam files\n") print_chr_names_and_size(non_common_chr) # the common chromosomes has to be sorted as in the original # bam files chr_sizes = [] for tuple in get_chrom_and_size(bamFileHandles[0]): if tuple in common_chr: chr_sizes.append(tuple) return chr_sizes, non_common_chr
[docs] def copyFileInMemory(filePath, suffix=''): """ copies a file into the special /dev/shm device which moves the file into memory. This process speeds ups the multiprocessor access to such files """ # fallback for windows users if os.name == 'nt': return filePath memFileName = getTempFileName(suffix=suffix) import shutil shutil.copyfile(filePath, memFileName) return memFileName
[docs] def getTempFileName(suffix=''): """ Return a temporary file name. The calling function is responsible for deleting this upon completion. """ import tempfile _tempFile = tempfile.NamedTemporaryFile(prefix="_deeptools_", suffix=suffix, delete=False) memFileName = _tempFile.name _tempFile.close() return memFileName
[docs] def gtfOptions(allArgs=None): """ This is used a couple places to setup arguments to mapReduce """ transcriptID = "transcript" exonID = "exon" transcript_id_designator = "transcript_id" keepExons = False if allArgs is not None: allArgs = vars(allArgs) transcriptID = allArgs.get("transcriptID", transcriptID) exonID = allArgs.get("exonID", exonID) transcript_id_designator = allArgs.get("transcript_id_designator", transcript_id_designator) keepExons = allArgs.get("keepExons", keepExons) return transcriptID, exonID, transcript_id_designator, keepExons
[docs] def toString(s): """ This takes care of python2/3 differences """ if isinstance(s, str): return s if isinstance(s, bytes): if sys.version_info[0] == 2: return str(s) return s.decode('ascii') if isinstance(s, list): return [toString(x) for x in s] return s
[docs] def toBytes(s): """ Like toString, but for functions requiring bytes in python3 """ if sys.version_info[0] == 2: return s if isinstance(s, bytes): return s if isinstance(s, str): return bytes(s, 'ascii') if isinstance(s, list): return [toBytes(x) for x in s] return s
[docs] def mungeChromosome(chrom, chromList): """ A generic chromosome munging function. "chrom" is munged by adding/removing "chr" such that it appears in chromList On error, None is returned, but a common chromosome list should be used beforehand to avoid this possibility """ if chrom in chromList: return chrom if chrom == "MT" and "chrM" in chromList: return "chrM" if chrom == "chrM" and "MT" in chromList: return "MT" if chrom.startswith("chr") and chrom[3:] in chromList: return chrom[3:] if "chr" + chrom in chromList: return "chr" + chrom # This shouldn't actually happen return None
[docs] def bam_total_reads(bam_handle, chroms_to_ignore, stats): """ Count the total number of mapped reads in a BAM file, filtering the chromosome given in chroms_to_ignore list """ if chroms_to_ignore: return sum([s[0] for k, s in stats.items() if k not in chroms_to_ignore]) else: return sum([s[0] for s in stats.values()])
[docs] def bam_blacklisted_worker(args): bam, chrom, start, end = args fh = openBam(bam) blacklisted = 0 for r in fh.fetch(reference=chrom, start=start, end=end): if r.is_unmapped: continue if r.reference_start >= start and r.reference_start + r.infer_query_length(always=False) - 1 <= end: blacklisted += 1 fh.close() return blacklisted
[docs] def bam_blacklisted_reads(bam_handle, chroms_to_ignore, blackListFileName=None, numberOfProcessors=1): blacklisted = 0 if blackListFileName is None: return blacklisted # Get the chromosome lengths chromLens = {x: y for x, y in zip(bam_handle.references, bam_handle.lengths)} bl = GTF(blackListFileName) hasOverlaps, minOverlap = bl.hasOverlaps(returnDistance=True) if hasOverlaps: sys.exit("Your blacklist file(s) has (have) regions that overlap. Proceeding with such a file would result in deepTools incorrectly calculating scaling factors. As such, you MUST fix this issue before being able to proceed.\n") if minOverlap < 1000: sys.stderr.write("WARNING: The minimum distance between intervals in your blacklist is {}. It makes little biological sense to include small regions between two blacklisted regions. Instead, these should likely be blacklisted as well.\n".format(minOverlap)) regions = [] for chrom in bl.chroms: if (not chroms_to_ignore or chrom not in chroms_to_ignore) and chrom in chromLens: for reg in bl.findOverlaps(chrom, 0, chromLens[chrom]): regions.append([bam_handle.filename, chrom, reg[0], reg[1]]) if len(regions) > 0: import multiprocessing if len(regions) > 1 and numberOfProcessors > 1: pool = multiprocessing.Pool(numberOfProcessors) res = pool.map_async(bam_blacklisted_worker, regions).get(9999999) else: res = [bam_blacklisted_worker(x) for x in regions] for val in res: blacklisted += val return blacklisted