Package Bio :: Package Motif :: Module _Motif
[hide private]
[frames] | no frames]

Source Code for Module Bio.Motif._Motif

  1  # Copyright 2003-2009 by Bartek Wilczynski.  All rights reserved. 
  2  # This code is part of the Biopython distribution and governed by its 
  3  # license.  Please see the LICENSE file that should have been included 
  4  # as part of this package. 
  5  """Implementation of sequence motifs (PRIVATE). 
  6  """ 
  7  from Bio.Seq import Seq 
  8  from Bio.SubsMat import FreqTable 
  9  from Bio.Alphabet import IUPAC 
 10  import math,random 
 11   
12 -class Motif(object):
13 """ 14 A class representing sequence motifs. 15 """
16 - def __init__(self,alphabet=IUPAC.unambiguous_dna):
17 self.instances = [] 18 self.has_instances=False 19 self.counts = {} 20 self.has_counts=False 21 self.mask = [] 22 self._pwm_is_current = False 23 self._pwm = [] 24 self._log_odds_is_current = False 25 self._log_odds = [] 26 self.alphabet=alphabet 27 self.length=None 28 self.background=dict(map(lambda n: (n,1.0/len(self.alphabet.letters)), self.alphabet.letters)) 29 self.beta=1.0 30 self.info=None 31 self.name=""
32
33 - def _check_length(self, len):
34 if self.length==None: 35 self.length = len 36 elif self.length != len: 37 print "len",self.length,self.instances, len 38 raise ValueError("You can't change the length of the motif")
39
40 - def _check_alphabet(self,alphabet):
41 if self.alphabet==None: 42 self.alphabet=alphabet 43 elif self.alphabet != alphabet: 44 raise ValueError("Wrong Alphabet")
45
46 - def add_instance(self,instance):
47 """ 48 adds new instance to the motif 49 """ 50 self._check_alphabet(instance.alphabet) 51 self._check_length(len(instance)) 52 if self.has_counts: 53 for i in range(self.length): 54 let=instance[i] 55 self.counts[let][i]+=1 56 57 if self.has_instances or not self.has_counts: 58 self.instances.append(instance) 59 self.has_instances=True 60 61 self._pwm_is_current = False 62 self._log_odds_is_current = False
63 64
65 - def set_mask(self,mask):
66 """ 67 sets the mask for the motif 68 69 The mask should be a string containing asterisks in the position of significant columns and spaces in other columns 70 """ 71 self._check_length(len(mask)) 72 self.mask=[] 73 for char in mask: 74 if char=="*": 75 self.mask.append(1) 76 elif char==" ": 77 self.mask.append(0) 78 else: 79 raise ValueError("Mask should contain only '*' or ' ' and not a '%s'"%char)
80
81 - def pwm(self,laplace=True):
82 """ 83 returns the PWM computed for the set of instances 84 85 if laplace=True (default), pseudocounts equal to self.background multiplied by self.beta are added to all positions. 86 """ 87 88 if self._pwm_is_current: 89 return self._pwm 90 #we need to compute new pwm 91 self._pwm = [] 92 for i in xrange(self.length): 93 dict = {} 94 #filling the dict with 0's 95 for letter in self.alphabet.letters: 96 if laplace: 97 dict[letter]=self.beta*self.background[letter] 98 else: 99 dict[letter]=0.0 100 if self.has_counts: 101 #taking the raw counts 102 for letter in self.alphabet.letters: 103 dict[letter]+=self.counts[letter][i] 104 elif self.has_instances: 105 #counting the occurences of letters in instances 106 for seq in self.instances: 107 #dict[seq[i]]=dict[seq[i]]+1 108 dict[seq[i]]+=1 109 self._pwm.append(FreqTable.FreqTable(dict,FreqTable.COUNT,self.alphabet)) 110 self._pwm_is_current=1 111 return self._pwm
112
113 - def log_odds(self,laplace=True):
114 """ 115 returns the logg odds matrix computed for the set of instances 116 """ 117 118 if self._log_odds_is_current: 119 return self._log_odds 120 #we need to compute new pwm 121 self._log_odds = [] 122 pwm=self.pwm(laplace) 123 for i in xrange(self.length): 124 d = {} 125 for a in self.alphabet.letters: 126 d[a]=math.log(pwm[i][a]/self.background[a],2) 127 self._log_odds.append(d) 128 self._log_odds_is_current=1 129 return self._log_odds
130
131 - def ic(self):
132 """Method returning the information content of a motif. 133 """ 134 res=0 135 pwm=self.pwm() 136 for i in range(self.length): 137 res+=2 138 for a in self.alphabet.letters: 139 if pwm[i][a]!=0: 140 res+=pwm[i][a]*math.log(pwm[i][a],2) 141 return res
142
143 - def exp_score(self,st_dev=False):
144 """ 145 Computes expected score of motif's instance and its standard deviation 146 """ 147 exs=0.0 148 var=0.0 149 pwm=self.pwm() 150 for i in range(self.length): 151 ex1=0.0 152 ex2=0.0 153 for a in self.alphabet.letters: 154 if pwm[i][a]!=0: 155 ex1+=pwm[i][a]*(math.log(pwm[i][a],2)-math.log(self.background[a],2)) 156 ex2+=pwm[i][a]*(math.log(pwm[i][a],2)-math.log(self.background[a],2))**2 157 exs+=ex1 158 var+=ex2-ex1**2 159 if st_dev: 160 return exs,math.sqrt(var) 161 else: 162 return exs
163
164 - def search_instances(self,sequence):
165 """ 166 a generator function, returning found positions of instances of the motif in a given sequence 167 """ 168 if not self.has_instances: 169 raise ValueError ("This motif has no instances") 170 for pos in xrange(0,len(sequence)-self.length+1): 171 for instance in self.instances: 172 if instance.tostring()==sequence[pos:pos+self.length].tostring(): 173 yield(pos,instance) 174 break # no other instance will fit (we don't want to return multiple hits)
175
176 - def score_hit(self,sequence,position,normalized=0,masked=0):
177 """ 178 give the pwm score for a given position 179 """ 180 lo=self.log_odds() 181 score = 0.0 182 for pos in xrange(self.length): 183 a = sequence[position+pos] 184 if not masked or self.mask[pos]: 185 try: 186 score += lo[pos][a] 187 except: 188 pass 189 if normalized: 190 if not masked: 191 score/=self.length 192 else: 193 score/=len(filter(lambda x: x, self.mask)) 194 return score
195
196 - def search_pwm(self,sequence,normalized=0,masked=0,threshold=0.0,both=True):
197 """ 198 a generator function, returning found hits in a given sequence with the pwm score higher than the threshold 199 """ 200 if both: 201 rc = self.reverse_complement() 202 203 sequence=sequence.tostring().upper() 204 for pos in xrange(0,len(sequence)-self.length+1): 205 score = self.score_hit(sequence,pos,normalized,masked) 206 if score > threshold: 207 yield (pos,score) 208 if both: 209 rev_score = rc.score_hit(sequence,pos,normalized,masked) 210 if rev_score > threshold: 211 yield (-pos,rev_score)
212
213 - def dist_pearson(self, motif, masked = 0):
214 """ 215 return the similarity score based on pearson correlation for the given motif against self. 216 217 We use the Pearson's correlation of the respective probabilities. 218 """ 219 220 if self.alphabet != motif.alphabet: 221 raise ValueError("Cannot compare motifs with different alphabets") 222 223 max_p=-2 224 for offset in range(-self.length+1,motif.length): 225 if offset<0: 226 p = self.dist_pearson_at(motif,-offset) 227 else: #offset>=0 228 p = motif.dist_pearson_at(self,offset) 229 230 if max_p<p: 231 max_p=p 232 max_o=-offset 233 return 1-max_p,max_o
234
235 - def dist_pearson_at(self,motif,offset):
236 sxx = 0 # \sum x^2 237 sxy = 0 # \sum x \cdot y 238 sx = 0 # \sum x 239 sy = 0 # \sum y 240 syy = 0 # \sum x^2 241 norm=max(self.length,offset+motif.length) 242 243 for pos in range(max(self.length,offset+motif.length)): 244 for l in self.alphabet.letters: 245 xi = self[pos][l] 246 yi = motif[pos-offset][l] 247 sx = sx + xi 248 sy = sy + yi 249 sxx = sxx + xi * xi 250 syy = syy + yi * yi 251 sxy = sxy + xi * yi 252 253 norm *= len(self.alphabet.letters) 254 s1 = (sxy - sx*sy*1.0/norm) 255 s2 = (norm*sxx - sx*sx*1.0)*(norm*syy- sy*sy*1.0) 256 return s1/math.sqrt(s2)
257
258 - def dist_product(self,other):
259 """ 260 A similarity measure taking into account a product probability of generating overlaping instances of two motifs 261 """ 262 max_p=0.0 263 for offset in range(-self.length+1,other.length): 264 if offset<0: 265 p = self.dist_product_at(other,-offset) 266 else: #offset>=0 267 p = other.dist_product_at(self,offset) 268 if max_p<p: 269 max_p=p 270 max_o=-offset 271 return 1-max_p/self.dist_product_at(self,0),max_o
272
273 - def dist_product_at(self,other,offset):
274 s=0 275 for i in range(max(self.length,offset+other.length)): 276 f1=self[i] 277 f2=other[i-offset] 278 for n,b in self.background.items(): 279 s+=b*f1[n]*f2[n] 280 return s/i
281
282 - def dist_dpq(self,other):
283 r"""Calculates the DPQ distance measure between motifs. 284 285 It is calculated as a maximal value of DPQ formula (shown using LaTeX 286 markup, familiar to mathematicians): 287 288 \sqrt{\sum_{i=1}^{alignment.len()} \sum_{k=1}^alphabet.len() \ 289 \{ m1[i].freq(alphabet[k])*log_2(m1[i].freq(alphabet[k])/m2[i].freq(alphabet[k])) + 290 m2[i].freq(alphabet[k])*log_2(m2[i].freq(alphabet[k])/m1[i].freq(alphabet[k])) 291 } 292 293 over possible non-spaced alignemts of two motifs. See this reference: 294 295 D. M Endres and J. E Schindelin, "A new metric for probability 296 distributions", IEEE transactions on Information Theory 49, no. 7 297 (July 2003): 1858-1860. 298 """ 299 300 min_d=float("inf") 301 min_o=-1 302 d_s=[] 303 for offset in range(-self.length+1,other.length): 304 #print "%2.3d"%offset, 305 if offset<0: 306 d = self.dist_dpq_at(other,-offset) 307 overlap = self.length+offset 308 else: #offset>=0 309 d = other.dist_dpq_at(self,offset) 310 overlap = other.length-offset 311 overlap = min(self.length,other.length,overlap) 312 out = self.length+other.length-2*overlap 313 #print d,1.0*(overlap+out)/overlap,d*(overlap+out)/overlap 314 #d = d/(2*overlap) 315 d = (d/(out+overlap))*(2*overlap+out)/(2*overlap) 316 #print d 317 d_s.append((offset,d)) 318 if min_d> d: 319 min_d=d 320 min_o=-offset 321 return min_d,min_o#,d_s
322
323 - def dist_dpq_at(self,other,offset):
324 """ 325 calculates the dist_dpq measure with a given offset. 326 327 offset should satisfy 0<=offset<=self.length 328 """ 329 def dpq (f1,f2,alpha): 330 s=0 331 for n in alpha.letters: 332 avg=(f1[n]+f2[n])/2 333 s+=f1[n]*math.log(f1[n]/avg,2)+f2[n]*math.log(f2[n]/avg,2) 334 return math.sqrt(s)
335 336 s=0 337 for i in range(max(self.length,offset+other.length)): 338 f1=self[i] 339 f2=other[i-offset] 340 s+=dpq(f1,f2,self.alphabet) 341 return s
342
343 - def _read(self,stream):
344 """Reads the motif from the stream (in AlignAce format). 345 346 the self.alphabet variable must be set beforehand. 347 If the last line contains asterisks it is used for setting mask 348 """ 349 350 while 1: 351 ln = stream.readline() 352 if "*" in ln: 353 self.set_mask(ln.strip("\n\c")) 354 break 355 self.add_instance(Seq(ln.strip(),self.alphabet))
356
357 - def __str__(self,masked=False):
358 """ string representation of a motif. 359 """ 360 str = "" 361 for inst in self.instances: 362 str = str + inst.tostring() + "\n" 363 364 if masked: 365 for i in xrange(self.length): 366 if self.mask[i]: 367 str = str + "*" 368 else: 369 str = str + " " 370 str = str + "\n" 371 return str
372
373 - def __len__(self):
374 """return the length of a motif 375 """ 376 if self.length==None: 377 return 0 378 else: 379 return self.length
380
381 - def _write(self,stream):
382 """ 383 writes the motif to the stream 384 """ 385 386 stream.write(self.__str__())
387 388 389
390 - def _to_fasta(self):
391 """ 392 FASTA representation of motif 393 """ 394 if not self.has_instances: 395 self.make_instances_from_counts() 396 str = "" 397 for i,inst in enumerate(self.instances): 398 str = str + "> instance %d\n"%i + inst.tostring() + "\n" 399 400 return str
401
402 - def reverse_complement(self):
403 """ 404 Gives the reverse complement of the motif 405 """ 406 res = Motif() 407 if self.has_instances: 408 for i in self.instances: 409 res.add_instance(i.reverse_complement()) 410 else: # has counts 411 res.has_counts=True 412 res.counts["A"]=self.counts["T"][:] 413 res.counts["T"]=self.counts["A"][:] 414 res.counts["G"]=self.counts["C"][:] 415 res.counts["C"]=self.counts["G"][:] 416 res.counts["A"].reverse() 417 res.counts["C"].reverse() 418 res.counts["G"].reverse() 419 res.counts["T"].reverse() 420 res.length=self.length 421 res.mask = self.mask 422 return res
423 424
425 - def _from_jaspar_pfm(self,stream,make_instances=False):
426 """ 427 reads the motif from Jaspar .pfm file 428 429 The instances are fake, but the pwm is accurate. 430 """ 431 return self._from_horiz_matrix(stream,letters="ACGT",make_instances=make_instances)
432
433 - def _from_vert_matrix(self,stream,letters=None,make_instances=False):
434 """reads a vertical count matrix from stream and fill in the counts. 435 """ 436 437 self.counts = {} 438 self.has_counts=True 439 if letters==None: 440 letters=self.alphabet.letters 441 self.length=0 442 for i in letters: 443 self.counts[i]=[] 444 for ln in stream.readlines(): 445 rec=map(float,ln.strip().split()) 446 for k,v in zip(letters,rec): 447 self.counts[k].append(v) 448 self.length+=1 449 self.set_mask("*"*self.length) 450 if make_instances==True: 451 self.make_instances_from_counts() 452 return self
453
454 - def _from_horiz_matrix(self,stream,letters=None,make_instances=False):
455 """reads a horizontal count matrix from stream and fill in the counts. 456 """ 457 if letters==None: 458 letters=self.alphabet.letters 459 self.counts = {} 460 self.has_counts=True 461 462 for i in letters: 463 ln = stream.readline().strip().split() 464 #if there is a letter in the beginning, ignore it 465 if ln[0]==i: 466 ln=ln[1:] 467 #print ln 468 try: 469 self.counts[i]=map(int,ln) 470 except ValueError: #not integers 471 self.counts[i]=map(float,ln) #map(lambda s: int(100*float(s)),ln) 472 #print counts[i] 473 474 s = sum(map(lambda nuc: self.counts[nuc][0],letters)) 475 #print "sum", s 476 l = len(self.counts[letters[0]]) 477 self.length=l 478 self.set_mask("*"*l) 479 if make_instances==True: 480 self.make_instances_from_counts() 481 return self
482 483
484 - def make_instances_from_counts(self):
485 """Creates "fake" instances for a motif created from a count matrix. 486 487 In case the sums of counts are different for different columnes, the shorter columns are padded with background. 488 """ 489 alpha="".join(self.alphabet.letters) 490 #col[i] is a column taken from aligned motif instances 491 col=[] 492 self.has_instances=True 493 self.instances=[] 494 s = sum(map(lambda nuc: self.counts[nuc][0],self.alphabet.letters)) 495 for i in range(self.length): 496 col.append("") 497 for n in self.alphabet.letters: 498 col[i] = col[i]+ (n*(self.counts[n][i])) 499 if len(col[i])<s: 500 print "WARNING, column too short",len(col[i]),s 501 col[i]+=(alpha*s)[:(s-len(col[i]))] 502 #print i,col[i] 503 #iterate over instances 504 for i in range(s): 505 inst="" #start with empty seq 506 for j in range(self.length): #iterate over positions 507 inst+=col[j][i] 508 #print i,inst 509 inst=Seq(inst,self.alphabet) 510 self.add_instance(inst) 511 return self.instances
512
513 - def make_counts_from_instances(self):
514 """Creates the count matrix for a motif with instances. 515 516 """ 517 #make strings for "columns" of motifs 518 #col[i] is a column taken from aligned motif instances 519 counts={} 520 for a in self.alphabet.letters: 521 counts[a]=[] 522 self.has_counts=True 523 s = len(self.instances) 524 for i in range(self.length): 525 ci = dict(map(lambda a: (a,0),self.alphabet.letters)) 526 for inst in self.instances: 527 ci[inst[i]]+=1 528 for a in self.alphabet.letters: 529 counts[a].append(ci[a]) 530 self.counts=counts 531 return counts
532
533 - def _from_jaspar_sites(self,stream):
534 """ 535 reads the motif from Jaspar .sites file 536 537 The instances and pwm are OK. 538 """ 539 540 while True: 541 ln = stream.readline()# read the header "$>...." 542 if ln=="" or ln[0]!=">": 543 break 544 545 ln=stream.readline().strip()#read the actual sequence 546 i=0 547 while ln[i]==ln[i].lower(): 548 i+=1 549 inst="" 550 while i<len(ln) and ln[i]==ln[i].upper(): 551 inst+=ln[i] 552 i+=1 553 inst=Seq(inst,self.alphabet) 554 self.add_instance(inst) 555 556 self.set_mask("*"*len(inst)) 557 return self
558 559
560 - def __getitem__(self,index):
561 """Returns the probability distribution over symbols at a given position, padding with background. 562 563 If the requested index is out of bounds, the returned distribution comes from background. 564 """ 565 if index in range(self.length): 566 return self.pwm()[index] 567 else: 568 return self.background
569
570 - def consensus(self):
571 """Returns the consensus sequence of a motif. 572 """ 573 res="" 574 for i in range(self.length): 575 max_f=0 576 max_n="X" 577 for n in sorted(self[i]): 578 if self[i][n]>max_f: 579 max_f=self[i][n] 580 max_n=n 581 res+=max_n 582 return Seq(res,self.alphabet)
583
584 - def anticonsensus(self):
585 """returns the least probable pattern to be generated from this motif. 586 """ 587 res="" 588 for i in range(self.length): 589 min_f=10.0 590 min_n="X" 591 for n in sorted(self[i]): 592 if self[i][n]<min_f: 593 min_f=self[i][n] 594 min_n=n 595 res+=min_n 596 return Seq(res,self.alphabet)
597
598 - def max_score(self):
599 """Maximal possible score for this motif. 600 601 returns the score computed for the consensus sequence. 602 """ 603 return self.score_hit(self.consensus(),0)
604
605 - def min_score(self):
606 """Minimal possible score for this motif. 607 608 returns the score computed for the anticonsensus sequence. 609 """ 610 return self.score_hit(self.anticonsensus(),0)
611
612 - def weblogo(self,fname,format="PNG",**kwds):
613 """ 614 uses the Berkeley weblogo service to download and save a weblogo of itself 615 616 requires an internet connection. 617 The parameters from **kwds are passed directly to the weblogo server. 618 """ 619 import urllib 620 import urllib2 621 al= self._to_fasta() 622 url = 'http://weblogo.berkeley.edu/logo.cgi' 623 values = {'sequence' : al, 624 'format' : format, 625 'logowidth' : '18', 626 'logoheight' : '5', 627 'logounits' : 'cm', 628 'kind' : 'AUTO', 629 'firstnum' : "1", 630 'command' : 'Create Logo', 631 'smallsamplecorrection' : "on", 632 'symbolsperline' : 32, 633 'res' : '96', 634 'res_units' : 'ppi', 635 'antialias' : 'on', 636 'title' : '', 637 'barbits' : '', 638 'xaxis': 'on', 639 'xaxis_label' : '', 640 'yaxis': 'on', 641 'yaxis_label' : '', 642 'showends' : 'on', 643 'shrink' : '0.5', 644 'fineprint' : 'on', 645 'ticbits' : '1', 646 'colorscheme' : 'DEFAULT', 647 'color1' : 'green', 648 'color2' : 'blue', 649 'color3' : 'red', 650 'color4' : 'black', 651 'color5' : 'purple', 652 'color6' : 'orange', 653 'color1' : 'black', 654 } 655 for k,v in kwds.items(): 656 values[k]=str(v) 657 658 data = urllib.urlencode(values) 659 req = urllib2.Request(url, data) 660 response = urllib2.urlopen(req) 661 f=open(fname,"w") 662 im=response.read() 663 664 f.write(im) 665 f.close()
666 667
668 - def _to_transfac(self):
669 """Write the representation of a motif in TRANSFAC format 670 """ 671 res="XX\nTY Motif\n" #header 672 try: 673 res+="ID %s\n"%self.name 674 except: 675 pass 676 res+="BF undef\nP0" 677 for a in self.alphabet.letters: 678 res+=" %s"%a 679 res+="\n" 680 if not self.has_counts: 681 self.make_counts_from_instances() 682 for i in range(self.length): 683 if i<9: 684 res+="0%d"%(i+1) 685 else: 686 res+="%d"%(i+1) 687 for a in self.alphabet.letters: 688 res+=" %d"%self.counts[a][i] 689 res+="\n" 690 res+="XX\n" 691 return res
692
693 - def _to_vertical_matrix(self,letters=None):
694 """Return string representation of the motif as a matrix. 695 696 """ 697 if letters==None: 698 letters=self.alphabet.letters 699 self._pwm_is_current=False 700 pwm=self.pwm(laplace=False) 701 res="" 702 for i in range(self.length): 703 res+="\t".join([str(pwm[i][a]) for a in letters]) 704 res+="\n" 705 return res
706
707 - def _to_horizontal_matrix(self,letters=None,normalized=True):
708 """Return string representation of the motif as a matrix. 709 710 """ 711 if letters==None: 712 letters=self.alphabet.letters 713 res="" 714 if normalized: #output PWM 715 self._pwm_is_current=False 716 mat=self.pwm(laplace=False) 717 for a in letters: 718 res+="\t".join([str(mat[i][a]) for i in range(self.length)]) 719 res+="\n" 720 else: #output counts 721 if not self.has_counts: 722 self.make_counts_from_instances() 723 mat=self.counts 724 for a in letters: 725 res+="\t".join([str(mat[a][i]) for i in range(self.length)]) 726 res+="\n" 727 return res
728
729 - def _to_jaspar_pfm(self):
730 """Returns the pfm representation of the motif 731 """ 732 return self._to_horizontal_matrix(normalized=False,letters="ACGT")
733
734 - def format(self,format):
735 """Returns a string representation of the Motif in a given format 736 737 Currently supported fromats: 738 - jaspar-pfm : JASPAR Position Frequency Matrix 739 - transfac : TRANSFAC like files 740 - fasta : FASTA file with instances 741 """ 742 743 formatters={ 744 "jaspar-pfm": self._to_jaspar_pfm, 745 "transfac": self._to_transfac, 746 "fasta" : self._to_fasta, 747 } 748 749 try: 750 return formatters[format]() 751 except KeyError: 752 raise ValueError("Wrong format type")
753
754 - def scanPWM(self,seq):
755 """ 756 scans (using a fast C extension) a nucleotide sequence and returns the matrix of log-odds scores for all positions 757 758 - the result is a one-dimensional numpy array 759 - the sequence can only be a DNA sequence 760 - the search is performed only on one strand 761 """ 762 if self.alphabet!=IUPAC.unambiguous_dna: 763 raise ValueError("Wrong alphabet! Use only with DNA motifs") 764 if seq.alphabet!=IUPAC.unambiguous_dna: 765 raise ValueError("Wrong alphabet! Use only with DNA sequences") 766 767 768 import numpy 769 # get the log-odds matrix into a proper shape (each column contains sorted (ACGT) log-odds values) 770 logodds=numpy.array([map(lambda x: x[1],sorted(x.items())) for x in self.log_odds()]).transpose() 771 772 import _pwm 773 774 return _pwm.calculate(seq.tostring(),logodds)
775