#! /usr/bin/env python
"""
Save and load MinHash sketches in a JSON format, along with some metadata.
"""
from __future__ import print_function
import hashlib
import gzip
import bz2file
import io
import sys
from . import signature_json
from .logging import error
SIGNATURE_VERSION=0.4
[docs]class SourmashSignature(object):
"Main class for signature information."
def __init__(self, minhash, name='', filename=''):
self.d = {}
self.d['class'] = 'sourmash_signature'
if name:
self.d['name'] = name
if filename:
self.d['filename'] = filename
self.minhash = minhash
self.d['license'] = 'CC0'
def __hash__(self):
return hash(self.md5sum())
def __str__(self):
name = self.name()
md5pref = self.md5sum()[:8]
if name != md5pref:
return "SourmashSignature('{}', {})".format(name, md5pref)
return "SourmashSignature({})".format(md5pref)
__repr__ = __str__
[docs] def md5sum(self):
"Calculate md5 hash of the bottom sketch, specifically."
m = hashlib.md5()
m.update(str(self.minhash.ksize).encode('ascii'))
for k in self.minhash.get_mins():
m.update(str(k).encode('utf-8'))
return m.hexdigest()
def __eq__(self, other):
allkeys = set(self.d.keys()).union(set(other.d.keys()))
for k in allkeys:
if self.d.get(k) != other.d.get(k):
return False
return self.minhash == other.minhash
def __ne__(self, other):
return not self == other
[docs] def name(self):
"Return as nice a name as possible, defaulting to md5 prefix."
if 'name' in self.d:
return self.d.get('name')
elif 'filename' in self.d:
return self.d.get('filename')
else:
return self.md5sum()[:8]
def _display_name(self, max_length):
if 'name' in self.d:
name = self.d['name']
if len(name) > max_length:
name = name[:max_length - 3] + '...'
elif 'filename' in self.d:
name = self.d['filename']
if len(name) > max_length:
name = '...' + name[-max_length + 3:]
else:
name = self.md5sum()[:8]
assert len(name) <= max_length
return name
def _save(self):
"Return metadata and a dictionary containing the sketch info."
e = dict(self.d)
minhash = self.minhash
sketch = {}
sketch['ksize'] = int(minhash.ksize)
sketch['num'] = minhash.num
sketch['max_hash'] = minhash.max_hash
sketch['seed'] = int(minhash.seed)
if self.minhash.track_abundance:
values = minhash.get_mins(with_abundance=True)
sketch['mins'] = list(map(int, values.keys()))
sketch['abundances'] = list(map(int, values.values()))
else:
sketch['mins'] = list(map(int, minhash.get_mins()))
sketch['md5sum'] = self.md5sum()
if minhash.is_protein and not minhash.dayhoff and not minhash.hp:
sketch['molecule'] = 'protein'
elif minhash.dayhoff:
sketch['molecule'] = 'dayhoff'
elif minhash.hp:
sketch['molecule'] = 'hp'
else:
sketch['molecule'] = 'DNA'
e['signature'] = sketch
return self.d.get('name'), self.d.get('filename'), sketch
[docs] def similarity(self, other, ignore_abundance=False, downsample=False):
"Compute similarity with the other MinHash signature."
try:
return self.minhash.similarity(other.minhash, ignore_abundance)
except ValueError as e:
if 'mismatch in max_hash' in str(e) and downsample:
xx = self.minhash.downsample_max_hash(other.minhash)
yy = other.minhash.downsample_max_hash(self.minhash)
return xx.similarity(yy, ignore_abundance)
else:
raise
[docs] def jaccard(self, other):
"Compute Jaccard similarity with the other MinHash signature."
return self.minhash.similarity(other.minhash, True)
[docs] def contained_by(self, other, downsample=False):
"Compute containment by the other signature. Note: ignores abundance."
try:
return self.minhash.contained_by(other.minhash)
except ValueError as e:
if 'mismatch in max_hash' in str(e) and downsample:
xx = self.minhash.downsample_max_hash(other.minhash)
yy = other.minhash.downsample_max_hash(self.minhash)
return xx.contained_by(yy)
else:
raise
def _guess_open(filename):
"""
Make a best-effort guess as to how to parse the given sequence file.
Handles '-' as shortcut for stdin.
Deals with .gz and .bz2 as well as plain text.
"""
magic_dict = {
b"\x1f\x8b\x08": "gz",
b"\x42\x5a\x68": "bz2",
} # Inspired by http://stackoverflow.com/a/13044946/1585509
if filename == '-':
filename = '/dev/stdin'
bufferedfile = io.open(file=filename, mode='rb', buffering=8192)
num_bytes_to_peek = max(len(x) for x in magic_dict)
file_start = bufferedfile.peek(num_bytes_to_peek)
compression = None
for magic, ftype in magic_dict.items():
if file_start.startswith(magic):
compression = ftype
break
if compression is 'bz2':
sigfile = bz2file.BZ2File(filename=bufferedfile)
elif compression is 'gz':
if not bufferedfile.seekable():
bufferedfile.close()
raise ValueError("gziped data not streamable, pipe through zcat \
first")
sigfile = gzip.GzipFile(filename=filename)
else:
sigfile = bufferedfile
return sigfile
[docs]def load_signatures(data, ksize=None, select_moltype=None,
ignore_md5sum=False, do_raise=False, quiet=False):
"""Load a JSON string with signatures into classes.
Returns list of SourmashSignature objects.
Note, the order is not necessarily the same as what is in the source file.
"""
if ksize:
ksize = int(ksize)
if not data:
return
is_fp = False
if hasattr(data, 'find') and data.find('sourmash_signature') == -1: # filename
done = False
try: # is it a file handle?
data.read
is_fp = True
done = True
except AttributeError:
pass
# not a file handle - treat it like a filename.
if not done:
try:
data = _guess_open(data)
is_fp = True
except OSError as excinfo:
if not quiet: error(str(excinfo))
if do_raise:
raise
return
else: # file-like
if hasattr(data, 'mode'): # file handler
if 't' in data.mode: # need to reopen handler as binary
if sys.version_info >= (3, ):
data = data.buffer
try:
# JSON format
for sig in signature_json.load_signatures_json(data,
ignore_md5sum=ignore_md5sum):
if not ksize or ksize == sig.minhash.ksize:
if not select_moltype or \
sig.minhash.is_molecule_type(select_moltype):
yield sig
except Exception as e:
if not quiet:
error("Error in parsing signature; quitting.")
error("Exception: {}", str(e))
if do_raise:
raise
finally:
if is_fp:
data.close()
def load_one_signature(data, ksize=None, select_moltype=None,
ignore_md5sum=False):
sigiter = load_signatures(data, ksize=ksize,
select_moltype=select_moltype,
ignore_md5sum=ignore_md5sum)
try:
first_sig = next(sigiter)
except StopIteration:
raise ValueError("no signatures to load")
try:
next(sigiter)
except StopIteration:
return first_sig
raise ValueError("expected to load exactly one signature")
[docs]def save_signatures(siglist, fp=None):
"Save multiple signatures into a JSON string (or into file handle 'fp')"
return signature_json.save_signatures_json(siglist, fp)