#! /usr/bin/env python
"""
Save and load MinHash sketches in a JSON format, along with some metadata.
"""
from __future__ import print_function
import sys
import os
import weakref
from enum import Enum
from .logging import error
from . import MinHash
from ._minhash import to_bytes
from ._lowlevel import ffi, lib
from .utils import RustObject, rustcall, decode_str
from ._compat import PY2
# NOTE(review): presumably the version number of the signature file format;
# nothing in the visible part of this module reads it -- confirm its consumers.
SIGNATURE_VERSION = 0.4
class SourmashSignature(RustObject):
    """Main class for signature information.

    A thin Python wrapper around a signature object created and manipulated
    through the native library (the ``lib.signature_*`` functions).  The
    wrapped pointer is stored in ``self._objptr``; almost every method simply
    forwards to the corresponding native call through ``self._methodcall``.

    Instances are hashable (by the md5sum of their sketch), comparable with
    ``==`` / ``!=`` and picklable (rebuilt from minhash, name and filename).
    """

    # Native destructor for the wrapped pointer.
    # NOTE(review): presumably invoked by RustObject when the wrapper is
    # released -- confirm against RustObject.
    __dealloc_func__ = lib.signature_free

    def __init__(self, minhash, name="", filename=""):
        """Create a new native signature and attach `minhash` to it.

        `name` and `filename` are only stored when they are non-empty.
        """
        self._objptr = lib.signature_new()

        if name:
            self._name = name
        if filename:
            self.filename = filename

        self.minhash = minhash

    @property
    def minhash(self):
        "MinHash wrapping the pointer returned by lib.signature_first_mh."
        return MinHash._from_objptr(
            self._methodcall(lib.signature_first_mh)
        )

    @minhash.setter
    def minhash(self, value):
        # TODO: validate value is a MinHash
        self._methodcall(lib.signature_set_mh, value._objptr)

    def __hash__(self):
        "Hash on the md5sum of the sketch."
        return hash(self.md5sum())

    def __str__(self):
        "Short description: the name (when it is not just the md5 prefix) plus the md5 prefix."
        name = self.name()
        md5pref = self.md5sum()[:8]
        if name != md5pref:
            return "SourmashSignature('{}', {})".format(name, md5pref)
        return "SourmashSignature({})".format(md5pref)

    __repr__ = __str__

    # Disabled accessor for multiple sketches, kept for reference:
    #
    #def minhashes(self):
    #    size = ffi.new("uintptr_t *")
    #    mhs_ptr = self._methodcall(lib.signature_get_mhs, size)
    #    size = ffi.unpack(size, 1)[0]
    #
    #    mhs = []
    #    for i in range(size):
    #        mh = MinHash._from_objptr(mhs_ptr[i])
    #        mhs.append(mh)
    #
    #    return mhs

    def md5sum(self):
        "Calculate md5 hash of the bottom sketch, specifically."
        # The digest is computed natively on the minhash, not on the
        # signature as a whole.
        return decode_str(self.minhash._methodcall(lib.kmerminhash_md5sum), free=True)

    def __eq__(self, other):
        """Compare with another signature using lib.signature_eq.

        Returns NotImplemented for operands that are not SourmashSignature
        instances, so that e.g. ``sig == None`` evaluates to False instead of
        failing with AttributeError on the missing ``_objptr``.
        """
        if not isinstance(other, SourmashSignature):
            return NotImplemented
        return self._methodcall(lib.signature_eq, other._objptr)

    @property
    def _name(self):
        "Raw name stored in the native signature (may be empty)."
        return decode_str(self._methodcall(lib.signature_get_name), free=True)

    @_name.setter
    def _name(self, value):
        self._methodcall(lib.signature_set_name, to_bytes(value))

    def __ne__(self, other):
        "Negation of ``==`` (defined explicitly; Python 2 does not derive it)."
        return not self == other

    def name(self):
        "Return as nice a name as possible, defaulting to md5 prefix."
        # Preference order: explicit name, then filename, then the first
        # eight characters of the md5sum.
        name = self._name
        filename = self.filename

        if name:
            return name
        elif filename:
            return filename
        else:
            return self.md5sum()[:8]

    @property
    def filename(self):
        "Filename stored in the native signature (may be empty)."
        return decode_str(self._methodcall(lib.signature_get_filename), free=True)

    @filename.setter
    def filename(self, value):
        self._methodcall(lib.signature_set_filename, to_bytes(value))

    @property
    def license(self):
        "License string stored in the native signature (read-only here)."
        return decode_str(self._methodcall(lib.signature_get_license), free=True)

    def _display_name(self, max_length):
        """Return a name of at most `max_length` characters for display.

        Uses the same preference order as `name()`.  A too-long name keeps
        its beginning and gets a trailing "..."; a too-long filename keeps
        its end and gets a leading "...".  The md5 fallback is always eight
        characters.

        The truncation reserves three characters for the ellipsis, so the
        final assertion can fail when `max_length` is very small (3 or less
        when truncating, or less than 8 for the md5 fallback).
        """
        name = self._name
        filename = self.filename

        if name:
            if len(name) > max_length:
                name = name[: max_length - 3] + "..."
        elif filename:
            name = filename
            if len(name) > max_length:
                name = "..." + name[-max_length + 3 :]
        else:
            name = self.md5sum()[:8]
        assert len(name) <= max_length
        return name

    def similarity(self, other, ignore_abundance=False, downsample=False):
        "Compute similarity with the other signature."
        # Delegates to the minhash of each signature.
        return self.minhash.similarity(other.minhash,
                                       ignore_abundance=ignore_abundance,
                                       downsample=downsample)

    def jaccard(self, other):
        "Compute Jaccard similarity with the other MinHash signature."
        # Same as similarity() with abundance ignored and no downsampling.
        return self.minhash.similarity(other.minhash, ignore_abundance=True,
                                       downsample=False)

    def contained_by(self, other, downsample=False):
        "Compute containment by the other signature. Note: ignores abundance."
        return self.minhash.contained_by(other.minhash, downsample)

    def add_sequence(self, sequence, force=False):
        """Forward `sequence` (as bytes) and `force` to lib.signature_add_sequence.

        NOTE(review): `force` presumably controls how invalid characters are
        handled by the native code -- confirm.
        """
        self._methodcall(lib.signature_add_sequence, to_bytes(sequence), force)

    def add_protein(self, sequence):
        "Forward `sequence` (as bytes) to lib.signature_add_protein."
        self._methodcall(lib.signature_add_protein, to_bytes(sequence))

    @staticmethod
    def from_params(params):
        "Build a signature natively from `params` via lib.signature_from_params."
        ptr = rustcall(lib.signature_from_params, params._get_objptr())
        return SourmashSignature._from_objptr(ptr)

    def __len__(self):
        "Length as reported by lib.signature_len."
        return self._methodcall(lib.signature_len)

    def __getstate__(self):  # enable pickling
        "State tuple: (minhash, raw name, filename)."
        return (
            self.minhash,
            self._name,
            self.filename,
        )

    def __setstate__(self, tup):
        "Rebuild the native signature from a tuple produced by __getstate__."
        (mh, name, filename) = tup
        # Dispose of whatever this wrapper currently holds before replacing
        # it with a freshly created native object.
        self.__del__()

        self._objptr = lib.signature_new()
        if name:
            self._name = name
        if filename:
            self.filename = filename
        self.minhash = mh

    def __reduce__(self):
        "Pickle as a constructor call: SourmashSignature(minhash, name, filename)."
        return (
            SourmashSignature,
            (
                self.minhash,
                self._name,
                self.filename
            ),
        )
