# Source code for ch5mpy.h5array.array

# coding: utf-8

# ====================================================
# imports
from __future__ import annotations

from numbers import Number
from typing import TYPE_CHECKING, Any, Collection, Iterator, TypeVar

import numpy as np
import numpy.lib.mixins
import numpy.typing as npt

from ch5mpy._typing import NP_FUNC, SELECTOR
from ch5mpy.h5array import repr
from ch5mpy.h5array.chunks.iter import ChunkIterator, PairedChunkIterator
from ch5mpy.h5array.functions import HANDLED_FUNCTIONS
from ch5mpy.h5array.io import read_one_from_dataset, write_to_dataset
from ch5mpy.indexing import Selection, map_slice
from ch5mpy.objects.dataset import Dataset, DatasetWrapper
from ch5mpy.write import write_dataset

if TYPE_CHECKING:
    from ch5mpy.h5array.view import H5ArrayView


# ====================================================
# code
# Covariant type variable standing for the numpy scalar type held by an H5Array.
_T = TypeVar("_T", covariant=True, bound=np.generic)

# Multiplier applied by get_size() for each supported unit suffix (powers of 1024).
SIZES = {"K": 1024, "M": 1024**2, "G": 1024**3}


def get_size(s: int | str) -> int:
    """
    Parse a size given either as an integer or as a string.

    Accepted string forms are a plain decimal number (e.g. "2048") or a decimal number followed by one
    of the unit suffixes defined in `SIZES` (e.g. "250M").

    Args:
        s: the size to parse.

    Returns:
        The size as a strictly positive integer.

    Raises:
        ValueError: if `s` cannot be parsed (this includes the empty string) or if the parsed size is
            not strictly positive.
    """
    value: int | None = None

    if isinstance(s, int):
        value = s

    elif s.isdecimal():
        # isdecimal() only accepts characters that int() can parse, unlike isdigit() which also
        # accepts e.g. superscript digits.
        value = int(s)

    elif s and s[-1] in SIZES:
        # `s and ...` guards against the empty string, for which s[-1] would raise an IndexError.
        number = s[:-1]
        # Allow at most one leading minus sign so that negative sizes reach the explicit
        # "invalid size" error below instead of failing inside int().
        digits = number[1:] if number.startswith("-") else number

        if digits.isdecimal():
            value = int(number) * SIZES[s[-1]]

    if value is None:
        raise ValueError(f"Unrecognized size '{s}'")

    if value <= 0:
        raise ValueError(f"Got invalid size ({value} <= 0).")

    return value


def as_array(values: Any, dtype: np.dtype[Any]) -> npt.NDArray[Any]:
    """
    Convert `values` to a numpy array before it is stored in an H5Array of type `dtype`.

    Args:
        values: anything `np.array()` accepts.
        dtype: the dtype of the destination H5Array.

    Returns:
        A bytes array when `dtype` is a string dtype, otherwise an array of type `dtype`.

    Raises:
        ValueError: if `values` cannot be converted to `dtype`; the numpy error is attached as the
            direct cause.
    """
    # FIXME : work on H5Arrays directly instead of converting to np.array
    if np.issubdtype(dtype, str):
        return np.array(values, dtype=bytes)

    try:
        return np.array(values, dtype=dtype)

    except ValueError as err:
        # chain explicitly so the traceback shows the numpy failure as the cause of this error
        raise ValueError(f"Couldn't set value of type {type(values)} in H5Array of type {dtype}.") from err


def _dtype_repr(dset: Dataset[Any] | DatasetWrapper[Any]) -> str:
    """Return the dtype of `dset` as text, wrapped in single quotes when it is a string dtype."""
    dtype = dset.dtype

    # guard clause : every non-string dtype is rendered as-is
    if not np.issubdtype(dtype, np.str_):
        return str(dtype)

    return f"'{dtype}'"


