Source code for instancelib.instances.hdf5vector

# Copyright (C) 2021 The InstanceLib Authors. All Rights Reserved.

# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3 of the License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.

# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

import itertools
import pickle
from os import PathLike
from typing import (Any, Callable, Dict, Generic, Iterator, Optional, Sequence,
                    Tuple, Union)
import h5py  # type: ignore
import numpy as np  # type: ignore
import numpy.typing as npt
from h5py._hl.dataset import Dataset  # type: ignore

from ..exceptions import NoVectorsException
from ..utils.chunks import divide_iterable_in_lists, get_range
from ..utils.func import filter_snd_none, list_unzip, identity
from ..utils.numpy import (matrix_to_vector_list, matrix_tuple_to_vectors,
                           matrix_tuple_to_zipped, slicer)
from .vectorstorage import VectorStorage, ensure_writeable


from ..typehints import KT, DType


def keys_wrapper(keys: Sequence[Any]) -> Sequence[Union[int, str]]:
    """Convert arbitrary keys to ``int`` or ``str`` values.

    Keys that already are an ``int`` or a ``str`` are passed through
    unchanged; every other key is replaced by its ``str()`` representation.
    :class:`HDF5VectorStorage` applies this conversion to keys before they
    are written to the ``keys`` dataset.

    Parameters
    ----------
    keys : Sequence[Any]
        The keys that should be converted

    Returns
    -------
    Sequence[Union[int, str]]
        A new list with one converted key per input key, in the same order
    """
    return [key if isinstance(key, (int, str)) else str(key) for key in keys]
