Icechunk
This example shows how to perform large-scale distributed writes to Icechunk using Cubed (based on the examples for using Icechunk with Dask).
Install the prerequisite packages by running the following:
pip install cubed icechunk
Start by creating an Icechunk store.
import icechunk
import tempfile
# initialize the icechunk store
storage = icechunk.local_filesystem_storage(tempfile.TemporaryDirectory().name)
icechunk_repo = icechunk.Repository.create(storage)
icechunk_session = icechunk_repo.writable_session("main")
Write to Icechunk
Use cubed.icechunk.store_icechunk to write a Cubed array to an Icechunk store. The API follows that of cubed.store().
First create a Cubed array to write:
import cubed
shape = (100, 100)
cubed_chunks = (20, 20)
cubed_array = cubed.random.random(shape, chunks=cubed_chunks)
Now create the Zarr array you will write to.
import zarr
zarr_chunks = (10, 10)
group = zarr.group(store=icechunk_session.store, overwrite=True)
zarray = group.create_array(
    "array",
    shape=shape,
    chunks=zarr_chunks,
    dtype="f8",
    fill_value=float("nan"),
)
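Note that the chunk sizes in the store evenly divide the Cubed chunk sizes (10 divides 20 along each dimension), so no two write tasks touch the same Zarr chunk. Each individual write task is therefore independent and will not conflict with the others. It is your responsibility to ensure that such conflicts are avoided.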
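The alignment requirement can be checked before any data is written. The following is a minimal sketch in plain Python and is not part of Cubed or Icechunk: the helper names chunks_are_aligned and contested_store_chunks are hypothetical, and the sketch assumes regular chunk grids with one write task per Cubed chunk.

```python
from collections import Counter
from itertools import product


def chunks_are_aligned(task_chunks, store_chunks):
    """Return True if each task chunk size is a whole multiple of the store chunk size."""
    if len(task_chunks) != len(store_chunks):
        raise ValueError("chunk tuples must have the same number of dimensions")
    return all(t % s == 0 for t, s in zip(task_chunks, store_chunks))


def contested_store_chunks(shape, task_chunks, store_chunks):
    """Return the store chunk indices that more than one write task would touch."""
    writers = Counter()
    # Assumption: one write task per block of the regular task-chunk grid.
    task_grid = [range(0, n, t) for n, t in zip(shape, task_chunks)]
    for origin in product(*task_grid):
        per_axis = []
        for start, n, t, s in zip(origin, shape, task_chunks, store_chunks):
            stop = min(start + t, n)  # tasks at the array edge may be shorter
            # Store chunk indices overlapped by [start, stop) along this axis.
            per_axis.append(range(start // s, (stop - 1) // s + 1))
        for index in product(*per_axis):
            writers[index] += 1
    return sorted(index for index, count in writers.items() if count > 1)


# Values from this example: 20x20 Cubed chunks over 10x10 Zarr chunks.
print(chunks_are_aligned((20, 20), (10, 10)))  # True
print(contested_store_chunks((100, 100), (20, 20), (10, 10)))  # []

# A misaligned choice: 15 does not divide 20, so some Zarr chunks are shared.
print(chunks_are_aligned((20, 20), (15, 15)))  # False
```

An empty list from contested_store_chunks means every Zarr chunk has exactly one writer under these assumptions. A non-empty list names the chunks where two tasks would write to the same object.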
Now write the array:
from cubed.icechunk import store_icechunk
store_icechunk(
    icechunk_session,
    sources=[cubed_array],
    targets=[zarray]
)
Finally, commit your changes!
print(icechunk_session.commit("wrote a cubed array!"))
F028GZQCC0NN9G5R0X6G
Read from Icechunk
Use cubed.from_zarr() to read from Icechunk. Note that no Icechunk-specific function is needed in this case.
cubed.from_zarr(store=icechunk_session.store, path="array")
Distributed writes
In distributed contexts where the Session and Zarr Array objects are sent across the network, you must explicitly opt in to pickling of a writable store. cubed.icechunk.store_icechunk takes care of the hard part, merging Sessions, but you must opt in to pickling before creating the target Zarr array objects.
Here is an example:
from cubed import config
# start a new session; the old session is read-only after committing
icechunk_session = icechunk_repo.writable_session("main")
zarr_chunks = (10, 10)
# use the Cubed processes executor which requires pickling
with config.set({"spec.executor_name": "processes"}):
    with icechunk_session.allow_pickling():
        cubed_array = cubed.random.random(shape, chunks=cubed_chunks)
        group = zarr.group(
            store=icechunk_session.store,
            overwrite=True
        )
        zarray = group.create_array(
            "array",
            shape=shape,
            chunks=zarr_chunks,
            dtype="f8",
            fill_value=float("nan"),
        )
        store_icechunk(
            icechunk_session,
            sources=[cubed_array],
            targets=[zarray]
        )
print(icechunk_session.commit("wrote a cubed array!"))
3Y94AJZWTES2JYWA0Y9G