def _detect_input_type(data):
    """Classify `data` so the loader knows how to read it.

    Returns a SigInput member.  The checks run in this order:

     1. FILE_LIKE - the object has a ``fileno`` or ``mode`` attribute.
     2. BUFFER    - the object has ``find`` and either contains the text
                    ``sourmash_signature`` at a position greater than zero
                    (uncompressed JSON) or starts with the gzip magic bytes.
     3. PATH      - ``os.path.exists(data)`` is true.
     4. UNKNOWN   - anything else, including values os.path cannot handle.
    """
    gzip_magic = b'\x1F\x8B'

    if hasattr(data, "fileno") or hasattr(data, "mode"):
        return SigInput.FILE_LIKE

    if hasattr(data, "find"):
        try:
            # Text input: look for the JSON marker with a text needle.
            if data.find("sourmash_signature") > 0:
                return SigInput.BUFFER
            if PY2:
                try:
                    if data.startswith(gzip_magic):
                        return SigInput.BUFFER
                except UnicodeDecodeError:
                    pass
        except TypeError:
            # The text needle was rejected, so retry both checks with bytes.
            if data.find(b"sourmash_signature") > 0:
                return SigInput.BUFFER
            if data.startswith(gzip_magic):
                return SigInput.BUFFER

    # Not recognised as a buffer: last resort is to treat it as a filename.
    try:
        is_existing_path = os.path.exists(data)
    except (ValueError, TypeError):  # not something os.path can interpret
        return SigInput.UNKNOWN

    return SigInput.PATH if is_existing_path else SigInput.UNKNOWN
