Source code for deeptools.utilities

import sys
import os

debug = 0

[docs]def getGC_content(dnaString, as_fraction=True): if len(dnaString) == 0: return None if dnaString.count('N') > len(dnaString) * 0.05: raise Exception("WARNING: too many NNNs present in sequence of length {}".format(len(dnaString))) return None gc = 0 gc += dnaString.count('G') gc += dnaString.count('g') gc += dnaString.count('C') gc += dnaString.count('c') if as_fraction: return(float(gc) / len(dnaString)) else: return gc
[docs]def tbitToBamChrName(tbitNames, bamNames): """ checks if the chromosome names from the two-bit and bam file coincide. In case they do not coincide, a fix is tried. If successful, then a mapping table is returned. tbitNames and bamNames should be lists """ chrNameBitToBam = dict((x, x) for x in tbitNames) if set(bamNames) != set(tbitNames): sys.stderr.write("Bam and 2bit do not have matching " "chromosome names:\n2bit:{}\n\nbam:{}" "\n\n".format(tbitNames, bamNames)) if len(set(bamNames).intersection(set(tbitNames))) > 0: sys.stderr.write("Using the following common chromosomes between " "bam chromosome names and 2bit chromosome " "names:\n") for item in set(bamNames).intersection(set(tbitNames)): sys.stderr.write(item + "\n") chrNameBitToBam = dict([(x, x) for x in set(bamNames).intersection(set(tbitNames))]) elif set(["chr" + x if x != 'dmel_mitochondrion_genome' else 'chrM' for x in bamNames]) == set(tbitNames): sys.stderr.write("Adding 'chr' seems to solve the problem. " "Continuing ...") chrNameBitToBam = dict([("chr" + x if x != 'dmel_mitochondrion_genome' else 'chrM', x) for x in bamNames]) elif set([x for x in tbitNames if x.count('random') == 0 and x.count('chrM') == 0]) == set(bamNames): if debug: print "Removing random and mitochondrial chromosomes"\ "fixes the problem" chrNameBitToBam = dict([(x, x) for x in tbitNames if x.count('random') == 0 and x.count('chrM') == 0]) else: if debug: print "Index and reference do not have matching " "chromosome names" exit(0) return chrNameBitToBam
[docs]def getCommonChrNames(bamFileHandlers, verbose=True): r""" Compares the names and lengths of a list of bam file handlers. The input is list of pysam file handlers. The function returns a duple containing the common chromosome names and the common chromome lengths. Hopefully, only _random and chrM are not common. """ def get_chrom_and_size(bam_handler): """ Reads the chromosome/scaffold name and the length from the bam file and returns a list of (chromname, size) tuples :param bam_handler: :return: list of (chrom, size) tuples """ return [(bam_handler.references[i], bam_handler.lengths[i]) for i in range(0, len(bam_handler.references))] def print_chr_names_and_size(chr_set): sys.stderr.write("chromosome\tlength\n") for name, size in chr_set: sys.stderr.write("{0:>15}\t{1:>10}\n".format(name, size)) common_chr = set(get_chrom_and_size(bamFileHandlers[0])) non_common_chr = set() for j in range(1, len(bamFileHandlers)): _names_and_size = set(get_chrom_and_size(bamFileHandlers[j])) if len(common_chr & _names_and_size) == 0: # try to add remove 'chr' from the chromosome name _corr_names_size = set() for chrom_name, size in _names_and_size: if chrom_name.startswith('chr'): _corr_names_size.add((chrom_name[3:], size)) else: _corr_names_size.add(('chr' + chrom_name, size)) if len(common_chr & _corr_names_size) == 0: message = "No common chromosomes found. Are the bam files files " \ "from the same species and same assemblies?\n" sys.stderr.write(message) print_chr_names_and_size(common_chr) sys.stderr.write("\nand the following is the list of the unmatched chromosome and chromosome\n" "lengths from file\n{}\n".format( print_chr_names_and_size(_names_and_size) exit(1) else: _names_and_size = _corr_names_size non_common_chr |= common_chr ^ _names_and_size common_chr = common_chr & _names_and_size if len(non_common_chr) > 0: sys.stderr.write("\nThe following chromosome names did not match between the the bam files\n") print_chr_names_and_size(non_common_chr) # the common chromosomes has to be sorted as in the original # bam files chr_sizes = [] for tuple in get_chrom_and_size(bamFileHandlers[0]): if tuple in common_chr: chr_sizes.append(tuple) return chr_sizes, non_common_chr
[docs]def copyFileInMemory(filePath, suffix=''): """ copies a file into the special /dev/shm device which moves the file into memory. This process speeds ups the multiprocessor access to such files """ # fallback for windows users if == 'nt': return filePath memFileName = getTempFileName(suffix=suffix) import shutil shutil.copyfile(filePath, memFileName) return memFileName
[docs]def getTempFileName(suffix=''): """ returns a temporary file name. If the special /dev/shm device is available, the temporary file would be located in that folder. /dv/shm is a folder that resides in memory and which has much faster accession. """ import tempfile import config as cfg # get temp dir from configuration file tmp_dir = cfg.config.get('general', 'tmp_dir') if tmp_dir == 'default': _tempFile = tempfile.NamedTemporaryFile(prefix="_deeptools_", suffix=suffix, delete=False) else: try: _tempFile = tempfile.NamedTemporaryFile(prefix="_deeptools_", suffix=suffix, dir=tmp_dir, delete=False) # fall back to system tmp file except OSError: _tempFile = tempfile.NamedTemporaryFile(prefix="_deeptools_", suffix=suffix, delete=False) memFileName = _tempFile.close() return memFileName
[docs]def which(program): """ method to identify if a program is on the user PATH variable. From: """ import os def is_exe(fpath): return os.path.isfile(fpath) and os.access(fpath, os.X_OK) fpath, fname = os.path.split(program) if fpath: if is_exe(program): return program else: for path in os.environ["PATH"].split(os.pathsep): path = path.strip('"') exe_file = os.path.join(path, program) if is_exe(exe_file): return exe_file return None