#! /usr/bin/python # Copyright (C) 2005 Peter Clote # All Rights Reserved. # # Permission to use, copy, modify, and distribute this # software for NON-COMMERCIAL purposes # and without fee is hereby granted provided that this # copyright notice appears in all copies. If you use, modify # or extend this code, please cite the reference # "Symmetric time warping, Boltzmann pair probabilities and # functional genomics", by P. Clote and J. Straubhaar, # submitted to Journal of Mathematical Biology. # # THE AUTHOR MAKES NO REPRESENTATIONS OR # WARRANTIES ABOUT THE SUITABILITY OF THE SOFTWARE, EITHER # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE # IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A # PARTICULAR PURPOSE, OR NON-INFRINGEMENT. THE AUTHOR # SHALL NOT BE LIABLE FOR ANY DAMAGES SUFFERED # BY LICENSEE AS A RESULT OF USING, MODIFYING OR DISTRIBUTING # THIS SOFTWARE OR ITS DERIVATIVES. # # # # NOTE: Most recent version, handles sequences and microarray data # timeWarpBoltzmann.py # P.Clote # Symmetric timewarp algorithm for expression data. # This implementation differs from standard Kruskal-Liberman # description in our treatment of analogue of gap in sequence alignment # Our treatment ensures that computing alignment from left to right # is identical to computing alignment from right to left, which is # indispensible in computing Boltzmann alignment. # Additionally, we compute the forward partition FZ and backward partition # BZ functions, as well as the Boltzmann pair probabilities Pr_A[a_i,b_j] # of time warping a_i with b_j in optimal time warping A. # # For debugging purposes, this program time warps nucleotide sequences. # To distinguish between nucleotide sequence input and expression data, # use the boolean constant SEQUENCE (1 for nucleotide data, 0 for gene # expression data). import string,sys,random,math import tempfile,os import getPAMmatrix from parameters import GAP,LINELEN,INF,EPSILON,k,T,DEBUG,VERBOSE from parameters import getNumericalSequence,printMatrix,getSequence from parameters import getSubsequencesFromAlignment, printSequenceAndAlignment from parameters import printList, distance, isNumber, SEQUENCE TAU = 10.0 #even sample intervals for yeast time series sequences TAUover2 = float(TAU)/2.0 MU = 40.0/3.0 #even sample intervals for human time series sequences #MU = 10.0 #NOTE: Yeast time series is 0,10min,...,160min for 2 cell cycles # Human time series is 0,2hrs,...,24hrs for 2 cell cycles #Thus with 12 human time points per 16 yeast time points, expansion factor #of 4/3 for human time series taken MUover2 = float(MU)/2.0 TAUMUover2 = float(TAU+MU)/2.0 GIFfile = 0 #GIFfile is 1 if user desires to create a GIF file #for the path graph, else 0 def printPathGraph(L): infilename = tempfile.mktemp() outfilename = tempfile.mktemp() infile = open(infilename,"w") outfile = open(outfilename,"w") #WARNING: infile is write-only and used to write contents of list L #Then infile is passed to gnuplot to produce an eps output graph in outfile for (x,y) in L: infile.write(str(x)+"\t"+str(y)+"\n") infile.flush(); infile.close() #I hope that it stays present long enough to call gnuplot outfile.write("set terminal postscript eps\n") graphFileName = "pathGraph.eps" cmd = 'set output "' + graphFileName +'"\n' cmd += 'set pointsize 3 ' + '\n' cmd += 'set noautoscale ' + '\n' #unscaled axis distances outfile.write(cmd) outfile.write('plot "'+infilename+ '" with linespoints\n') outfile.flush() cmd = "gnuplot " + outfilename os.system(cmd) if GIFfile: cmd = "ps2gif " + "pathGraph.eps" os.system(cmd) cmd = "rm -f p2f*.pbm" os.system(cmd) def computeFZ(a,b,k,T): #compute forward partition function a = [a[0]] + a + [ a[-1] ] #pad a with repeat of first,last number b = [b[0]] + b + [ b[-1] ] #pad b with repeat of first,last number n = len(a); m = len(b) #WARNING: In contrast to sequence alignment, in time warping #we have D[(i,j)] = time warp distance between a[0],...,a[i] #and b[0],...,b[j] for all 0<=ii>n and 0>j>m BZ[(n-1,m-1)] = 1 for i in range(n-2,0,-1): BZ[(i,m-1)] = BZ[(i+1,m-1)]*math.exp(-TAUover2*distance(a[i],b[m-1])/(k*T)) for j in range(m-2,0,-1): BZ[(n-1,j)] = BZ[(n-1,j+1)]*math.exp(-MUover2*distance(a[n-1],b[j])/(k*T)) for i in range(n-2,0,-1): for j in range(m-2,0,-1): sum = math.exp( -TAUMUover2*distance(a[i],b[j])/(k*T) ) * BZ[(i+1,j+1)] sum += math.exp( -TAUover2*(distance(a[i],b[j])+distance(a[i],b[j-1]))/2 \ /(k*T) ) * BZ[(i+1,j)] sum += math.exp( -MUover2*(distance(a[i],b[j])+distance(a[i-1],b[j]))/2 \ /(k*T) ) * BZ[(i,j+1)] BZ[(i,j)] = sum return BZ def computePairAlignmentProb(A,FZ,BZ,a,b,k,T): n=len(a);m=len(b) #WARNING: n,m length of original, non-padded sequences a = [a[0]] + a + [ a[-1] ] #pad a with repeat of first,last number b = [b[0]] + b + [ b[-1] ] #pad b with repeat of first,last number Zforwards = FZ[(n,m)] Zbackwards = BZ[(1,1)] Z = Zforwards print "Z: %.20f" % Z if VERBOSE: print "FZ[(n,m)] = %f" % Zforwards print "BZ[(1,1)] = %f" % Zbackwards error = abs(Zforwards-Zbackwards) print "Error: " + str( error ) #relativeError = abs(Zforwards-Zbackwards)/Zforwards #print "Relative error: " + str( relativeError ) if DEBUG: print "\n------------------------------------" print "FZ" printMatrix(FZ) #keys = FZ.keys(); keys.sort(); print keys print "\n------------------------------------" print "BZ" printMatrix(BZ) #keys = BZ.keys(); keys.sort(); print keys #assert( relativeError1: print "i:%d\tj:%d\t%s\t%s" % (i,j,x,y) i+=1; j+=1 elif x== '-': score = (FZ[(i,j)] * math.exp(-MUover2* \ (distance(a[i],b[j+1])+distance(a[i+1],b[j+1]))/2/(k*T) ) * \ BZ[(i+1,j+2)])/Z assert (y==b[j+1]) if score>1: print "i:%d\tj:%d\t%s\t%s" % (i,j,x,y) Probs.append( score ) j+=1 elif y== '-': score = (FZ[(i,j)] * math.exp(-TAUover2* \ (distance(a[i+1],b[j])+distance(a[i+1],b[j+1]))/2/(k*T) ) * \ BZ[(i+2,j+1)])/Z assert (x==a[i+1]) if score>1: print "i:%d\tj:%d\t%s\t%s" % (i,j,x,y) Probs.append( score ) i+=1; return Probs def pathMatrix(a,b): #WARNING: In contrast to sequence alignment, in time warping #we have D[(i,j)] = time warp distance between a[0],...,a[i] #and b[0],...,b[j] for all 0<=i0 or j>0: print i,j,backArrowLists[(i,j)] return (D,backArrowLists) def backwardsPathMatrix(a,b): #WARNING: In contrast to sequence alignment, in time warping #we have E[(i,j)] = time warp distance between a[i],...,a[n-1] #and b[j],...,b[m-1] for all 0>i>n and 0>j>m a = [ a[0] ] + a + [ a[-1] ] b = [ b[0] ] + b + [ b[-1] ] #pad a,b with first,last number for base case treatment n = len(a); m = len(b) E = {}; forwardArrowLists = {} E[(n-1,m-1)] = 0 for j in range(m-2,0,-1): E[(n-1,j)] = E[(n-1,j+1)]+MUover2*distance(a[n-1],b[j]) forwardArrowLists[(n-1,j)] = [(n-1,j+1)] for i in range(n-2,0,-1): E[(i,m-1)] = E[(i+1,m-1)]+TAUover2*distance(a[i],b[m-1]) forwardArrowLists[(i,m-1)] = [(i+1,m-1)] for i in range(n-2,0,-1): for j in range(m-2,0,-1): #determine minimum min = INF x = E[(i+1,j+1)] + TAUMUover2*distance(a[i],b[j]) if x < min: min = x x = E[(i+1,j)]+TAUover2*(distance(a[i],b[j])+distance(a[i],b[j-1]))/2 if x < min: min = x x = E[(i,j+1)]+MUover2*(distance(a[i],b[j])+distance(a[i-1],b[j]))/2 if x < min: min = x E[(i,j)] = min #now determine back arrows forwardArrowLists[(i,j)] = [] x = E[(i+1,j+1)] + TAUMUover2*distance(a[i],b[j]) if x == min: forwardArrowLists[(i,j)].append( (i+1,j+1) ) x = E[(i+1,j)] + TAUover2*distance(a[i],b[j]) if x == min: forwardArrowLists[(i,j)].append( (i+1,j) ) x = E[(i,j+1)] + MUover2*distance(a[i],b[j]) if x == min: forwardArrowLists[(i,j)].append( (i,j+1) ) if DEBUG: print "Forward arrow lists" for i in range(n-1,0,-1): for j in range(m-1,0,-1): if i0 and y>0: #print backArrowLists[(x,y)] #x0,y0 = random.choice( backArrowLists[(x,y)] ) x0,y0 = backArrowLists[(x,y)][0] #pick first direction, which preferentially is diagonal if #that appears in the list if x0+1==x and y0+1==y: A.append( (a[x],b[y]) ) B.append( (a[x],b[y]) ) C.append( (1,1) ) x = x0; y = y0 elif x0==x and y0+1==y: A.append( ('-',b[y]) ) B.append( (a[x],b[y]) ) C.append( (0,1) ) y = y0 elif x0+1==x and y0==y: A.append( (a[x],'-') ) B.append( (a[x],b[y]) ) C.append( (1,0) ) x = x0 while x>0: A.append( (a[x],'-') ) B.append( (a[x],b[y]) ) C.append( (1,0) ) x = x-1 while y>0: A.append( ('-',b[y]) ) B.append( (a[x],b[y]) ) C.append( (0,1) ) y = y-1 A.reverse() B.reverse() C.reverse() path = [] xCoord = 0; yCoord = 0 #coordinates of points in path graph path.append( (xCoord,yCoord) ) #start at origin for (deltaX,deltaY) in C: xCoord += deltaX; yCoord += deltaY path.append( (xCoord,yCoord) ) return (A,B,path) def printAlignment( A ): print "\n-------------------------------------\n" aStar = ""; bStar = "" for (x,y) in A: if SEQUENCE: aStar = ""; bStar = "" for (x,y) in A: aStar += x; bStar += y else: if isNumber(x): chX = str(x)+"\t" else: chX = "-"+"\t" if isNumber(y): chY = str(y)+"\t" else: chY = "-"+"\t" aStar += chX; bStar += chY print aStar print bStar def printDirectionsInPathMatrix( X ): print "\n-------------------------------------\n" for (ch,n) in X: for i in range(n): sys.stdout.write(' ') print ch print if __name__ == '__main__': if len(sys.argv) < 4: print "Usage: %s -s/-f a b " % sys.argv[0] print """ 1) a is expression array entry sequence (-s option) or file of expression array entry (-f option) 2) b is expression array entry sequence (-s option) or file of expression array entry (-f option) """ sys.exit(1) flag = sys.argv[1] if flag == '-s': if SEQUENCE: a = list(sys.argv[2]) b = list(sys.argv[3]) else: a = sys.argv[2] b = sys.argv[3] else: if SEQUENCE: a = list(getSequence(sys.argv[2])) b = list(getSequence(sys.argv[3])) else: a = getNumericalSequence(sys.argv[2]) b = getNumericalSequence(sys.argv[3]) D,arrows = pathMatrix(a,b) A,B,path = align(a,b,D,arrows) E,arrows = backwardsPathMatrix(a,b) printAlignment( A ) printAlignment( B ) printPathGraph(path) print "hi" print D[(len(a),len(b))] print E[(1,1)] print "First sequence has len %d" % len(a) print a print "Second sequence has len %d" % len(b) print b assert ( abs( D[(len(a),len(b))] - E[(1,1)] ) < 0.001 ) FZ = computeFZ(a,b,k,T) BZ = computeBZ(a,b,k,T) Probs = computePairAlignmentProb(A,FZ,BZ,a,b,k,T) if SEQUENCE: printSequenceAndAlignment(A,a,b,Probs,LINELEN) else: printAlignment( A ) assert (len(A)==len(Probs)) for i in range(len(A)): (x,y) = A[i] print "%s\t%s\t%f" % (x,y,Probs[i]) if DEBUG: print "\n--------------------------------------" print "Matrix D\n" printMatrix(D) print "\n--------------------------------------" print "Matrix E\n" printMatrix(E)