def load_signatures(
    data, ksize=None, select_moltype=None, ignore_md5sum=False, do_raise=False,
    quiet=False
):
    """Load signatures from `data` and yield them one by one.

    This is a generator of SourmashSignature objects.  `data` may be an open
    file-like object, a text or bytes buffer holding (optionally gzip
    compressed) signature JSON, or a path to a file; see _detect_input_type.

    `ksize` (converted with int(), 0 when None), `select_moltype` and
    `ignore_md5sum` are forwarded to the native loader.  Nothing is yielded
    when `data` is empty or its type cannot be determined.

    Errors raised while loading are reported through error() unless `quiet`
    is set, and are re-raised only when `do_raise` is true.

    Note, the order is not necessarily the same as what is in the source file.
    """
    ksize = 0 if ksize is None else int(ksize)

    if not data:
        return

    if select_moltype is None:
        select_moltype = ffi.NULL
    else:
        try:
            select_moltype = select_moltype.encode("utf-8")
        except AttributeError:
            # no .encode() available: pass the value through untouched
            pass

    kind = _detect_input_type(data)
    if kind == SigInput.UNKNOWN:
        if not quiet:
            error("Error in parsing signature; quitting. Cannot open file or invalid signature")
        return

    # Out-parameter that receives the number of loaded signatures.
    count_ptr = ffi.new("uintptr_t *")

    try:
        if kind == SigInput.FILE_LIKE:
            opened_as_text = hasattr(data, "mode") and "t" in data.mode
            if opened_as_text and sys.version_info >= (3,):
                # need to reopen handler as binary
                data = data.buffer

            contents = data.read()
            data.close()
            data = contents
            # From here on the file contents go through the buffer branch.
            kind = SigInput.BUFFER

        elif kind == SigInput.PATH:
            sigs_ptr = rustcall(
                lib.signatures_load_path,
                data.encode("utf-8"),
                ignore_md5sum,
                ksize,
                select_moltype,
                count_ptr,
            )

        if kind == SigInput.BUFFER:
            if hasattr(data, "encode") and not PY2:
                data = data.encode("utf-8")

            sigs_ptr = rustcall(
                lib.signatures_load_buffer,
                data,
                len(data),
                ignore_md5sum,
                ksize,
                select_moltype,
                count_ptr,
            )

        count = count_ptr[0]

        # Wrap every returned pointer before handing anything to the caller.
        loaded = [
            SourmashSignature._from_objptr(sigs_ptr[idx]) for idx in range(count)
        ]

        for sig in loaded:
            yield sig

    except Exception as exc:
        if not quiet:
            error("Error in parsing signature; quitting.")
            error("Exception: {}", str(exc))
        if do_raise:
            raise
def load_one_signature(data, ksize=None, select_moltype=None, ignore_md5sum=False):
    """Load `data` and return its single signature.

    Arguments are forwarded to load_signatures().  Raises ValueError when the
    input yields no signature at all, or more than one.
    """
    sig_stream = load_signatures(
        data, ksize=ksize, select_moltype=select_moltype, ignore_md5sum=ignore_md5sum
    )

    # Unique marker meaning "the stream is exhausted"; compared by identity
    # so that the signatures' own __eq__ is never involved.
    exhausted = object()

    only_sig = next(sig_stream, exhausted)
    if only_sig is exhausted:
        raise ValueError("no signatures to load")

    if next(sig_stream, exhausted) is not exhausted:
        raise ValueError("expected to load exactly one signature")

    return only_sig
def save_signatures(siglist, fp=None, compression=0):
    """Save multiple signatures into a JSON string (or into file handle 'fp').

    The signatures in `siglist` are serialized by the native library, with
    `compression` passed through to it.  When `fp` is None the serialized
    bytes are returned; otherwise they are written to `fp` and None is
    returned.  If `fp` rejects bytes with TypeError, the data is decoded as
    UTF-8 and written as text instead.
    """
    # Maps each native pointer to the Python object that owns it.
    keepalive = weakref.WeakKeyDictionary()

    # get list of rust objects
    pointers = []
    for sig in siglist:
        ptr = sig._get_objptr()
        keepalive[ptr] = sig
        pointers.append(ptr)
    c_sig_array = ffi.new("Signature*[]", pointers)

    # Out-parameter that receives the length of the produced buffer.
    length_ptr = ffi.new("uintptr_t *")

    # save signature into a string (potentially compressed)
    raw = rustcall(lib.signatures_save_buffer, c_sig_array, len(pointers),
                   compression, length_ptr)
    length = length_ptr[0]

    # associate a finalizer with the raw buffer so that it gets freed
    managed = ffi.gc(raw, lambda o: lib.nodegraph_buffer_free(o, length), length)

    # Compressed output is copied out with its full length; uncompressed
    # output is read back with ffi.string().
    result = ffi.buffer(managed, length)[:] if compression else ffi.string(managed, length)

    if fp is None:  # return string
        return result

    try:  # write to file
        fp.write(result)
    except TypeError:
        fp.write(result.decode('utf-8'))
    return None