Source code for deeptools.deepBlue

#!/usr/bin/env python
try:
    # python 2
    import xmlrpclib
except:
    # python 3
    import xmlrpc.client as xmlrpclib
import time
import tempfile
import os.path
import sys
import pyBigWig
from deeptools.utilities import mungeChromosome
from deeptoolsintervals import GTF
import datetime


[docs]def isDeepBlue(fname): """ Returns true if the file ends in .wig, .wiggle, or .bedgraph, since these indicate a file on the deepBlue server """ if fname.endswith(".wig"): return True if fname.endswith(".wiggle"): return True if fname.endswith(".bedgraph"): return True if fname.startswith("http") or fname.startswith("ftp"): return False # For ENCODE samples, the "Name" is just the ENCODE sample ID, so as a fallback check for files that aren't there. if not os.path.exists(fname): return True return False
[docs]def mergeRegions(regions): """ Given a list of [(chrom, start, end), ...], merge all overlapping regions This returns a dict, where values are sorted lists of [start, end]. """ bar = sorted(regions) out = dict() last = [None, None, None] for reg in bar: if reg[0] == last[0] and reg[1] <= last[2]: if reg[2] > last[2]: last[2] = reg[2] continue else: if last[0]: if last[0] not in out: out[last[0]] = list() out[last[0]].append([last[1], last[2]]) last = [reg[0], reg[1], reg[2]] if last[0] not in out: out[last[0]] = list() out[last[0]].append([last[1], last[2]]) return out
[docs]def makeTiles(db, args): """ Given a deepBlue object, return a list of regions that will be queried """ out = [] for (k, v) in db.chromsTuple: start = 0 while start <= v: end = start + args.binSize if end > v: end = v out.append([k, start, end]) start += end + args.distanceBetweenBins return out
[docs]def makeChromTiles(db): """ Make a region for each chromosome """ out = [] for (k, v) in db.chromsTuple: out.append([k, 0, v]) return out
[docs]def makeRegions(BED, args): """ Given a list of BED/GTF files, make a list of regions. These are vaguely extended as appropriate. For simplicity, the maximum of --beforeRegionStartLength and --afterRegionStartLength are tacked on to each end and transcripts are used for GTF files. """ itree = GTF(BED, transcriptID=args.transcriptID, transcript_id_designator=args.transcript_id_designator) o = [] extend = 0 # The before/after stuff is specific to computeMatrix if "beforeRegionStartLength" in args: extend = max(args.beforeRegionStartLength, args.afterRegionStartLength) for chrom in itree.chroms: regs = itree.findOverlaps(chrom, 0, 4294967295) # bigWig files use 32 bit coordinates for reg in regs: o.append([chrom, max(0, reg[0] - extend), reg[1] + extend]) del itree return o
[docs]def preloadWrapper(foo): """ This is a wrapper around the preload function for multiprocessing """ args = foo[2] regs = foo[3] res = deepBlue(foo[0], url=args.deepBlueURL, userKey=args.userKey) return res.preload(regs, tmpDir=args.deepBlueTempDir)
[docs]class deepBlue(object): def __init__(self, sample, url="http://deepblue.mpi-inf.mpg.de/xmlrpc", userKey="anonymous_key"): """ Connect to the requested deepblue server with the given user key and request the specifed sample from it. >>> sample = "S002R5H1.ERX300721.H3K4me3.bwa.GRCh38.20150528.bedgraph" >>> db = deepBlue(sample) # doctest: +SKIP >>> assert(db.chroms("chr1") == 248956422) # doctest: +SKIP """ self.sample = sample self.url = url self.userKey = userKey self.server = xmlrpclib.Server(url, allow_none=True) self.info = None self.experimentID = None self.genome = None self.chromsDict = None self.chromsTuple = None # Set self.experimentID experimentID = self.getEID() if not experimentID: raise RuntimeError("The requested sample({}) has no associated experiment! If you did not intend to use samples on deepBlue, then it appears either you misspelled a file name or (if you're using BAM files for input) one of your BAM files is lacking a valid index.".format(sample)) # Set self.info (status, resp) = self.server.info(self.experimentID, userKey) if status != "okay": raise RuntimeError("Received the following error while fetching information about '{}': {}".format(resp, sample)) self.info = resp[0] # Set self.genome genome = self.getGenome() if not genome: raise RuntimeError("Unable to determine an appropriate genome for '{}'".format(sample)) # Set self.chroms chroms = self.getChroms() if not chroms: raise RuntimeError("Unable to determine chromosome names/sizes for '{}'".format(sample))
[docs] def getEID(self): """ Given a sample name, return its associated experiment ID (or None on error). self.experimentID is then the internal ID (e.g., e52525) """ (status, resps) = self.server.search(self.sample, "experiments", self.userKey) if status != "okay": raise RuntimeError("Received an error ({}) while searching for the experiment associated with '{}'".format(resps, self.sample)) for resp in resps: if resp[1] == self.sample: self.experimentID = resp[0] return resp[0] return None
[docs] def getGenome(self): """ Determines and sets the genome assigned to a given sample. On error, this raises a runtime exception. self.genome is then the internal genome ID. """ if "genome" in self.info.keys(): self.genome = self.info["genome"] return self.genome
[docs] def getChroms(self): """ Determines and sets the chromosome names/sizes for a given sample. On error, this raises a runtime exception. self.chroms is then a dictionary of chromosome:length pairs """ (status, resp) = self.server.chromosomes(self.genome, self.userKey) if status != "okay": raise RuntimeError("Received an error while fetching chromosome information for '{}': {}".format(self.sample, resp)) self.chromsDict = {k: v for k, v in resp} self.chromsTuple = [(k, v) for k, v in resp] return resp
[docs] def chroms(self, chrom=None): """ Like the chroms() function in pyBigWig, returns either chromsDict (chrom is None) or the length of a given chromosome """ if chrom is None: return self.chromsDict elif chrom in self.chromsDict: return self.chromsDict[chrom] return None
[docs] def close(self): pass
[docs] def preload(self, regions, tmpDir=None): """ Given a sample and a set of regions, write a bigWig file containing the underlying signal. This function returns the file name, which needs to be deleted by the calling function at some point. This sends queries one chromosome at a time, due to memory limits on deepBlue """ startTime = datetime.datetime.now() regions2 = mergeRegions(regions) # Make a temporary file f = tempfile.NamedTemporaryFile(delete=False, dir=tmpDir) fname = f.name f.close() # Start with the bigWig file bw = pyBigWig.open(fname, "w") bw.addHeader(self.chromsTuple, maxZooms=0) # This won't work in IGV! # Make a string out of everything in a resonable order for k, v in self.chromsTuple: # Munge chromosome names as appropriate chrom = mungeChromosome(k, regions2.keys()) if not chrom: continue if chrom not in regions2 or len(regions2) == 0: continue regionsStr = "\n".join(["{}\t{}\t{}".format(k, reg[0], reg[1]) for reg in regions2[chrom]]) regionsStr += "\n" # Send the regions (status, regionsID) = self.server.input_regions(self.genome, regionsStr, self.userKey) if status != "okay": raise RuntimeError("Received the following error while sending regions for '{}': {}".format(regionsID, self.sample)) # Get the experiment information (status, queryID) = self.server.select_experiments(self.sample, k, None, None, self.userKey) if status != "okay": raise RuntimeError("Received the following error while running select_experiments on file '{}': {}".format(self.sample, queryID)) if not queryID: raise RuntimeError("Somehow, we received None as a query ID (file '{}')".format(self.sample)) # Intersect (status, intersectID) = self.server.intersection(queryID, regionsID, self.userKey) if status != "okay": raise RuntimeError("Received the following error while running intersection on file '{}': {}".format(self.sample, intersectID)) if not intersectID: raise RuntimeError("Somehow, we received None as an intersect ID (file '{}')".format(self.sample)) # Query the regions (status, reqID) = self.server.get_regions(intersectID, "START,END,VALUE", self.userKey) if status != "okay": raise RuntimeError("Received the following error while fetching regions in file '{}': {}".format(self.sample, reqID)) # Wait for the server to process the data (status, info) = self.server.info(reqID, self.userKey) request_status = info[0]["state"] while request_status != "done" and request_status != "failed": time.sleep(0.1) (status, info) = self.server.info(reqID, self.userKey) request_status = info[0]["state"] # Get the actual data (status, resp) = self.server.get_request_data(reqID, self.userKey) if status != "okay": raise RuntimeError("Received the following error while fetching data in file '{}': {}".format(self.sample, resp)) for intervals in resp.split("\n"): interval = intervals.split("\t") if interval[0] == '': continue bw.addEntries([k], [int(interval[0]) - 1], ends=[int(interval[1]) - 1], values=[float(interval[2])]) bw.close() sys.stderr.write("{} done (took {})\n".format(self.sample, datetime.datetime.now() - startTime)) sys.stderr.flush() return fname