#!/usr/bin/env python
"""
An implementation of sequence bloom trees, Solomon & Kingsford, 2015.

To try it out, do::

    factory = GraphFactory(ksize, tablesizes, n_tables)
    root = SBT(factory)

    graph1 = factory()
    # ... add stuff to graph1 ...
    leaf1 = Leaf("a", graph1)
    root.add_node(leaf1)

For example, ::

    # filenames: list of fa/fq files
    # ksize: k-mer size
    # tablesizes: Bloom filter table sizes
    # n_tables: Number of tables

    factory = GraphFactory(ksize, tablesizes, n_tables)
    root = SBT(factory)

    for filename in filenames:
        graph = factory()
        graph.consume_fasta(filename)
        leaf = Leaf(filename, graph)
        root.add_node(leaf)

then define a search function, ::

    def kmers(k, seq):
        for start in range(len(seq) - k + 1):
            yield seq[start:start + k]

    def search_transcript(node, seq, threshold):
        presence = [ node.data.get(kmer) for kmer in kmers(ksize, seq) ]
        if sum(presence) >= int(threshold * len(seq)):
            return 1
        return 0
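
then apply it to the tree (``find`` calls the search function on each node,
passing along the extra arguments)::

    matches = root.find(search_transcript, seq, threshold)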
"""

from __future__ import print_function, unicode_literals, division

from collections import namedtuple
try:
    from collections.abc import Mapping
except ImportError:  # Python 2...
    from collections import Mapping

from copy import copy
import json
import math
import os
from random import randint, random
import sys
from tempfile import NamedTemporaryFile

import khmer

try:
    load_nodegraph = khmer.load_nodegraph
except AttributeError:
    load_nodegraph = khmer.Nodegraph.load

from .sbt_storage import FSStorage, TarStorage, IPFSStorage, RedisStorage
from .logging import error, notify, debug
from .index import Index

STORAGES = {
    'TarStorage': TarStorage,
    'FSStorage': FSStorage,
    'IPFSStorage': IPFSStorage,
    'RedisStorage': RedisStorage,
}
NodePos = namedtuple("NodePos", ["pos", "node"])


class GraphFactory(object):
    """Build new nodegraphs (Bloom filters) of a specific (fixed) size.

    Parameters
    ----------
    ksize: int
        k-mer size.
    starting_size: int
        size (in bytes) for each nodegraph table.
    n_tables: int
        number of nodegraph tables to be used.
    """

    def __init__(self, ksize, starting_size, n_tables):
        self.ksize = ksize
        self.starting_size = starting_size
        self.n_tables = n_tables

    def __call__(self):
        return khmer.Nodegraph(self.ksize, self.starting_size, self.n_tables)

    def init_args(self):
        return (self.ksize, self.starting_size, self.n_tables)
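

# A minimal usage sketch (illustrative only, not part of the library): the
# factory stamps out identically-sized filters, so nodegraphs created by the
# same factory can be update()d into one another. This assumes khmer's
# Nodegraph count/get/ksize methods; the table size below is an arbitrary
# toy value.
def _demo_graph_factory():
    factory = GraphFactory(ksize=21, starting_size=1e5, n_tables=4)
    g1, g2 = factory(), factory()
    kmer = "A" * 21
    g1.count(kmer)                    # add one 21-mer to the first filter
    assert g1.ksize() == g2.ksize() == 21
    assert g1.get(kmer) == 1          # present in g1...
    assert g2.get(kmer) == 0          # ...but not in g2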


class SBT(Index):
    """A Sequence Bloom Tree implementation allowing generic internal nodes and leaves.

    The default node and leaf format is a Bloom Filter (like the original
    implementation), but we also provide a MinHash leaf class (in the
    sourmash.sbtmh.SigLeaf class)

    Parameters
    ----------
    factory: Factory
        Callable for generating new datastores for internal nodes.
    d: int
        Number of children for each internal node. Defaults to 2 (a binary tree)
    storage: Storage, default: None
        A Storage is any place where we can save and load data for the nodes.
        If set to None, will use a FSStorage.

    Notes
    -----
    We use two dicts to store the tree structure: one for the internal nodes,
    and another for the leaves (datasets).
    """

    def __init__(self, factory, d=2, storage=None):
        self.factory = factory
        self._nodes = {}
        self._missing_nodes = set()
        self._leaves = {}
        self.d = d
        self.next_node = 0
        self.storage = storage
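
    # Illustrative note (not from the original source): positions follow the
    # usual array-heap layout. For d=2, a three-dataset tree might look like
    #
    #   _nodes  = {0: internal.0, 1: internal.1}
    #   _leaves = {2: <leaf>, 3: <leaf>, 4: <leaf>}
    #
    # where the children of position p sit at d*p + 1 .. d*p + d.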

    def signatures(self):
        for k in self.leaves():
            yield k.data

    def select(self, ksize=None, moltype=None):
        first_sig = next(iter(self.signatures()))

        ok = True
        if ksize is not None and first_sig.minhash.ksize != ksize:
            ok = False
        if moltype is not None and first_sig.moltype != moltype:
            ok = False

        if ok:
            return self

        raise ValueError("cannot select SBT on ksize {} / moltype {}".format(ksize, moltype))

    def new_node_pos(self, node):
        if not self._nodes:
            self.next_node = 1
            return 0

        if not self._leaves:
            self.next_node = 2
            return 1

        min_leaf = min(self._leaves.keys())

        next_internal_node = None
        if self.next_node <= min_leaf:
            for i in range(min_leaf):
                if all((i not in self._nodes,
                        i not in self._leaves,
                        i not in self._missing_nodes)):
                    next_internal_node = i
                    break

        if next_internal_node is None:
            self.next_node = max(self._leaves.keys()) + 1
        else:
            self.next_node = next_internal_node

        return self.next_node
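
    # Worked example (illustrative, assuming d=2): starting from an empty
    # tree, successive calls return 0 (the root), then 1 and 2 for the first
    # two leaves; inserting a third leaf converts position 1 into an internal
    # node whose children land at 2*1 + 1 = 3 and 2*1 + 2 = 4.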

    def insert(self, signature):
        "Add a new SourmashSignature into the SBT."
        from .sbtmh import SigLeaf

        leaf = SigLeaf(signature.md5sum(), signature)
        self.add_node(leaf)

    def add_node(self, node):
        pos = self.new_node_pos(node)

        if pos == 0:  # empty tree; initialize w/node.
            n = Node(self.factory, name="internal." + str(pos))
            self._nodes[0] = n
            pos = self.new_node_pos(node)

        # Cases:
        # 1) parent is a Leaf (already covered)
        # 2) parent is a Node (with an empty position available)
        #    - add Leaf, update parent
        # 3) parent is a Node (no position available)
        #    - this is covered by case 1
        # 4) parent is None
        #    - this can happen with d != 2; in this case create the parent node

        p = self.parent(pos)
        if isinstance(p.node, Leaf):
            # Create a new internal node; node and parent are children of
            # the new internal node
            n = Node(self.factory, name="internal." + str(p.pos))
            self._nodes[p.pos] = n

            c1, c2 = self.children(p.pos)[:2]

            self._leaves[c1.pos] = p.node
            self._leaves[c2.pos] = node
            del self._leaves[p.pos]

            for child in (p.node, node):
                child.update(n)
        elif isinstance(p.node, Node):
            self._leaves[pos] = node
            node.update(p.node)
        elif p.node is None:
            n = Node(self.factory, name="internal." + str(p.pos))
            self._nodes[p.pos] = n
            c1 = self.children(p.pos)[0]
            self._leaves[c1.pos] = node
            node.update(n)

        # update all parents!
        p = self.parent(p.pos)
        while p:
            self._rebuild_node(p.pos)
            node.update(self._nodes[p.pos])
            p = self.parent(p.pos)

    def find(self, search_fn, *args, **kwargs):
        "Search the tree using `search_fn`."
        unload_data = kwargs.get("unload_data", False)

        # initialize search queue with the top node of the tree
        matches = []
        visited, queue = set(), [0]

        # while the queue is not empty, load each node and apply the search
        # function.
        while queue:
            node_p = queue.pop(0)

            # repair while searching.
            node_g = self._leaves.get(node_p, None)
            if node_g is None:
                node_g = self._nodes.get(node_p, None)
                if node_g is None:
                    if node_p in self._missing_nodes:
                        self._rebuild_node(node_p)
                        node_g = self._nodes[node_p]
                    else:
                        continue

            # if we have not visited this node before,
            if node_p not in visited:
                visited.add(node_p)

                # apply search fn. If it returns false, truncate the search.
                if search_fn(node_g, *args):

                    # leaf node? it's a match!
                    if isinstance(node_g, Leaf):
                        matches.append(node_g)
                    # internal node? descend.
                    elif isinstance(node_g, Node):
                        if kwargs.get('dfs', True):  # search defaults to dfs
                            for c in self.children(node_p):
                                queue.insert(0, c.pos)
                        else:  # bfs
                            queue.extend(c.pos for c in self.children(node_p))

            if unload_data:
                node_g.unload()

        return matches

    def search(self, query, *args, **kwargs):
        from .sbtmh import search_minhashes, search_minhashes_containment
        from .sbtmh import SearchMinHashesFindBest
        from .signature import SourmashSignature

        threshold = kwargs['threshold']
        ignore_abundance = kwargs['ignore_abundance']
        do_containment = kwargs['do_containment']
        best_only = kwargs['best_only']
        unload_data = kwargs.get('unload_data', False)

        # figure out scaled value of tree, downsample query if needed.
        leaf = next(iter(self.leaves()))
        tree_mh = leaf.data.minhash

        tree_query = query
        if tree_mh.scaled and query.minhash.scaled and \
           tree_mh.scaled > query.minhash.scaled:
            resampled_query_mh = tree_query.minhash
            resampled_query_mh = resampled_query_mh.downsample_scaled(tree_mh.scaled)
            tree_query = SourmashSignature(resampled_query_mh)

        # define both the search function and the post-search calculation function
        search_fn = search_minhashes
        query_match = lambda x: tree_query.similarity(
            x, downsample=False, ignore_abundance=ignore_abundance)
        if do_containment:
            search_fn = search_minhashes_containment
            query_match = lambda x: tree_query.contained_by(x, downsample=True)

        if best_only:            # this needs to be reset for each SBT
            search_fn = SearchMinHashesFindBest().search

        # now, search!
        results = []
        for leaf in self.find(search_fn, tree_query, threshold,
                              unload_data=unload_data):
            similarity = query_match(leaf.data)

            # tree search should always/only return matches above threshold
            assert similarity >= threshold

            results.append((similarity, leaf.data, None))

        return results
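
    # Worked example of the downsampling check above (illustrative numbers):
    # if the tree was built at scaled=2000 and the query at scaled=1000, the
    # query keeps more hashes than the tree, so it is downsampled to
    # scaled=2000 before searching; the reverse case is left untouched.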

    def gather(self, query, *args, **kwargs):
        from .sbtmh import GatherMinHashes

        if not query.minhash:             # empty query? quit.
            return []

        # use a tree search function that keeps track of its best match.
        search_fn = GatherMinHashes().search
        unload_data = kwargs.get('unload_data', False)

        leaf = next(iter(self.leaves()))
        tree_mh = leaf.data.minhash
        scaled = tree_mh.scaled

        threshold_bp = kwargs.get('threshold_bp', 0.0)
        threshold = 0.0

        # are we setting a threshold?
        if threshold_bp:
            # if we have a threshold_bp of N, then that amounts to N/scaled
            # hashes:
            n_threshold_hashes = threshold_bp / scaled

            # that then requires the following containment:
            threshold = n_threshold_hashes / len(query.minhash)

            # is it too high to ever match? if so, exit.
            if threshold > 1.0:
                return []

        # actually do search!
        results = []
        for leaf in self.find(search_fn, query, threshold,
                              unload_data=unload_data):
            leaf_mh = leaf.data.minhash
            containment = query.minhash.contained_by(leaf_mh, True)

            if containment >= threshold:
                results.append((containment, leaf.data, None))

        return results
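
    # Worked example of the threshold arithmetic above (illustrative numbers):
    # with threshold_bp=50000 and scaled=1000, a match must share at least
    # 50000/1000 = 50 hashes; if the query holds 500 hashes, the required
    # containment is 50/500 = 0.1.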

    def _rebuild_node(self, pos=0):
        """Recursively rebuilds an internal node (if it is not present).

        Parameters
        ----------
        pos: int
            node to be rebuilt. Any internal node under it will be rebuilt
            too. If you want to rebuild all missing internal nodes you can
            use pos=0 (the default).
        """
        node = self._nodes.get(pos, None)
        if node is not None:
            # this node was already built, skip
            return

        node = Node(self.factory, name="internal.{}".format(pos))
        self._nodes[pos] = node
        for c in self.children(pos):
            if c.pos in self._missing_nodes or isinstance(c.node, Leaf):
                cnode = c.node
                if cnode is None:
                    self._rebuild_node(c.pos)
                    cnode = self._nodes[c.pos]
                cnode.update(node)

    def parent(self, pos):
        """Return the parent of the node at position ``pos``.

        If it is the root node (position 0), returns None.

        Parameters
        ----------
        pos: int
            Position of the node in the tree.

        Returns
        -------
        NodePos
            A NodePos namedtuple with the position and content of the
            parent node.
        """
        if pos == 0:
            return None
        p = int(math.floor((pos - 1) / self.d))
        if p in self._leaves:
            return NodePos(p, self._leaves[p])
        node = self._nodes.get(p, None)
        return NodePos(p, node)

    def children(self, pos):
        """Return all children nodes for node at position ``pos``.

        Parameters
        ----------
        pos: int
            Position of the node in the tree.

        Returns
        -------
        list of NodePos
            A list of NodePos namedtuples with the position and content
            of all children nodes.
        """
        return [self.child(pos, c) for c in range(self.d)]

    def child(self, parent, pos):
        """Return a child node at position ``pos`` under the ``parent`` node.

        Parameters
        ----------
        parent: int
            Parent node position in the tree.
        pos: int
            Position of the child under the parent. Ranges from
            [0, arity - 1], where arity is the arity of the SBT
            (usually it is 2, a binary tree).

        Returns
        -------
        NodePos
            A NodePos namedtuple with the position and content of the
            child node.
        """
        cd = self.d * parent + pos + 1
        if cd in self._leaves:
            return NodePos(cd, self._leaves[cd])
        node = self._nodes.get(cd, None)
        return NodePos(cd, node)
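
    # Worked example of the index arithmetic (illustrative, d=2): the
    # children of position 1 live at 2*1 + 0 + 1 = 3 and 2*1 + 1 + 1 = 4,
    # and parent() inverts this: floor((3 - 1) / 2) == floor((4 - 1) / 2) == 1.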

    def save(self, path, storage=None, sparseness=0.0, structure_only=False):
        """Saves an SBT description locally and node data to a storage.

        Parameters
        ----------
        path : str
            path to where the SBT description should be saved.
        storage : Storage, optional
            Storage to be used for saving node data.
            Defaults to FSStorage (a hidden directory at the same level of path)
        sparseness : float
            How much of the internal nodes should be saved.
            Defaults to 0.0 (save all internal nodes data),
            can go up to 1.0 (don't save any internal nodes data)
        structure_only: boolean
            Write only the index schema and metadata, but not the data.
            Defaults to False (save data too)

        Returns
        -------
        str
            full path to the new SBT description
        """
        version = 4

        if path.endswith('.sbt.json'):
            path = path[:-9]
        fn = os.path.abspath(path + '.sbt.json')

        if storage is None:
            # default storage
            location = os.path.dirname(fn)
            subdir = '.sbt.{}'.format(os.path.basename(path))

            storage = FSStorage(location, subdir)
            fn = os.path.join(location, fn)

        backend = [k for (k, v) in STORAGES.items() if v == type(storage)][0]

        info = {}
        info['d'] = self.d
        info['version'] = version
        info['storage'] = {
            'backend': backend,
            'args': storage.init_args()
        }
        info['factory'] = {
            'class': GraphFactory.__name__,
            'args': self.factory.init_args()
        }

        nodes = {}
        total_nodes = len(self)
        for n, (i, node) in enumerate(self):
            if node is None:
                continue

            if isinstance(node, Node):
                if random() - sparseness <= 0:
                    continue

            data = {
                # TODO: start using md5sum instead?
                'filename': os.path.basename(node.name),
                'name': node.name
            }

            try:
                node.metadata.pop('max_n_below')
            except (AttributeError, KeyError):
                pass

            data['metadata'] = node.metadata

            if structure_only is False:
                # trigger data loading before saving to the new place
                node.data

                node.storage = storage

                data['filename'] = node.save(data['filename'])

            nodes[i] = data

            notify("{} of {} nodes saved".format(n+1, total_nodes), end='\r')

        notify("\nFinished saving nodes, now saving SBT json file.")
        info['nodes'] = nodes

        with open(fn, 'w') as fp:
            json.dump(info, fp)

        return fn
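
    # Worked example of the sparseness parameter (illustrative): with
    # sparseness=0.25 an internal node is skipped whenever random() <= 0.25,
    # so on average a quarter of the internal node data is left out of the
    # saved index and is rebuilt on demand (see _rebuild_node) when the
    # index is used.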

    def _save_v5(self, path, storage=None, sparseness=0.0, structure_only=False):
        version = 5

        if path.endswith('.sbt.json'):
            path = path[:-9]
        fn = os.path.abspath(path + '.sbt.json')

        if storage is None:
            # default storage
            location = os.path.dirname(fn)
            subdir = '.sbt.{}'.format(os.path.basename(path))

            storage = FSStorage(location, subdir)
            fn = os.path.join(location, fn)

        backend = [k for (k, v) in STORAGES.items() if v == type(storage)][0]

        info = {}
        info['d'] = self.d
        info['version'] = version
        info['storage'] = {
            'backend': backend,
            'args': storage.init_args()
        }
        info['factory'] = {
            'class': GraphFactory.__name__,
            'args': self.factory.init_args()
        }

        nodes = {}
        leaves = {}
        total_nodes = len(self)
        for n, (i, node) in enumerate(self):
            if node is None:
                continue

            if isinstance(node, Node):
                if random() - sparseness <= 0:
                    continue

            data = {
                # TODO: start using md5sum instead?
                'filename': os.path.basename(node.name),
                'name': node.name
            }

            try:
                node.metadata.pop('max_n_below')
            except (AttributeError, KeyError):
                pass

            data['metadata'] = node.metadata

            if structure_only is False:
                # trigger data loading before saving to the new place
                node.data

                node.storage = storage

                data['filename'] = node.save(data['filename'])

            if isinstance(node, Node):
                nodes[i] = data
            else:
                leaves[i] = data

            notify("{} of {} nodes saved".format(n+1, total_nodes), end='\r')

        notify("\nFinished saving nodes, now saving SBT json file.")
        info['nodes'] = nodes
        info['leaves'] = leaves

        with open(fn, 'w') as fp:
            json.dump(info, fp)

        return fn

    @classmethod
    def load(cls, location, leaf_loader=None, storage=None,
             print_version_warning=True):
        """Load an SBT description from a file.

        Parameters
        ----------
        location : str
            path to the SBT description.
        leaf_loader : function, optional
            function to load leaf nodes. Defaults to ``Leaf.load``.
        storage : Storage, optional
            Storage to be used for loading node data.
            Defaults to FSStorage (a hidden directory at the same level of path)

        Returns
        -------
        SBT
            the SBT tree built from the description.
        """
        dirname = os.path.dirname(os.path.abspath(location))
        sbt_name = os.path.basename(location)
        if sbt_name.endswith('.sbt.json'):
            sbt_name = sbt_name[:-9]

        loaders = {
            1: cls._load_v1,
            2: cls._load_v2,
            3: cls._load_v3,
            4: cls._load_v4,
            5: cls._load_v5,
        }

        # @CTB hack: check to make sure khmer Nodegraph supports the
        # correct methods.
        x = khmer.Nodegraph(1, 1, 1)
        try:
            x.count(10)
        except TypeError:
            raise Exception("khmer version is too old; need >= 2.1,<3")

        if leaf_loader is None:
            leaf_loader = Leaf.load

        sbt_fn = os.path.join(dirname, sbt_name)
        if not sbt_fn.endswith('.sbt.json'):
            sbt_fn += '.sbt.json'
        with open(sbt_fn) as fp:
            jnodes = json.load(fp)

        version = 1
        if isinstance(jnodes, Mapping):
            version = jnodes['version']

        if version < 3 and storage is None:
            storage = FSStorage(dirname, '.sbt.{}'.format(sbt_name))

        return loaders[version](jnodes, leaf_loader, dirname, storage,
                                print_version_warning)
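
    # Typical usage (an illustrative sketch; the path is a placeholder):
    #
    #     tree = SBT.load('path/to/tree.sbt.json')
    #     results = tree.search(query_sig, threshold=0.1,
    #                           ignore_abundance=False, do_containment=False,
    #                           best_only=False)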

    @staticmethod
    def _load_v1(jnodes, leaf_loader, dirname, storage,
                 print_version_warning=True):

        if jnodes[0] is None:
            raise ValueError("Empty tree!")

        sbt_nodes = {}

        sample_bf = os.path.join(dirname, jnodes[0]['filename'])
        ksize, tablesize, ntables = khmer.extract_nodegraph_info(sample_bf)[:3]
        factory = GraphFactory(ksize, tablesize, ntables)

        for i, jnode in enumerate(jnodes):
            if jnode is None:
                continue

            jnode['filename'] = os.path.join(dirname, jnode['filename'])

            if 'internal' in jnode['name']:
                jnode['factory'] = factory
                sbt_node = Node.load(jnode, storage)
            else:
                sbt_node = leaf_loader(jnode, storage)

            sbt_nodes[i] = sbt_node

        tree = SBT(factory)
        tree._nodes = sbt_nodes

        return tree

    @classmethod
    def _load_v2(cls, info, leaf_loader, dirname, storage,
                 print_version_warning=True):
        nodes = {int(k): v for (k, v) in info['nodes'].items()}

        if nodes[0] is None:
            raise ValueError("Empty tree!")

        sbt_nodes = {}
        sbt_leaves = {}

        sample_bf = os.path.join(dirname, nodes[0]['filename'])
        k, size, ntables = khmer.extract_nodegraph_info(sample_bf)[:3]
        factory = GraphFactory(k, size, ntables)

        for k, node in nodes.items():
            if node is None:
                continue

            node['filename'] = os.path.join(dirname, node['filename'])

            if 'internal' in node['name']:
                node['factory'] = factory
                sbt_node = Node.load(node, storage)
                sbt_nodes[k] = sbt_node
            else:
                sbt_node = leaf_loader(node, storage)
                sbt_leaves[k] = sbt_node

        tree = cls(factory, d=info['d'])
        tree._nodes = sbt_nodes
        tree._leaves = sbt_leaves

        return tree

    @classmethod
    def _load_v3(cls, info, leaf_loader, dirname, storage,
                 print_version_warning=True):
        nodes = {int(k): v for (k, v) in info['nodes'].items()}

        if not nodes:
            raise ValueError("Empty tree!")

        sbt_nodes = {}
        sbt_leaves = {}

        klass = STORAGES[info['storage']['backend']]
        if info['storage']['backend'] == "FSStorage":
            storage = FSStorage(dirname, info['storage']['args']['path'])
        elif storage is None:
            storage = klass(**info['storage']['args'])

        factory = GraphFactory(*info['factory']['args'])

        max_node = 0
        for k, node in nodes.items():
            if node is None:
                continue

            if 'internal' in node['name']:
                node['factory'] = factory
                sbt_node = Node.load(node, storage)
                sbt_nodes[k] = sbt_node
            else:
                sbt_node = leaf_loader(node, storage)
                sbt_leaves[k] = sbt_node

            max_node = max(max_node, k)

        tree = cls(factory, d=info['d'], storage=storage)
        tree._nodes = sbt_nodes
        tree._leaves = sbt_leaves
        tree._missing_nodes = {i for i in range(max_node)
                               if i not in sbt_nodes and i not in sbt_leaves}

        if print_version_warning:
            error("WARNING: this is an old index version, please run `sourmash migrate` to update it.")
            error("WARNING: proceeding with execution, but it will take longer to finish!")

        tree._fill_min_n_below()

        return tree

    @classmethod
    def _load_v4(cls, info, leaf_loader, dirname, storage,
                 print_version_warning=True):
        nodes = {int(k): v for (k, v) in info['nodes'].items()}

        if not nodes:
            raise ValueError("Empty tree!")

        sbt_nodes = {}
        sbt_leaves = {}

        klass = STORAGES[info['storage']['backend']]
        if info['storage']['backend'] == "FSStorage":
            storage = FSStorage(dirname, info['storage']['args']['path'])
        elif storage is None:
            storage = klass(**info['storage']['args'])

        factory = GraphFactory(*info['factory']['args'])

        max_node = 0
        for k, node in nodes.items():
            if 'internal' in node['name']:
                node['factory'] = factory
                sbt_node = Node.load(node, storage)
                sbt_nodes[k] = sbt_node
            else:
                sbt_node = leaf_loader(node, storage)
                sbt_leaves[k] = sbt_node

            max_node = max(max_node, k)

        tree = cls(factory, d=info['d'], storage=storage)
        tree._nodes = sbt_nodes
        tree._leaves = sbt_leaves
        tree._missing_nodes = {i for i in range(max_node)
                               if i not in sbt_nodes and i not in sbt_leaves}
        tree.next_node = max_node

        return tree

    @classmethod
    def _load_v5(cls, info, leaf_loader, dirname, storage,
                 print_version_warning=True):
        nodes = {int(k): v for (k, v) in info['nodes'].items()}
        leaves = {int(k): v for (k, v) in info['leaves'].items()}

        if not leaves:
            raise ValueError("Empty tree!")

        sbt_nodes = {}
        sbt_leaves = {}

        klass = STORAGES[info['storage']['backend']]
        if info['storage']['backend'] == "FSStorage":
            storage = FSStorage(dirname, info['storage']['args']['path'])
        elif storage is None:
            storage = klass(**info['storage']['args'])

        factory = GraphFactory(*info['factory']['args'])

        max_node = 0
        for k, node in nodes.items():
            node['factory'] = factory
            sbt_node = Node.load(node, storage)

            sbt_nodes[k] = sbt_node
            max_node = max(max_node, k)

        for k, node in leaves.items():
            sbt_leaf = leaf_loader(node, storage)
            sbt_leaves[k] = sbt_leaf
            max_node = max(max_node, k)

        tree = cls(factory, d=info['d'], storage=storage)
        tree._nodes = sbt_nodes
        tree._leaves = sbt_leaves
        tree._missing_nodes = {i for i in range(max_node)
                               if i not in sbt_nodes and i not in sbt_leaves}

        return tree

    def _fill_min_n_below(self):
        """\
        Propagate the smallest hash size below each node up the tree from
        the leaves.
        """
        def fill_min_n_below(node, *args, **kwargs):
            original_min_n_below = node.metadata.get('min_n_below', sys.maxsize)
            min_n_below = original_min_n_below

            children = kwargs['children']
            for child in children:
                if child.node is not None:
                    if isinstance(child.node, Leaf):
                        min_n_below = min(len(child.node.data.minhash),
                                          min_n_below)
                    else:
                        child_n = child.node.metadata.get('min_n_below', sys.maxsize)
                        min_n_below = min(child_n, min_n_below)

            if min_n_below == 0:
                min_n_below = 1

            node.metadata['min_n_below'] = min_n_below
            return original_min_n_below != min_n_below

        self._fill_up(fill_min_n_below)

    def _fill_internal(self):

        def fill_nodegraphs(node, *args, **kwargs):
            children = kwargs['children']
            for child in children:
                if child.node is not None:
                    child.node.update(node)
            return True

        self._fill_up(fill_nodegraphs)

    def _fill_up(self, search_fn, *args, **kwargs):
        visited, queue = set(), list(reversed(sorted(self._leaves.keys())))
        debug("started filling up")
        processed = 0
        while queue:
            node_p = queue.pop(0)

            parent = self.parent(node_p)
            if parent is None:
                # we are in the root, no more nodes available to search
                assert len(queue) == 0
                return

            was_missing = False
            if parent.node is None:
                if parent.pos in self._missing_nodes:
                    self._rebuild_node(parent.pos)
                    parent = self.parent(node_p)
                    was_missing = True
                else:
                    continue

            siblings = self.children(parent.pos)

            if node_p not in visited:
                visited.add(node_p)
                for sibling in siblings:
                    visited.add(sibling.pos)
                    try:
                        queue.remove(sibling.pos)
                    except ValueError:
                        pass

                if search_fn(parent.node, children=siblings, *args) or was_missing:
                    queue.append(parent.pos)

            processed += 1
            if processed % 100 == 0:
                debug("processed {}, in queue {}", processed, len(queue), sep='\r')

    def __len__(self):
        internal_nodes = set(self._nodes).union(self._missing_nodes)
        return len(internal_nodes) + len(self._leaves)

    def print_dot(self):
        print("""
        digraph G {
        nodesep=0.3;
        ranksep=0.2;
        margin=0.1;
        node [shape=ellipse];
        edge [arrowsize=0.8];
        """)

        for i, node in self._nodes.items():
            if isinstance(node, Node):
                print('"{}" [shape=box fillcolor=gray style=filled]'.format(
                      node.name))
                for j, child in self.children(i):
                    if child is not None:
                        print('"{}" -> "{}"'.format(node.name, child.name))
        print("}")

    def print(self):
        visited, stack = set(), [0]
        while stack:
            node_p = stack.pop()
            node_g = self._nodes.get(node_p, None)
            if node_p not in visited and node_g is not None:
                visited.add(node_p)
                depth = int(math.floor(math.log(node_p + 1, self.d)))
                print(" " * 4 * depth, node_g)
                if isinstance(node_g, Node):
                    stack.extend(c.pos for c in self.children(node_p)
                                       if c.pos not in visited)

    def __iter__(self):
        for i, node in self._nodes.items():
            yield (i, node)
        for i, node in self._leaves.items():
            yield (i, node)

    def _parents(self, pos=0):
        if pos == 0:
            yield None
        else:
            p = self.parent(pos)
            while p is not None:
                yield p.pos
                p = self.parent(p.pos)

    def leaves(self, with_pos=False):
        for pos, data in self._leaves.items():
            if with_pos:
                yield (pos, data)
            else:
                yield data

    def combine(self, other):
        larger, smaller = self, other
        if len(other) > len(self):
            larger, smaller = other, self

        n = Node(self.factory, name="internal.0", storage=self.storage)
        larger._nodes[0].update(n)
        smaller._nodes[0].update(n)
        new_nodes = {}
        new_nodes[0] = n

        new_leaves = {}

        levels = int(math.ceil(math.log(len(larger), self.d))) + 1
        current_pos = 1
        n_previous = 0
        n_next = 1
        for level in range(1, levels + 1):
            for tree in (larger, smaller):
                for pos in range(n_previous, n_next):
                    if tree._nodes.get(pos, None) is not None:
                        new_node = copy(tree._nodes[pos])
                        new_node.name = "internal.{}".format(current_pos)
                        new_nodes[current_pos] = new_node
                    elif tree._leaves.get(pos, None) is not None:
                        new_node = copy(tree._leaves[pos])
                        new_leaves[current_pos] = new_node
                    current_pos += 1
            n_previous = n_next
            n_next = n_previous + int(self.d ** level)
            current_pos = n_next

        # TODO: do we want to return a new tree, or merge into this one?
        self._nodes = new_nodes
        self._leaves = new_leaves
        return self


class Node(object):
    "Internal node of SBT."

    def __init__(self, factory, name=None, path=None, storage=None):
        self.name = name
        self.storage = storage
        self._factory = factory
        self._data = None
        self._path = path
        self.metadata = dict()

    def __str__(self):
        return '*Node:{name} [occupied: {nb}, fpr: {fpr:.2}]'.format(
                name=self.name, nb=self.data.n_occupied(),
                fpr=khmer.calc_expected_collisions(self.data, True, 1.1))

    def save(self, path):
        # We need to do this tempfile dance because khmer only loads
        # data from files.
        with NamedTemporaryFile(suffix=".gz") as f:
            self.data.save(f.name)
            f.file.flush()
            f.file.seek(0)
            return self.storage.save(path, f.read())

    @property
    def data(self):
        if self._data is None:
            if self._path is None:
                self._data = self._factory()
            else:
                data = self.storage.load(self._path)

                # We need to do this tempfile dance because khmer only loads
                # data from files.
                with NamedTemporaryFile(suffix=".gz") as f:
                    f.write(data)
                    f.file.flush()
                    self._data = load_nodegraph(f.name)
        return self._data

    @data.setter
    def data(self, new_data):
        self._data = new_data

    def unload(self):
        self._data = None

    @staticmethod
    def load(info, storage=None):
        new_node = Node(info['factory'],
                        name=info['name'],
                        path=info['filename'],
                        storage=storage)
        new_node.metadata = info.get('metadata', {})
        return new_node

    def update(self, parent):
        parent.data.update(self.data)
        if 'min_n_below' in self.metadata:
            min_n_below = min(parent.metadata.get('min_n_below', sys.maxsize),
                              self.metadata.get('min_n_below'))
            if min_n_below == 0:
                min_n_below = 1
            parent.metadata['min_n_below'] = min_n_below


class Leaf(object):

    def __init__(self, metadata, data=None, name=None, storage=None, path=None):
        self.metadata = metadata

        if name is None:
            name = metadata
        self.name = name

        self.storage = storage

        self._data = data
        self._path = path

    def __str__(self):
        return '**Leaf:{name} [occupied: {nb}, fpr: {fpr:.2}] -> {metadata}'.format(
                name=self.name, metadata=self.metadata,
                nb=self.data.n_occupied(),
                fpr=khmer.calc_expected_collisions(self.data, True, 1.1))

    @property
    def data(self):
        if self._data is None:
            data = self.storage.load(self._path)

            # We need to do this tempfile dance because khmer only loads
            # data from files.
            with NamedTemporaryFile(suffix=".gz") as f:
                f.write(data)
                f.file.flush()
                self._data = load_nodegraph(f.name)
        return self._data

    @data.setter
    def data(self, new_data):
        self._data = new_data

    def unload(self):
        self._data = None

    def save(self, path):
        # We need to do this tempfile dance because khmer only loads
        # data from files.
        with NamedTemporaryFile(suffix=".gz") as f:
            self.data.save(f.name)
            f.file.flush()
            f.file.seek(0)
            return self.storage.save(path, f.read())

    def update(self, parent):
        parent.data.update(self.data)

    @classmethod
    def load(cls, info, storage=None):
        return cls(info['metadata'],
                   name=info['name'],
                   path=info['filename'],
                   storage=storage)


def filter_distance(filter_a, filter_b, n=1000):
    """Compute a heuristic distance per bit between two Bloom filters.

    Parameters
    ----------
    filter_a : Nodegraph
    filter_b : Nodegraph
    n : int
        Number of positions to compare (in groups of 8)

    Returns
    -------
    float
        The distance between both filters (from 0.0 to 1.0)
    """
    from numpy import array

    A = filter_a.graph.get_raw_tables()
    B = filter_b.graph.get_raw_tables()
    distance = 0
    for q, p in zip(A, B):
        a = array(q, copy=False)
        b = array(p, copy=False)
        # note: randint is inclusive on both ends, so cap at len(a) - 1
        # to stay in bounds.
        for i in map(lambda x: randint(0, len(a) - 1), range(n)):
            distance += sum(map(int,
                                [not bool((a[i] >> j) & 1) ^ bool((b[i] >> j) & 1)
                                 for j in range(8)]))
    return distance / (8.0 * len(A) * n)


def convert_cmd(name, backend):
    "Convert an SBT to use a different back end."
    from .sbtmh import SigLeaf

    options = backend.split('(')
    backend = options.pop(0)
    backend = backend.lower().strip("'")

    if options:
        options = options[0].split(')')
        options = [options.pop(0)]
    else:
        options = []

    if backend.lower() in ('ipfs', 'ipfsstorage'):
        backend = IPFSStorage
    elif backend.lower() in ('redis', 'redisstorage'):
        backend = RedisStorage
    elif backend.lower() in ('tar', 'tarstorage'):
        backend = TarStorage
    elif backend.lower() in ('fs', 'fsstorage'):
        backend = FSStorage
        if options:
            options = [os.path.dirname(options[0]), os.path.basename(options[0])]
        else:
            # this is the default for SBT v2
            tag = '.sbt.' + os.path.basename(name)
            if tag.endswith('.sbt.json'):
                tag = tag[:-9]
            path = os.path.dirname(name)
            options = [path, tag]
    else:
        error('backend not recognized: {}'.format(backend))

    with backend(*options) as storage:
        sbt = SBT.load(name, leaf_loader=SigLeaf.load)
        sbt.save(name, storage=storage)
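

# Worked example of filter_distance's normalization (illustrative numbers):
# each sampled byte contributes 8 bit comparisons, so sampling n=1000 bytes
# from each of len(A)=4 tables accumulates at most 8 * 4 * 1000 = 32000
# counts; dividing by 8.0 * len(A) * n therefore maps the result into
# [0.0, 1.0].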