class HDF5VectorStorage(VectorStorage[KT, npt.NDArray[DType], npt.NDArray[DType]],
                        Generic[KT, DType]):
    """This class provides the handling of on disk vector storage in HDF5 format.
    In many cases, storing feature matrices or large sets of vectors
    in memory is not feasible.

    This class provides methods that `InstanceProvider` implementations
    can use to ensure that only the vectors needed by some operations are
    kept in memory. This class enables processing all vector in chunks that
    do fit in memory, enabling ordering all unlabeled instances for very
    large datasets.

    The HDF5 file holds three datasets: ``vectors`` (one row per vector),
    ``keys`` (the converted key of every row) and ``dicts`` (the pickled
    index dictionaries ``key_dict`` and ``inv_key_dict``). The file is
    opened and closed again for every single operation.

    Parameters
    ----------
    h5path : str
        The path to the hdf5 file
    mode : str, optional
        The file mode (see `h5py` documentation), by default "r"
    """

    __writemodes = ["a", "r+", "w", "w-", "x"]
    # Modes that create the file. They may only be used for the very first
    # open: "w" truncates an existing file and "w-" / "x" fail if the file
    # already exists, so reusing them on every access would destroy the data
    # or raise an error.
    __createmodes = ["w", "w-", "x"]

    def __init__(self, h5path: "PathLike[str]", mode: str = "r") -> None:
        self.__mode = mode
        self.h5path = h5path
        # Maps the public key to the row index in the HDF5 datasets
        self.key_dict: Dict[KT, int] = dict()
        # Maps the row index in the HDF5 datasets back to the public key
        self.inv_key_dict: Dict[int, KT] = dict()
        if mode in self.__createmodes:
            # Create the file exactly once with the requested mode and use
            # plain read/write access for all later opens.
            with h5py.File(self.h5path, mode):
                pass
            self.__open_mode = "r+"
        else:
            self.__open_mode = mode
        self.reload()

    @property
    def writeable(self) -> bool:
        """Check if the storage is writeable

        Returns
        -------
        bool
            True when writeable
        """
        return self.__mode in self.__writemodes

    def __len__(self) -> int:
        """Returns the size of the dataset

        Returns
        -------
        int
            The size of the dataset (the number of keys in the index)
        """
        return len(self.key_dict)

    @property
    def datasets_exist(self) -> bool:
        """Check if the HDF5 file contains a dataset

        Returns
        -------
        bool
            True, if the file contains both a ``vectors`` and a ``keys``
            dataset
        """
        with h5py.File(self.h5path, self.__open_mode) as hfile:
            return "vectors" in hfile and "keys" in hfile

    def reload(self) -> None:
        """Reload the index from disk

        If the file has no ``dicts`` dataset, the in-memory index
        dictionaries are left untouched.
        """
        with h5py.File(self.h5path, self.__open_mode) as hfile:
            if "dicts" in hfile:
                dicts = hfile["dicts"]
                assert isinstance(dicts, Dataset)
                # NOTE(security): unpickling executes arbitrary code when the
                # file is malicious; only open HDF5 files from trusted sources.
                self.key_dict = pickle.loads(dicts[0])  # type: ignore
                self.inv_key_dict = pickle.loads(dicts[1])  # type: ignore

    def __enter__(self):
        return self

    @ensure_writeable
    def __store_dicts(self) -> None:
        """Store the index dictionaries to disk in the HDF5 file

        Both dictionaries are pickled and written as variable length
        ``uint8`` arrays to the two slots of the ``dicts`` dataset.
        """
        with h5py.File(self.h5path, self.__open_mode) as hfile:
            if "dicts" not in hfile:
                dt = h5py.special_dtype(vlen=np.dtype("uint8"))  # type: ignore
                hfile.create_dataset("dicts", (2,), dtype=dt)  # type: ignore
            dicts = hfile["dicts"]
            assert isinstance(dicts, Dataset)
            dicts[0] = np.frombuffer(  # type: ignore
                pickle.dumps(self.key_dict), dtype="uint8")  # type: ignore
            dicts[1] = np.frombuffer(  # type: ignore
                pickle.dumps(self.inv_key_dict), dtype="uint8")  # type: ignore

    @ensure_writeable
    def rebuild_index(self, type_restorer: Callable[[Any], KT] = identity) -> None:
        """Rebuild the index after manual manipulation of a HDF5 file.

        The index dictionaries are rebuilt from the ``keys`` dataset and
        written back to the file.

        Parameters
        ----------
        type_restorer : Callable[[Any], KT], optional
            A function that converts a key as it is read from the ``keys``
            dataset back to the public key type, by default the identity
            function

        Raises
        ------
        NoVectorsException
            If there are no vectors, or if they are stored incorrectly
        """
        if not self.datasets_exist:
            raise NoVectorsException("There are no vectors stored in this file, "
                                     "therefore, the index dictionaries cannot "
                                     "be rebuilt.")
        self.key_dict = dict()
        self.inv_key_dict = dict()
        with h5py.File(self.h5path, self.__open_mode) as hfile:
            keys = hfile["keys"]
            assert isinstance(keys, Dataset)
            for i, key in enumerate(keys):  # type: ignore
                r_key = type_restorer(key)
                self.key_dict[r_key] = i  # type: ignore
                self.inv_key_dict[i] = r_key  # type: ignore
        self.__store_dicts()

    def __exit__(self, type, value, traceback):  # type: ignore
        # Persist the index when leaving the context, but only if the
        # storage was opened in a mode that allows writing.
        if self.writeable:
            self.__store_dicts()

    def close(self) -> None:
        """Close the file and store changes to the index to disk
        """
        self.__exit__(None, None, None)  # type: ignore

    @ensure_writeable
    def _create_matrix(self, first_slice: npt.NDArray[DType]) -> None:
        """Create a vectors colum in the HDF5 file and add the
        the vectors in `first_slice`

        Nothing happens if the file already contains a ``vectors`` dataset.

        Parameters
        ----------
        first_slice : npt.NDArray[DType]
            A matrix; the number of columns fixes the vector dimension
        """
        vector_dim = first_slice.shape[1]
        with h5py.File(self.h5path, self.__open_mode) as hfile:
            if "vectors" not in hfile:
                # The row count is unlimited (None) so that matrices can be
                # appended later; the data is stored with dtype "f".
                hfile.create_dataset(  # type: ignore
                    "vectors", data=first_slice,
                    maxshape=(None, vector_dim),
                    dtype="f", chunks=True)

    @ensure_writeable
    def _create_keys(self, keys: Sequence[KT]) -> None:
        """Create a key column in the HDF5 file.

        The in-memory index is updated for all `keys`, even if the file
        already contains a ``keys`` dataset. The index is not written to
        disk by this method.

        Parameters
        ----------
        keys : Sequence[KT]
            The keys that should be written
        """
        with h5py.File(self.h5path, self.__open_mode) as hfile:
            converted_keys = keys_wrapper(keys)
            if "keys" not in hfile:
                hfile.create_dataset("keys",  # type: ignore
                                     data=converted_keys,
                                     maxshape=(None,))  # type: ignore
            for i, key in enumerate(keys):
                self.key_dict[key] = i
                self.inv_key_dict[i] = key

    @ensure_writeable
    def _append_matrix(self, matrix: npt.NDArray[DType]) -> bool:
        """Append a matrix to storage (only for internal use)

        Parameters
        ----------
        matrix : npt.NDArray[DType]
            A matrix. The vector dimension should match with this object

        Returns
        -------
        bool
            True, if the operation succeeded

        Raises
        ------
        NoVectorsException
            If there are no vectors in storage, none can be appended
        """
        if not self.datasets_exist:
            raise NoVectorsException("Cannot append without existing vectors")
        with h5py.File(self.h5path, self.__open_mode) as hfile:
            dataset = hfile["vectors"]
            assert isinstance(dataset, Dataset)
            old_shape = dataset.shape  # type: ignore
            mat_shape = matrix.shape
            assert mat_shape[1] == old_shape[1]
            if mat_shape[0] == 0:
                # Nothing to append
                return True
            old_rows: int = old_shape[0]  # type: ignore
            new_shape = (old_rows + mat_shape[0], mat_shape[1])
            dataset.resize(size=new_shape)  # type: ignore
            # Write to the freshly added rows. An explicit start offset is
            # used because a negative slice ``[-n:]`` would select the whole
            # dataset for ``n == 0``.
            dataset[old_rows:, :] = matrix
        return True

    @ensure_writeable
    def _append_keys(self, keys: Sequence[KT]) -> bool:
        """Append keys to the vector storage

        The in-memory index is extended and written to disk afterwards.

        Parameters
        ----------
        keys : Sequence[KT]
            The keys that should be appended to storage.
            None of them may be present in storage already

        Returns
        -------
        bool
            True, if the operation succeeded

        Raises
        ------
        NoVectorsException
            If there are no vectors in storage, none can be appended
        """
        if not self.datasets_exist:
            raise NoVectorsException("Cannot append without existing vectors")
        assert all(map(lambda k: k not in self.key_dict, keys))
        new_keys = keys_wrapper(keys)  # type: ignore
        if len(new_keys) == 0:
            # Nothing to append
            return True
        with h5py.File(self.h5path, self.__open_mode) as hfile:
            key_set = hfile["keys"]
            assert isinstance(key_set, Dataset)
            start_index: int = key_set.shape[0]  # type: ignore
            key_set.resize(size=(start_index + len(new_keys),))  # type: ignore
            key_set[start_index:] = new_keys
            for i, key in enumerate(keys):
                hdf5_idx = start_index + i
                self.key_dict[key] = hdf5_idx
                self.inv_key_dict[hdf5_idx] = key
        self.__store_dicts()
        return True

    def __getitem__(self, k: KT) -> npt.NDArray[DType]:
        """Return the vector that is stored for key `k`

        Raises
        ------
        NoVectorsException
            If there are no vectors stored in this object
        KeyError
            If `k` is not present in the index
        """
        if not self.datasets_exist:
            raise NoVectorsException("There are no vectors stored in this object")
        h5_idx = self.key_dict[k]
        with h5py.File(self.h5path, self.__open_mode) as hfile:
            dataset = hfile["vectors"]
            assert isinstance(dataset, Dataset)
            data = dataset[h5_idx, :]  # type: ignore
        return data  # type: ignore

    @ensure_writeable
    def __setitem__(self, k: KT, value: npt.NDArray[DType]) -> None:
        """Overwrite the vector of an already stored key `k`

        Raises
        ------
        KeyError
            If `k` is not present in storage; new keys have to be added
            with :meth:`add_bulk` or :meth:`add_bulk_matrix`
        """
        assert self.datasets_exist
        if k not in self:
            raise KeyError(k)
        h5_idx = self.key_dict[k]
        with h5py.File(self.h5path, self.__open_mode) as hfile:
            dataset = hfile["vectors"]
            assert isinstance(dataset, Dataset)
            dataset[h5_idx] = value  # type: ignore

    def __delitem__(self, v: KT) -> None:
        # Removing vectors is not supported by this storage
        raise NotImplementedError

    def __contains__(self, item: object) -> bool:
        return item in self.key_dict

    def __iter__(self) -> Iterator[KT]:
        yield from self.key_dict

    @ensure_writeable
    def add_bulk_matrix(self, keys: Sequence[KT], matrix: npt.NDArray[DType]) -> None:
        """Add matrices in bulk

        If the file has no datasets yet, they are created from `keys` and
        `matrix`. Otherwise, the rows are appended, but only if *none* of
        the keys is present in storage already; if any key already exists,
        nothing is stored. Use :meth:`add_bulk` to update existing keys.

        Parameters
        ----------
        keys : Sequence[KT]
            A list of identifiers. The following should hold:
            `len(keys) == matrix.shape[0]`
        matrix : npt.NDArray[DType]
            A matrix. The rows should correspond with the identifiers
            in keys
        """
        assert len(keys) == matrix.shape[0]
        if not self.datasets_exist:
            self._create_matrix(matrix)
            self._create_keys(keys)
            return
        if all(map(lambda k: k not in self.key_dict, keys)):
            if self._append_keys(keys):
                self._append_matrix(matrix)

    @ensure_writeable
    def _update_vectors(self, keys: Sequence[KT], values: Sequence[npt.NDArray[DType]]) -> None:
        """Update vectors in bulk

        Parameters
        ----------
        keys : Sequence[KT]
            A list of identifiers; all of them should be present in storage
        values : Sequence[npt.NDArray[DType]]
            A list of new vectors
        """
        assert len(keys) == len(values)
        if len(keys) == 0:
            return
        with h5py.File(self.h5path, self.__open_mode) as hfile:
            dataset = hfile["vectors"]
            assert isinstance(dataset, Dataset)
            for key, value in zip(keys, values):
                h5_idx = self.key_dict[key]
                dataset[h5_idx] = value  # type: ignore

    @ensure_writeable
    def add_bulk(self, input_keys: Sequence[KT],
                 input_values: Sequence[Optional[npt.NDArray[DType]]]) -> None:
        """Add a bulk of keys and values (vectors) to the vector storage

        Vectors of keys that are already stored are overwritten, the
        vectors of new keys are appended.

        Parameters
        ----------
        input_keys : Sequence[KT]
            The keys of the Instances
        input_values : Sequence[Optional[npt.NDArray[DType]]]
            The vectors that correspond with the indices
        """
        assert len(input_keys) == len(input_values) and len(input_keys) > 0

        # Filter all keys that do not have a vector (input_values may contain None)
        keys, values = filter_snd_none(input_keys, input_values)  # type: ignore

        if not values:
            return

        # Check if the vector storage exists
        if not self.datasets_exist:
            matrix: npt.NDArray[DType] = np.vstack(values)  # type: ignore
            self._create_keys(keys)
            self._create_matrix(matrix)
            return

        # Split the (key, vector) pairs in one pass into pairs that
        # are already stored and pairs that are new
        stored_pairs = []
        new_pairs = []
        for pair in zip(keys, values):
            if pair[0] in self.key_dict:
                stored_pairs.append(pair)
            else:
                new_pairs.append(pair)

        if not stored_pairs:
            # This is the ideal case, all vectors can directly
            # be appended as a matrix
            matrix = np.vstack(values)  # type: ignore
            self.add_bulk_matrix(keys, matrix)
            return

        # Update the already present key vector pairs
        old_keys, updated_vectors = list_unzip(stored_pairs)
        self._update_vectors(old_keys, updated_vectors)

        # Append the new key vector pairs
        if new_pairs:
            new_keys, new_vectors = list_unzip(new_pairs)
            matrix = np.vstack(new_vectors)  # type: ignore
            self.add_bulk_matrix(new_keys, matrix)

    def _get_matrix(self, h5_idxs: Sequence[int]) -> Tuple[Sequence[KT], npt.NDArray[DType]]:
        """Return a matrix that correspond with the internal `h5_idxs`.

        Parameters
        ----------
        h5_idxs : Sequence[int]
            A list of internal indices (row indices of the ``vectors``
            dataset)

        Returns
        -------
        Tuple[Sequence[KT], npt.NDArray[DType]]
            A tuple containing:

                - The public keys that belong to `h5_idxs`
                - A matrix where the rows map to these keys

        Raises
        ------
        NoVectorsException
            If there are no vectors stored in this object
        """
        if not self.datasets_exist:
            raise NoVectorsException("There are no vectors stored in this object")
        with h5py.File(self.h5path, self.__open_mode) as dfile:
            dataset = dfile["vectors"]
            assert isinstance(dataset, Dataset)
            # NOTE(review): get_range / slicer are project helpers; presumably
            # they group the indices into ranges and read these ranges from
            # the dataset -- confirm in ..utils
            slices = get_range(h5_idxs)
            result_matrix: npt.NDArray[DType] = slicer(dataset, slices)  # type: ignore
            included_keys = [self.inv_key_dict[idx] for idx in h5_idxs]
        return included_keys, result_matrix  # type: ignore

    def _stored_indices(self, keys: Sequence[KT]) -> Sequence[int]:
        """Return the sorted internal indices of the `keys` that are in storage.

        Keys that are not present in storage are skipped and duplicate keys
        are only included once.

        Parameters
        ----------
        keys : Sequence[KT]
            A list of identifier keys

        Returns
        -------
        Sequence[int]
            The internal (HDF5 row) indices in ascending order
        """
        # Only iterate over the requested keys, instead of copying the
        # complete index into a set first.
        in_storage = {key for key in keys if key in self.key_dict}
        return sorted(self.key_dict[key] for key in in_storage)

    def get_vectors(self, keys: Sequence[KT]) -> Tuple[Sequence[KT], Sequence[npt.NDArray[DType]]]:
        """Return the vectors that correspond with the `keys`

        Parameters
        ----------
        keys : Sequence[KT]
            A list of identifier keys

        Returns
        -------
        Tuple[Sequence[KT], Sequence[npt.NDArray[DType]]]
            A tuple containing two lists:

                - A list with identifier (order may differ from `keys` argument)
                - A list with vectors

        Raises
        ------
        NoVectorsException
            If there are no vectors stored in this object
        """
        ret_keys, ret_matrix = self.get_matrix(keys)
        ret_vectors = matrix_to_vector_list(ret_matrix)
        return ret_keys, ret_vectors

    def get_matrix(self, keys: Sequence[KT]) -> Tuple[Sequence[KT], npt.NDArray[DType]]:
        """Return a matrix containing the vectors that correspond with the `keys`

        Keys that are not present in storage are ignored.

        Parameters
        ----------
        keys : Sequence[KT]
            A list of identifier keys

        Returns
        -------
        Tuple[Sequence[KT], npt.NDArray[DType]]
            A tuple containing:

                - A list with identifier keys (order may differ from `keys` argument)
                - A matrix containing the vectors (rows correspond with the returned list)

        Raises
        ------
        NoVectorsException
            If there are no vectors stored in this object
        """
        if not self.datasets_exist:
            raise NoVectorsException("There are no vectors stored in this object")
        return self._get_matrix(self._stored_indices(keys))

    def get_matrix_chunked(self,
                           keys: Sequence[KT],
                           chunk_size: int = 200
                           ) -> Iterator[Tuple[Sequence[KT], npt.NDArray[DType]]]:
        """Return matrices in chunks of `chunk_size` containing the vectors requested in `keys`

        Nothing is yielded when there are no vectors stored in this object.

        Parameters
        ----------
        keys : Sequence[KT]
            A list of identifier keys
        chunk_size : int, optional
            The size of the chunks, by default 200

        Yields
        -------
        Tuple[Sequence[KT], npt.NDArray[DType]]
            A tuple containing:

                - A list with identifier keys (order may differ from `keys` argument)
                - A matrix containing the vectors (rows correspond with the returned list)
        """
        if not self.datasets_exist:
            # A plain return ends the generator; raising StopIteration inside
            # a generator is converted into a RuntimeError (PEP 479).
            return
        chunks = divide_iterable_in_lists(self._stored_indices(keys), chunk_size)
        yield from map(self._get_matrix, chunks)

    def get_vectors_chunked(self,
                            keys: Sequence[KT],
                            chunk_size: int = 200
                            ) -> Iterator[Tuple[Sequence[KT], Sequence[npt.NDArray[DType]]]]:
        """Return vectors in chunks of `chunk_size` containing the vectors requested in `keys`

        Parameters
        ----------
        keys : Sequence[KT]
            A list of identifier keys
        chunk_size : int, optional
            The size of the chunks, by default 200

        Yields
        -------
        Tuple[Sequence[KT], Sequence[npt.NDArray[DType]]]
            A tuple containing two lists:

                - A list with identifiers (order may differ from `keys` argument)
                - A list with vectors
        """
        results = itertools.starmap(matrix_tuple_to_vectors,
                                    self.get_matrix_chunked(keys, chunk_size))
        yield from results  # type: ignore

    def get_vectors_zipped(self, keys: Sequence[KT], chunk_size: int = 200
                           ) -> Iterator[Sequence[Tuple[KT, npt.NDArray[DType]]]]:
        """Return vectors in chunks of `chunk_size` containing the vectors requested in `keys`

        Parameters
        ----------
        keys : Sequence[KT]
            A list of identifier keys
        chunk_size : int, optional
            The size of the chunks, by default 200

        Yields
        -------
        Sequence[Tuple[KT, npt.NDArray[DType]]]
            A list containing tuples of:

                - An identifier (order may differ from `keys` argument)
                - A vector
        """
        results = itertools.starmap(matrix_tuple_to_zipped,
                                    self.get_matrix_chunked(keys, chunk_size))
        yield from results  # type: ignore

    def vectors_chunker(self, chunk_size: int = 200
                        ) -> Iterator[Sequence[Tuple[KT, npt.NDArray[DType]]]]:
        """Return vectors in chunks of `chunk_size`. This generator will yield all vectors contained
        in this object.

        Parameters
        ----------
        chunk_size : int, optional
            The size of the chunks, by default 200

        Yields
        -------
        Sequence[Tuple[KT, npt.NDArray[DType]]]
            A list containing tuples of:

                - An identifier
                - A vector
        """
        results = itertools.starmap(matrix_tuple_to_zipped,
                                    self.matrices_chunker(chunk_size))
        yield from results  # type: ignore

    def matrices_chunker(self, chunk_size: int = 200
                         ) -> Iterator[Tuple[Sequence[KT], npt.NDArray[DType]]]:
        """Yield matrices in chunks of `chunk_size` containing all the vectors in this object

        Nothing is yielded when there are no vectors stored in this object.

        Parameters
        ----------
        chunk_size : int, optional
            The size of the chunks, by default 200

        Yields
        -------
        Tuple[Sequence[KT], npt.NDArray[DType]]
            A tuple containing:

                - A list with identifier keys
                - A matrix containing the vectors (row indices correspond with the list indices)
        """
        if not self.datasets_exist:
            # A plain return ends the generator; raising StopIteration inside
            # a generator is converted into a RuntimeError (PEP 479).
            return
        sorted_idxs = sorted(self.inv_key_dict)
        chunks = divide_iterable_in_lists(sorted_idxs, chunk_size)
        yield from map(self._get_matrix, chunks)