Source code for sourmash.signature

#! /usr/bin/env python
"""
Save and load MinHash sketches in a JSON format, along with some metadata.
"""
from __future__ import print_function

import sys
import os
import weakref

from .logging import error
from . import MinHash
from ._minhash import to_bytes
from ._lowlevel import ffi, lib
from .utils import RustObject, rustcall, decode_str


SIGNATURE_VERSION = 0.4


[docs]class SourmashSignature(RustObject): "Main class for signature information." __dealloc_func__ = lib.signature_free def __init__(self, minhash, name="", filename=""): self._objptr = lib.signature_new() if name: self._name = name if filename: self.filename = filename self.minhash = minhash @property def minhash(self): return MinHash._from_objptr( self._methodcall(lib.signature_first_mh) ) @minhash.setter def minhash(self, value): # TODO: validate value is a MinHash self._methodcall(lib.signature_set_mh, value._objptr) def __hash__(self): return hash(self.md5sum()) def __str__(self): name = self.name() md5pref = self.md5sum()[:8] if name != md5pref: return "SourmashSignature('{}', {})".format(name, md5pref) return "SourmashSignature({})".format(md5pref) __repr__ = __str__ #def minhashes(self): # size = ffi.new("uintptr_t *") # mhs_ptr = self._methodcall(lib.signature_get_mhs, size) # size = ffi.unpack(size, 1)[0] # # mhs = [] # for i in range(size): # mh = MinHash._from_objptr(mhs_ptr[i]) # mhs.append(mh) # # return mhs
[docs] def md5sum(self): "Calculate md5 hash of the bottom sketch, specifically." return decode_str(self.minhash._methodcall(lib.kmerminhash_md5sum), free=True)
def __eq__(self, other): return self._methodcall(lib.signature_eq, other._objptr) @property def _name(self): return decode_str(self._methodcall(lib.signature_get_name), free=True) @_name.setter def _name(self, value): self._methodcall(lib.signature_set_name, to_bytes(value)) def __ne__(self, other): return not self == other
[docs] def name(self): "Return as nice a name as possible, defaulting to md5 prefix." name = self._name filename = self.filename if name: return name elif filename: return filename else: return self.md5sum()[:8]
@property def filename(self): return decode_str(self._methodcall(lib.signature_get_filename), free=True) @filename.setter def filename(self, value): self._methodcall(lib.signature_set_filename, to_bytes(value)) @property def license(self): return decode_str(self._methodcall(lib.signature_get_license), free=True) def _display_name(self, max_length): name = self._name filename = self.filename if name: if len(name) > max_length: name = name[: max_length - 3] + "..." elif filename: name = filename if len(name) > max_length: name = "..." + name[-max_length + 3 :] else: name = self.md5sum()[:8] assert len(name) <= max_length return name
[docs] def similarity(self, other, ignore_abundance=False, downsample=False): "Compute similarity with the other signature." return self.minhash.similarity(other.minhash, ignore_abundance=ignore_abundance, downsample=downsample)
[docs] def jaccard(self, other): "Compute Jaccard similarity with the other MinHash signature." return self.minhash.similarity(other.minhash, ignore_abundance=True, downsample=False)
[docs] def contained_by(self, other, downsample=False): "Compute containment by the other signature. Note: ignores abundance." return self.minhash.contained_by(other.minhash, downsample)
def add_sequence(self, sequence, force=False): self._methodcall(lib.signature_add_sequence, to_bytes(sequence), force) def add_protein(self, sequence): self._methodcall(lib.signature_add_protein, to_bytes(sequence)) @staticmethod def from_params(params): ptr = rustcall(lib.signature_from_params, params._get_objptr()) return SourmashSignature._from_objptr(ptr) def __len__(self): return self._methodcall(lib.signature_len) def __getstate__(self): # enable pickling return ( self.minhash, self._name, self.filename, ) def __setstate__(self, tup): (mh, name, filename) = tup self.__del__() self._objptr = lib.signature_new() if name: self._name = name if filename: self.filename = filename self.minhash = mh def __reduce__(self): return ( SourmashSignature, ( self.minhash, self._name, self.filename ), )
[docs]def load_signatures( data, ksize=None, select_moltype=None, ignore_md5sum=False, do_raise=False, quiet=False ): """Load a JSON string with signatures into classes. Returns list of SourmashSignature objects. Note, the order is not necessarily the same as what is in the source file. """ if ksize: ksize = int(ksize) if not data: return is_fp = False is_filename = False is_fobj = False if hasattr(data, "fileno"): is_fp = True elif os.path.exists(data): # filename is_filename = True elif hasattr(data, "mode"): # file object-like is_fobj = True if "t" in data.mode: # need to reopen handler as binary if sys.version_info >= (3,): data = data.buffer elif hasattr(data, "find") and data.find("sourmash_signature") > 0: # json string containing the data if hasattr(data, "encode"): data = data.encode("utf-8") else: if do_raise: raise ValueError("Can't parse data. No such file or invalid data.") return if ksize is None: ksize = 0 if select_moltype is None: select_moltype = ffi.NULL else: try: select_moltype = select_moltype.encode("utf-8") except AttributeError: pass size = ffi.new("uintptr_t *") try: # JSON format if is_filename: sigs_ptr = rustcall( lib.signatures_load_path, data.encode("utf-8"), ignore_md5sum, ksize, select_moltype, size, ) else: if is_fp or is_fobj: # TODO: we still can't pass a file-like object to rust... try: buf = data.read() is_fp = False is_fobj = False data.close() data = buf except AttributeError: pass if hasattr(data, "encode"): data = data.encode("utf-8") # TODO: use ffi.cast in the future? # fp_c = ffi.cast("FILE *", data) # sigs_ptr = rustcall(lib.signatures_load_file, fp_c, ignore_md5sum, size) sigs_ptr = rustcall( lib.signatures_load_buffer, data, len(data), ignore_md5sum, ksize, select_moltype, size, ) size = ffi.unpack(size, 1)[0] sigs = [] for i in range(size): sig = SourmashSignature._from_objptr(sigs_ptr[i]) sigs.append(sig) for sig in sigs: yield sig except Exception as e: if not quiet: error("Error in parsing signature; quitting.") error("Exception: {}", str(e)) if do_raise: raise finally: if is_fp or is_fobj: data.close()
def load_one_signature(data, ksize=None, select_moltype=None, ignore_md5sum=False): sigiter = load_signatures( data, ksize=ksize, select_moltype=select_moltype, ignore_md5sum=ignore_md5sum ) try: first_sig = next(sigiter) except StopIteration: raise ValueError("no signatures to load") try: next(sigiter) except StopIteration: return first_sig raise ValueError("expected to load exactly one signature")
[docs]def save_signatures(siglist, fp=None): "Save multiple signatures into a JSON string (or into file handle 'fp')" attached_refs = weakref.WeakKeyDictionary() collected = [] for obj in siglist: rv = obj._get_objptr() attached_refs[rv] = obj collected.append(rv) siglist_c = ffi.new("Signature*[]", collected) if fp is None: buf = rustcall(lib.signatures_save_buffer, siglist_c, len(collected)) return decode_str(buf, free=True) else: # fp_c = ffi.cast("FILE *", fp) # buf = rustcall(lib.signatures_save_file, siglist_c, len(collected), fp_c) buf = rustcall(lib.signatures_save_buffer, siglist_c, len(collected)) result = decode_str(buf, free=True) try: fp.write(result) except TypeError: fp.write(result.encode('utf-8')) return None