Merged
17 changes: 9 additions & 8 deletions bench/ndarray/fancy_index.py
@@ -26,8 +26,8 @@

NUMPY = True
BLOSC = True
ZARR = False
HDF5 = False
ZARR = True
HDF5 = True
SPARSE = False

NDIMS = 2 # must be at least 2
@@ -37,9 +37,9 @@ def genarray(r, ndims=2, verbose=True):
shape = (d,) * ndims
chunks = (d // 4,) * ndims
blocks = (max(d // 10, 1),) * ndims
urlpath = f'linspace{r}{ndims}D.b2nd'
t = time.time()
arr = blosc2.linspace(0, 1000, num=np.prod(shape), shape=shape, dtype=np.float64,
urlpath=f'linspace{r}{ndims}D.b2nd', mode='w')
arr = blosc2.linspace(0, 1000, num=np.prod(shape), shape=shape, dtype=np.float64, urlpath=urlpath, mode='w')
t = time.time() - t
arrsize = np.prod(arr.shape) * arr.dtype.itemsize / 2 ** 30
if verbose:
@@ -134,18 +134,19 @@ def timer(arr):
err = (mean - times.min(axis=1), times.max(axis=1)-mean)
plt.bar(x + w, mean, width, color=c, label=label, yerr=err, capsize=5, ecolor='k',
error_kw=dict(lw=2, capthick=2, ecolor='k'))
labs+=label
labs += label

filename = f"results{labs}{NDIMS}D" + "sparse" if SPARSE else f"results{labs}{NDIMS}D"
filename = f"{labs}{NDIMS}D" + "sparse" if SPARSE else f"{labs}{NDIMS}D"
filename += blosc2.__version__.replace('.','_')

with open(f"{filename}.pkl", 'wb') as f:
pickle.dump(result_tuple, f)
pickle.dump({'times':result_tuple, 'sizes':genuine_sizes}, f)

plt.xlabel('Array size (GB)')
plt.legend()
plt.xticks(x-width, np.round(genuine_sizes, 2))
plt.ylabel("Time (s)")
plt.title(f"Fancy indexing performance comparison, {NDIMS}D" +f"{" sparse" if SPARSE else ""}")
plt.title(f"Fancy indexing {blosc2.__version__}, {NDIMS}D" +f"{" sparse" if SPARSE else ""}")
plt.gca().set_yscale('log')
plt.savefig(f'plots/fancyIdx{filename}.png', format="png")
plt.show()
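Note for downstream consumers: the pickled payload is now a dict with 'times' and 'sizes' keys rather than the bare result tuple. A minimal loading sketch (the filename here is illustrative):

```python
import pickle

# Sketch: read back a results file written by the benchmark above.
# "resultsfile.pkl" stands in for the generated f"{filename}.pkl".
with open("resultsfile.pkl", "rb") as f:
    payload = pickle.load(f)

times = payload["times"]  # result_tuple: per-library timing arrays
sizes = payload["sizes"]  # genuine_sizes: actual array sizes in GB
```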
21 changes: 11 additions & 10 deletions bench/ndarray/fancy_index1D.py
@@ -27,9 +27,9 @@

NUMPY = True
BLOSC = True
ZARR = True
HDF5 = True
SPARSE = True
ZARR = False
HDF5 = False
SPARSE = False

if HDF5:
SPARSE = True # HDF5 takes too long for non-sparse indexing
@@ -50,7 +50,7 @@ def genarray(r, verbose=True):
return arr, arrsize


target_sizes = np.float64(np.array([.2, .5, 1, 2, 5, 10]))
target_sizes = np.float64(np.array([.1, .2, .5, 1, 2, 3]))
rng = np.random.default_rng()
blosctimes = []
nptimes = []
@@ -61,14 +61,14 @@
arr, arrsize = genarray(d)
genuine_sizes += [arrsize]
idx = rng.integers(low=0, high=arr.shape[0], size=(1000,)) if SPARSE else rng.integers(low=0, high=arr.shape[0], size=(arr.shape[0]//4,))
sorted_idx = np.unique(np.sort(idx))
sorted_idx = np.sort(np.unique(idx))
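# note: np.unique already returns sorted output, so the outer np.sort is redundant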
## Test fancy indexing for different use cases
def timer(arr):
time_list = []
if not (HDF5 or ZARR):
b = arr[[[sorted_idx], [idx]]]
time_list += [time.time() - t]
t = time.time()
b = arr[[[idx[::-1]], [idx]]]
time_list += [time.time() - t]
t = time.time()
b = arr[sorted_idx] if HDF5 else arr[idx]
time_list += [time.time() - t]
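The initial `t = time.time()` for the first timed call sits in context outside this hunk. As a self-contained reference, the per-call timing pattern amounts to this sketch (array and index sizes are illustrative):

```python
import time

import numpy as np

def timed(getitem):
    # Time a single indexing call; return (elapsed seconds, result).
    t = time.time()
    result = getitem()
    return time.time() - t, result

arr = np.arange(1_000_000)
idx = np.random.default_rng().integers(0, arr.shape[0], size=1000)
elapsed, b = timed(lambda: arr[idx])  # fancy indexing, timed
```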
@@ -114,15 +114,16 @@ def timer(arr):
error_kw=dict(lw=2, capthick=2, ecolor='k'))
labs+=label

filename = f"results{labs}1Dsparse" if SPARSE else f"results{labs}1D"
filename = f"{labs}1Dsparse" if SPARSE else f"{labs}1D"
filename+=blosc2.__version__.replace('.','_')
with open(filename+".pkl", 'wb') as f:
pickle.dump({'times':result_tuple, 'sizes':genuine_sizes}, f)

plt.xlabel('Array size (GB)')
plt.legend()
plt.xticks(x-width, np.round(genuine_sizes, 2))
plt.ylabel("Time (s)")
plt.title(f"Fancy indexing performance comparison, 1D {' sparse' if SPARSE else ''}")
plt.title(f"Fancy indexing {blosc2.__version__}, 1D {' sparse' if SPARSE else ''}")
plt.gca().set_yscale('log')
plt.savefig(f'plots/{filename}.png', format="png")
plt.show()
233 changes: 184 additions & 49 deletions src/blosc2/ndarray.py
@@ -27,6 +27,7 @@

import ndindex
import numpy as np
from ndindex.subindex_helpers import ceiling

import blosc2
from blosc2 import SpecialValue, blosc2_ext, compute_chunks_blocks
@@ -1476,66 +1477,124 @@ def get_fselection_numpy(self, key: list | np.ndarray) -> np.ndarray:
out: np.ndarray

"""
# TODO: Make this faster for broadcasted keys
## Can`t do this because ndindex doesn't support all the same indexing cases as Numpy
# TODO: Make this faster and avoid running out of memory - avoid broadcasting keys

## Can't do this because ndindex doesn't support all the same indexing cases as Numpy
# if math.prod(self.shape) * self.dtype.itemsize < blosc2.MAX_FAST_PATH_SIZE:
# return self[:][key] # load into memory for smallish arrays
shape = self.shape
chunks = self.chunks

# TODO: try to optimise and avoid this expand which seems to copy - maybe np.broadcast
_slice = ndindex.ndindex(key).expand(shape)
out_shape = _slice.newshape(shape)
out = np.empty(out_shape, dtype=self.dtype)

if self.ndim == 1:
# Fast path so we can avoid the costly lines in slow path
# (sub_idx = _slice.as_subindex(c).raw, sel_idx = c.as_subindex(_slice))
key = np.array(key)
if np.any(np.diff(key) < 0):
idx_order = np.argsort(key)
sorted_idxs = key[idx_order]
_slice = _slice.raw
# now all indices are slices or arrays of integers (or booleans)
# moreover, all arrays are consecutive (otherwise an error is raised)

if np.all([isinstance(s, (slice, np.ndarray)) for s in _slice]) and np.all(
[s.dtype != bool for s in _slice if isinstance(s, np.ndarray)]
):
chunks = np.array(chunks)
# |------|
# ------| arrs |------
arridxs = [i for i, s in enumerate(_slice) if isinstance(s, np.ndarray)]
begin, end = arridxs[0], arridxs[-1] + 1
flat_shape = tuple((i.stop - i.start + (i.step - 1)) // i.step for i in _slice[:begin])
idx_dim = np.prod(_slice[begin].shape)

# TODO: maybe find a nicer way to do the copy
arr = np.empty((idx_dim, end - begin), dtype=_slice[begin].dtype)
for i, s in enumerate(_slice[begin:end]):
arr[:, i] = s.reshape(-1) # have to do a copy

flat_shape += (idx_dim,)
flat_shape += tuple((i.stop - i.start + (i.step - 1)) // i.step for i in _slice[end:])
# out_shape could have new dims if indexing arrays are not all 1D
# (we have just flattened them so need to handle accordingly)
prior_tuple = _slice[:begin]
post_tuple = _slice[end:]
divider = chunks[begin:end]
chunked_arr = arr // divider
if arr.shape[-1] == 1: # 1D chunks, can avoid loading whole chunks
idx_order = np.argsort(arr.squeeze(axis=1), axis=-1) # sort by real index
chunk_nitems = np.bincount(chunked_arr.reshape(-1), minlength=self.schunk.nchunks)
unique_chunks = np.nonzero(chunk_nitems)[0][:, None] # add dummy axis
chunk_nitems = chunk_nitems[unique_chunks]
else:
idx_order = None
sorted_idxs = key
chunk_nitems = np.bincount(
sorted_idxs // chunks[0], minlength=self.schunk.nchunks
) # number of items per chunk
chunked_arr = np.ascontiguousarray(
chunked_arr
) # ensure C-order memory to allow structured dtype view
# TODO: check that this avoids a sort and a copy (alternative: maybe do a bincount with structured data types?)
_, row_ids, idx_inv, chunk_nitems = np.unique(
chunked_arr.view([("", chunked_arr.dtype)] * chunked_arr.shape[1]),
return_counts=True,
return_index=True,
return_inverse=True,
)
unique_chunks = chunked_arr[row_ids]
idx_order = np.argsort(
idx_inv.squeeze(-1)
) # sort by chunks (can't sort by index since larger index could belong to lower chunk)
# e.g. chunks of (100, 10) means (50, 15) has chunk idx (0,1) but (60,5) has (0, 0)
sorted_idxs = arr[idx_order]
out = np.empty(flat_shape, dtype=self.dtype)
shape = np.array(shape)

chunk_nitems_cumsum = np.cumsum(chunk_nitems)
for chunk_i, c in enumerate(chunk_nitems):
if c == 0:
continue # no items in chunk
cprior_slices = [
slice_to_chunktuple(s, c) for s, c in zip(prior_tuple, chunks[:begin], strict=True)
]
cpost_slices = [slice_to_chunktuple(s, c) for s, c in zip(post_tuple, chunks[end:], strict=True)]
# TODO: rewrite to allow interleaved slices/array indexes
for chunk_i, chunk_idx in enumerate(unique_chunks):
start = 0 if chunk_i == 0 else chunk_nitems_cumsum[chunk_i - 1]
stop = chunk_nitems_cumsum[chunk_i]
selection = sorted_idxs[start:stop]
out_selection = idx_order[start:stop] if idx_order is not None else slice(start, stop)
to_be_loaded = np.empty((selection[-1] - selection[0] + 1,), dtype=self.dtype)
# if len(selection) < 10:
# # TODO: optimise for case of few elements and go index-by-index
# selector = out_selection.start + np.arange(out_selection.stop) if isinstance(out_selection, slice) else out_selection
# for j, idx in enumerate(sorted_idxs[start:stop]):
# out[out_selection[j]] = self[idx]
# else:
super().get_slice_numpy(
to_be_loaded, ((selection[0],), (selection[-1] + 1,))
) # get relevant section of chunk
loc_idx = selection - selection[0]
out[out_selection] = to_be_loaded[loc_idx]

else:
chunk_size = ndindex.ChunkSize(chunks)
# repeated indices are grouped together
intersecting_chunks = chunk_size.as_subchunks(
_slice, shape
) # if _slice is (), returns all chunks
for c in intersecting_chunks:
sub_idx = _slice.as_subindex(c).raw
sel_idx = c.as_subindex(_slice)
new_shape = sel_idx.newshape(out_shape)
start, stop, step = get_ndarray_start_stop(self.ndim, c.raw, shape)
chunk = np.empty(
tuple(sp - st for st, sp in zip(start, stop, strict=True)), dtype=self.dtype
)
super().get_slice_numpy(chunk, (start, stop))
out[sel_idx.raw] = chunk[sub_idx].reshape(new_shape)
out_mid_selection = (idx_order[start:stop],)
if (
arr.shape[-1] == 1
): # for 1D array-indexed chunks we can avoid loading the whole chunk, which is a bit faster
chunk_begin = selection[0]
chunk_end = selection[-1] + 1
else:
chunk_begin = chunk_idx * chunks[begin:end]
chunk_end = np.minimum((chunk_idx + 1) * chunks[begin:end], shape[begin:end])
loc_mid_selection = tuple(a for a in (selection - chunk_begin).T)

# loop over chunks coming from slices before and after array indices
for cprior_tuple in product(*cprior_slices):
out_prior_selection, prior_selection, loc_prior_selection = _get_selection(
cprior_tuple, prior_tuple, chunks[:begin]
)
for cpost_tuple in product(*cpost_slices):
out_post_selection, post_selection, loc_post_selection = _get_selection(
cpost_tuple, post_tuple, chunks[end:] if end is not None else []
)
locbegin, locend = _get_local_slice(
prior_selection, post_selection, (chunk_begin, chunk_end)
)
to_be_loaded = np.empty(locend - locbegin, dtype=self.dtype)
# basically load whole chunk, except for slice part at beginning and end
super().get_slice_numpy(to_be_loaded, (locbegin, locend))
loc_idx = loc_prior_selection + loc_mid_selection + loc_post_selection
out_idx = out_prior_selection + out_mid_selection + out_post_selection
out[out_idx] = to_be_loaded[loc_idx]
return out.reshape(out_shape) # should have filled in correct order, just need to reshape

# Default when there are booleans
out = np.empty(out_shape, dtype=self.dtype)
chunk_size = ndindex.ChunkSize(chunks)
# repeated indices are grouped together
intersecting_chunks = chunk_size.as_subchunks(_slice, shape) # if _slice is (), returns all chunks
for c in intersecting_chunks:
sub_idx = _slice.as_subindex(c).raw
sel_idx = c.as_subindex(_slice)
new_shape = sel_idx.newshape(out_shape)
start, stop, step = get_ndarray_start_stop(self.ndim, c.raw, shape)
chunk = np.empty(tuple(sp - st for st, sp in zip(start, stop, strict=True)), dtype=self.dtype)
super().get_slice_numpy(chunk, (start, stop))
out[sel_idx.raw] = chunk[sub_idx].reshape(new_shape)

return out
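For readers new to the grouping trick in the 1D-chunk branch above: indices are binned by chunk number with np.bincount, then visited in sorted order via np.argsort, so each chunk's data is read only once. A standalone sketch with illustrative sizes:

```python
import numpy as np

# Sketch: group fancy indices by the chunk they fall in (flat, 1D-chunk case).
chunk_len, nchunks = 100, 10
rng = np.random.default_rng(0)
idx = rng.integers(0, chunk_len * nchunks, size=25)

order = np.argsort(idx)                 # visit indices in ascending order
sorted_idx = idx[order]
nitems = np.bincount(idx // chunk_len, minlength=nchunks)  # hits per chunk
bounds = np.cumsum(nitems)              # chunk_nitems_cumsum in the code above

start = 0
for chunk_i, stop in enumerate(bounds):
    if stop == start:
        continue                        # no selected items in this chunk
    selection = sorted_idx[start:stop]  # global indices inside chunk_i
    out_positions = order[start:stop]   # where each value lands in the output
    start = stop
```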

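Likewise, the structured-dtype view passed to np.unique is what lets a single call return the unique N-D chunk coordinates, their counts, and the inverse map, since each row is compared atomically. An isolated sketch (index values are illustrative):

```python
import numpy as np

chunks = np.array([100, 10])
arr = np.array([[50, 15], [60, 5], [150, 3], [55, 12]])  # (row, col) indices
chunked = np.ascontiguousarray(arr // chunks)  # chunk coordinate of each index

# View each row as one structured scalar so np.unique compares whole rows.
rowview = chunked.view([("", chunked.dtype)] * chunked.shape[1])
_, row_ids, inv, counts = np.unique(
    rowview, return_index=True, return_inverse=True, return_counts=True
)
unique_chunks = chunked[row_ids]  # [[0, 0], [0, 1], [1, 0]]
order = np.argsort(inv.ravel())   # regroups the rows chunk by chunk
```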
@@ -4519,3 +4578,79 @@ def __setitem__(self, selection, input) -> np.ndarray:

# def __setitem__(self, selection, input) -> np.ndarray:
# return NotImplementedError


def slice_to_chunktuple(s, n):
"""
Adapted from _slice_iter in ndindex.ChunkSize.as_subchunks.
Parameters
----------
s : slice
A slice object with start, stop, and step attributes.
n : int
The number of elements in the chunk axis

Returns
-------
out: tuple
"""
start, stop, step = s.start, s.stop, s.step
if step > n:
return tuple((start + k * step) // n for k in range(ceiling(stop - start, step)))
else:
return tuple(range(start // n, ceiling(stop, n)))
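A quick sanity check of the two branches (ceiling(a, b) is the ndindex helper imported above, i.e. -(-a // b)):

```python
# step <= n: a contiguous run of chunk indices from first to last element.
# slice(0, 50, 7) selects 0, 7, ..., 49 over 16-element chunks -> chunks 0..3.
slice_to_chunktuple(slice(0, 50, 7), 16)    # (0, 1, 2, 3)

# step > n: one chunk index per selected element, skipping untouched chunks.
# slice(0, 100, 30) selects 0, 30, 60, 90 -> chunks 0, 1, 3, 5.
slice_to_chunktuple(slice(0, 100, 30), 16)  # (0, 1, 3, 5)
```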


def _get_selection(ctuple, ptuple, chunks):
# we assume that at least one element of chunk intersects with the slice
# (as a consequence of only looping over intersecting chunks)
pselection = ()
for i, s, csize in zip(ctuple, ptuple, chunks, strict=True):
# we need to advance to first element within chunk that intersects with slice, not
# necessarily the first element of chunk
# i * csize = s.start + n*step + k, already added n+1 elements, k in [1, step]
np1 = (i * csize - s.start + s.step - 1) // s.step # gives (n + 1)
# can have n = -1 if s.start > i * csize, but never < -1 since have to intersect with chunk
pselection += (
slice(
builtins.max(s.start, s.start + np1 * s.step), # start+(n+1)*step gives i*csize if k=step
builtins.min(csize * (i + 1), s.stop),
s.step,
),
)

# selection relative to coordinates of out (necessarily step = 1)
# stop = start + step * n + k => n = (stop - start - 1) // step
# hence, out_stop = out_start + n + 1
# ps.start = pt.start + out_start * step
out_pselection = tuple(
slice(
(ps.start - pt.start + pt.step - 1) // pt.step,
(ps.start - pt.start + pt.step - 1) // pt.step + (ps.stop - ps.start + ps.step - 1) // ps.step,
1,
)
for ps, pt in zip(pselection, ptuple, strict=True)
)

loc_selection = tuple(slice(0, s.stop - s.start, s.step) for s in pselection)

return out_pselection, pselection, loc_selection
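A worked example of the per-axis intersection math, under the stated assumption that the chunk intersects the slice (the axis values are illustrative):

```python
import numpy as np

a = np.arange(64)
s = slice(2, 50, 7)  # global selection along one axis: 2, 9, 16, 23, 30, 37, 44
csize, i = 16, 1     # chunk 1 spans elements 16..31

out_sel, psel, loc_sel = _get_selection((i,), (s,), (csize,))
# psel    == (slice(16, 32, 7),)  global in-chunk indices 16, 23, 30
# out_sel == (slice(2, 5, 1),)    these are elements 2..4 of a[s]
# loc_sel == (slice(0, 16, 7),)   chunk-local offsets 0, 7, 14

chunk = a[i * csize:(i + 1) * csize]
assert np.array_equal(chunk[loc_sel[0]], a[s][out_sel[0]])
```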


def _get_local_slice(prior_selection, post_selection, chunk_bounds):
chunk_begin, chunk_end = chunk_bounds
locbegin = np.hstack(
(
[s.start for s in prior_selection],
chunk_begin,
[s.start for s in post_selection],
),
casting="unsafe",
dtype="int64",
)
locend = np.hstack(
([s.stop for s in prior_selection], chunk_end, [s.stop for s in post_selection]),
casting="unsafe",
dtype="int64",
)
return locbegin, locend
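_get_local_slice then splices those per-axis bounds around the array-indexed block. A small sketch with one prior slice axis, no post axes, and hypothetical chunk bounds:

```python
import numpy as np

prior = (slice(16, 32, 7),)                # slice axis before the index arrays
post = ()                                  # no slice axes after them
bounds = (np.array([40]), np.array([56]))  # array-indexed block: rows 40..55

locbegin, locend = _get_local_slice(prior, post, bounds)
# locbegin -> array([16, 40]), locend -> array([32, 56])
```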