class H5Array(Collection[_T], numpy.lib.mixins.NDArrayOperatorsMixin):
    """
    Wrapper around Dataset objects to interface with numpy's API.

    Numpy functions and ufuncs called on an H5Array are dispatched to the implementations registered
    in `HANDLED_FUNCTIONS` (see `__array_ufunc__` and `__array_function__`).

    Attributes:
        MAX_MEM_USAGE: size passed to `get_size()` to compute `chunk_size`; either an int or a string
            such as "250M".
    """

    MAX_MEM_USAGE: int | str = "250M"

    # region magic methods
    def __init__(self, dset: Dataset[_T] | DatasetWrapper[_T]):
        """
        Args:
            dset: the Dataset (or DatasetWrapper) to wrap.

        Raises:
            TypeError: if `dset` is neither a Dataset nor a DatasetWrapper.
        """
        if not isinstance(dset, (Dataset, DatasetWrapper)):
            raise TypeError(f"Object of type '{type(dset)}' is not supported by H5Array.")

        # A Dataset whose "dtype" attribute names a string dtype is accessed through its asstr() wrapper.
        if isinstance(dset, Dataset) and np.issubdtype(np.dtype(str(dset.attrs.get("dtype", "O"))), str):
            self._dset: Dataset[_T] | DatasetWrapper[_T] = dset.asstr()  # type: ignore[assignment]

        else:
            self._dset = dset

    def __repr__(self) -> str:
        return (
            f"H5Array({repr.print_dataset(self, end='', padding=8, padding_skip_first=True)}, "
            f"shape={self.shape}, dtype={_dtype_repr(self._dset)})"
        )

    def __str__(self) -> str:
        return repr.print_dataset(self, sep="")

    def __getitem__(self, index: SELECTOR | tuple[SELECTOR, ...]) -> _T | H5Array[_T] | H5ArrayView[_T]:
        """
        Select from the array.

        Returns:
            A new H5Array over the same dataset for an empty selection, a single value when the
            selection has shape (), and an H5ArrayView otherwise.
        """
        # imported here : at module level this import is only done under TYPE_CHECKING
        from ch5mpy.h5array.view import H5ArrayView

        selection = Selection.from_selector(index, self.shape)

        if selection.is_empty:
            return H5Array(dset=self._dset)

        elif selection.compute_shape(self._dset.shape) == ():
            return read_one_from_dataset(self._dset, selection, self.dtype)

        else:
            return H5ArrayView(dset=self._dset, sel=selection)

    def __setitem__(self, index: SELECTOR | tuple[SELECTOR, ...], value: Any) -> None:
        """Convert `value` with `as_array()` and write it to the selected region of the dataset."""
        selection = Selection.from_selector(index, self.shape)
        write_to_dataset(self._dset, as_array(value, self.dtype), selection)

    def __len__(self) -> int:
        return len(self._dset)

    def __iter__(self) -> Iterator[_T | npt.NDArray[_T] | H5Array[_T] | H5ArrayView[_T]]:  # type: ignore[override]
        """Iterate over the first axis, yielding `self[i]` for each index."""
        for i in range(self.shape[0]):
            yield self[i]

    def __contains__(self, item: Any) -> bool:
        """Search for `item` chunk by chunk, stopping at the first chunk that contains it."""
        return any(item in chunk for _, chunk in self.iter_chunks())

    def _inplace(self, func: NP_FUNC, value: Any) -> H5Array[_T]:
        """
        Apply `func(data, value)` to the whole array and store the result back into the dataset.

        Args:
            func: a numpy function accepting an `out` keyword argument.
            value: the second operand passed to `func`.

        Returns:
            self

        Raises:
            TypeError: if the array has a string dtype.
        """
        if np.issubdtype(self.dtype, str):
            raise TypeError("Cannot perform inplace operation on str H5Array.")

        # special case : 0D array
        if self.shape == ():
            self._dset[:] = func(self._dset[:], value)
            return self

        # general case : 1D+ array
        # NOTE(review): `value` is passed unsliced to every chunk, so an array-valued operand that
        #  does not broadcast against each chunk looks unsupported here — confirm against callers.
        for index, chunk in self.iter_chunks():
            func(chunk, value, out=chunk)

            # write back result into array
            self.dset.write_direct(
                chunk,
                source_sel=map_slice(index, shift_to_zero=True),
                dest_sel=map_slice(index),
            )

        return self

    # Binary operators load the whole array with np.array(self); their in-place counterparts go
    # through _inplace() instead.
    def __add__(self, other: Any) -> Number | str | npt.NDArray[Any]:
        return np.array(self) + other  # type: ignore[no-any-return]

    def __iadd__(self, other: Any) -> H5Array[_T]:
        return self._inplace(np.add, other)

    def __sub__(self, other: Any) -> Number | str | npt.NDArray[Any]:
        return np.array(self) - other  # type: ignore[no-any-return]

    def __isub__(self, other: Any) -> H5Array[_T]:
        return self._inplace(np.subtract, other)

    def __mul__(self, other: Any) -> Number | str | npt.NDArray[Any]:
        return np.array(self) * other  # type: ignore[no-any-return]

    def __imul__(self, other: Any) -> H5Array[_T]:
        return self._inplace(np.multiply, other)

    def __truediv__(self, other: Any) -> Number | str | npt.NDArray[Any]:
        return np.array(self) / other  # type: ignore[no-any-return]

    def __itruediv__(self, other: Any) -> H5Array[_T]:
        return self._inplace(np.divide, other)

    def __mod__(self, other: Any) -> Number | str | npt.NDArray[Any]:
        return np.array(self) % other  # type: ignore[no-any-return]

    def __imod__(self, other: Any) -> H5Array[_T]:
        return self._inplace(np.mod, other)

    def __pow__(self, other: Any) -> Number | str | npt.NDArray[Any]:
        return np.array(self) ** other  # type: ignore[no-any-return]

    def __ipow__(self, other: Any) -> H5Array[_T]:
        return self._inplace(np.power, other)

    # `|=`, `&=` and `^=` use the bitwise ufuncs so that they give the same result as `|`, `&` and `^`
    # (the logical ufuncs only agree with them on boolean data).
    def __or__(self, other: Any) -> Number | npt.NDArray[Any]:
        return np.array(self) | other  # type: ignore[no-any-return]

    def __ior__(self, other: Any) -> H5Array[_T]:
        return self._inplace(np.bitwise_or, other)

    def __and__(self, other: Any) -> Number | npt.NDArray[Any]:
        return np.array(self) & other  # type: ignore[no-any-return]

    def __iand__(self, other: Any) -> H5Array[_T]:
        return self._inplace(np.bitwise_and, other)

    def __invert__(self) -> Number | npt.NDArray[Any]:
        return ~np.array(self)

    def __xor__(self, other: Any) -> Number | npt.NDArray[Any]:
        return np.array(self) ^ other  # type: ignore[no-any-return]

    def __ixor__(self, other: Any) -> H5Array[_T]:
        return self._inplace(np.bitwise_xor, other)

    # endregion

    # region interface
    def __array__(self, dtype: npt.DTypeLike | None = None) -> npt.NDArray[Any]:
        """Load the dataset as a numpy array, cast to `dtype` when one is given."""
        array = np.array(self._dset)

        if dtype is None:
            return array

        return array.astype(dtype)

    def __array_ufunc__(self, ufunc: NP_FUNC, method: str, *inputs: Any, **kwargs: Any) -> Any:
        """
        Dispatch numpy ufunc calls to the implementations registered in `HANDLED_FUNCTIONS`.

        Raises:
            NotImplementedError: for any ufunc method other than "__call__".
        """
        if method == "__call__":
            if ufunc not in HANDLED_FUNCTIONS:
                return NotImplemented

            return HANDLED_FUNCTIONS[ufunc](*inputs, **kwargs)

        else:
            raise NotImplementedError

    def __array_function__(
        self,
        func: NP_FUNC,
        types: tuple[type, ...],
        args: tuple[Any, ...],
        kwargs: dict[str, Any],
    ) -> Any:
        """Dispatch numpy function calls to the implementations registered in `HANDLED_FUNCTIONS`."""
        del types

        if func not in HANDLED_FUNCTIONS:
            return NotImplemented

        return HANDLED_FUNCTIONS[func](*args, **kwargs)

    # endregion

    # region predicates
    @property
    def is_chunked(self) -> bool:
        return self._dset.chunks is not None

    # endregion

    # region attributes
    @property
    def dset(self) -> Dataset[_T] | DatasetWrapper[_T]:
        return self._dset

    @property
    def chunk_size(self) -> int:
        """Get the size of a chunk (i.e. the nb of elements that can be read/written at a time)."""
        return get_size(self.MAX_MEM_USAGE) // self._dset.dtype.itemsize

    @property
    def shape(self) -> tuple[int, ...]:
        return self._dset.shape

    @property
    def dtype(self) -> np.dtype[_T]:
        return self._dset.dtype

    @property
    def ndim(self) -> int:
        return len(self.shape)

    @property
    def size(self) -> int:
        return int(np.prod(self.shape))

    # endregion

    # region methods
    def _resize(self, amount: int, axis: int | tuple[int, ...] | None = None) -> None:
        """
        Change the length of the selected axis (or axes) by `amount`, which may be negative.

        `axis` defaults to 0. When a tuple is given, every listed axis is changed by the same amount.
        """
        if axis is None:
            axis = 0

        if isinstance(axis, int):
            self._dset.resize(self.shape[axis] + amount, axis=axis)

        else:
            self._dset.resize([s + amount if i in axis else s for i, s in enumerate(self.shape)])

    def expand(self, amount: int, axis: int | tuple[int, ...] | None = None) -> None:
        """
        Resize an H5Array by adding `amount` elements along the selected axis.

        Raises:
            TypeError: if the H5Array does not wrap a chunked Dataset.
        """
        self._resize(amount, axis)

    def contract(self, amount: int, axis: int | tuple[int, ...] | None = None) -> None:
        """
        Resize an H5Array by removing `amount` elements along the selected axis.

        Raises:
            TypeError: if the H5Array does not wrap a chunked Dataset.
        """
        self._resize(-amount, axis)

    def astype(self, dtype: npt.DTypeLike, inplace: bool = False) -> H5Array[Any]:
        """
        Cast an H5Array to a specified dtype.

        With `inplace=False` (the default), this does not perform a copy, it returns a wrapper around
        the underlying H5 dataset.

        With `inplace=True`, the data is converted and the dataset is deleted then written again under
        the same name; this H5Array is updated to wrap the new dataset and is returned.
        """
        if np.issubdtype(dtype, str) and (np.issubdtype(self._dset.dtype, str) or self._dset.dtype == object):
            new_dset = self._dset.asstr()

        else:
            new_dset = self._dset.astype(dtype)

        if not inplace:
            return H5Array(new_dset)

        file, name = self._dset.file, self._dset.name

        # Read and convert everything BEFORE deleting the dataset : if the conversion fails, the
        # original data is left untouched.
        # FIXME : conversion to np happens anyway but might be expensive, could we save data without conversion ?
        data = np.array(new_dset)
        chunks, maxshape = new_dset.chunks, new_dset.maxshape

        del file[name]
        write_dataset(file, name, data, chunks=chunks, maxshape=maxshape)

        rewritten = file[name]
        self._dset = rewritten.asstr() if rewritten.dtype == object else rewritten

        # `new_dset` still points to the dataset that was just deleted, so return this (updated)
        # H5Array rather than a wrapper around it.
        return self

    def maptype(self, otype: type[Any]) -> H5Array[Any]:
        """
        Cast an H5Array to any object type.
        This extends H5Array.astype() to any type <T>, where it is required that an object <T> can be constructed
        as T(v) for any value <v> in the dataset.
        """
        return H5Array(self._dset.maptype(otype))

    def iter_chunks(self, keepdims: bool = False) -> ChunkIterator:
        """Get a ChunkIterator over this array (iterated as `(index, chunk)` pairs within this class)."""
        return ChunkIterator(self, keepdims)

    def iter_chunks_with(self, other: npt.NDArray[Any] | H5Array[Any], keepdims: bool = False) -> PairedChunkIterator:
        """Get a PairedChunkIterator over this array and `other`."""
        return PairedChunkIterator(self, other, keepdims)

    def read_direct(
        self,
        dest: npt.NDArray[_T],
        source_sel: tuple[slice, ...],
        dest_sel: tuple[slice, ...],
    ) -> None:
        """
        Read the region `source_sel` of the dataset into the region `dest_sel` of `dest`.

        Datasets with a string dtype are read through their asstr() wrapper.
        """
        dset = self._dset.asstr() if np.issubdtype(self.dtype, str) else self._dset
        dset.read_direct(dest, source_sel=source_sel, dest_sel=dest_sel)

    def copy(self) -> npt.NDArray[_T]:
        return np.copy(self)

    def min(self, axis: int | tuple[int, ...] | None = None) -> _T | npt.NDArray[_T]:
        return np.min(self, axis=axis)  # type: ignore[no-any-return]

    def max(self, axis: int | tuple[int, ...] | None = None) -> _T | npt.NDArray[_T]:
        return np.max(self, axis=axis)  # type: ignore[no-any-return]

    def mean(self, axis: int | tuple[int, ...] | None = None) -> Any | npt.NDArray[Any]:
        return np.mean(self, axis=axis)

    # endregion