How to Work with Dask for Large Datasets#
This guide covers using Dask with xugrid to handle datasets too large to fit in memory, and to parallelize computations.
Loading Data Lazily#
xu.open_dataset() defers reading data values; pass the chunks argument to back the variables with Dask arrays:
import xugrid as xu
# Data is NOT loaded into memory; chunks={} uses the file's own chunk layout where available
uds = xu.open_dataset("large_model.nc", chunks={})
# Check that the data is lazy (Dask array)
print(uds["temperature"].data)
# dask.array<open_dataset-temperature, ...>
Specify chunks for optimal performance:
# Chunk along time dimension
uds = xu.open_dataset("large_model.nc", chunks={"time": 10})
# Or let Dask choose chunk sizes automatically
uds = xu.open_dataset("large_model.nc", chunks="auto")
Loading from Zarr#
Zarr is inherently chunked and works seamlessly with Dask:
# Zarr stores are naturally lazy
uds = xu.open_zarr("large_model.zarr")
# Remote data (S3, GCS, Azure)
uds = xu.open_zarr("s3://bucket/model.zarr")
Lazy Computations#
Operations on Dask-backed arrays are lazy:
uds = xu.open_dataset("large_model.nc", chunks={"time": 10})
temp = uds["temperature"]
# These operations build a task graph, no computation yet
temp_celsius = temp - 273.15
temp_mean = temp_celsius.mean(dim="time")
temp_anomaly = temp_celsius - temp_mean
# Compute when ready
result = temp_anomaly.compute()
Triggering Computation#
Several ways to trigger computation:
# Explicit compute
result = lazy_array.compute()
# Load into memory
result = lazy_array.load()
# Save to file (computes as it writes)
lazy_array.ugrid.to_zarr("output.zarr")
lazy_array.ugrid.to_netcdf("output.nc")
# Plotting (computes the visible data)
lazy_array.ugrid.plot()
Using a Dask Cluster#
For large computations, use a distributed cluster:
from dask.distributed import Client, LocalCluster
# Local cluster (uses all CPU cores)
cluster = LocalCluster()
client = Client(cluster)
# Now computations are distributed
uds = xu.open_dataset("large_model.nc", chunks={"time": 10})
result = uds["temperature"].mean(dim="time").compute()
# View progress in dashboard
print(client.dashboard_link)
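If the default worker layout oversubscribes memory, size the local cluster explicitly; the numbers below are illustrative only:
# Illustrative worker sizing; tune to your machine
cluster = LocalCluster(
    n_workers=4,            # worker processes
    threads_per_worker=2,   # threads per process
    memory_limit="8GB",     # memory budget per worker
)
client = Client(cluster)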
For HPC or cloud:
# Connect to existing scheduler
client = Client("tcp://scheduler-address:8786")
# Or use dask-jobqueue for HPC
from dask_jobqueue import SLURMCluster
cluster = SLURMCluster(cores=4, memory="16GB")
cluster.scale(jobs=10)
client = Client(cluster)
Regridding with Dask#
Regridders work with Dask arrays:
source = xu.open_dataset("source.nc", chunks={"time": 10})["var"]
target = xu.data.disk()
# Create regridder
regridder = xu.OverlapRegridder(source=source, target=target)
# Regrid is lazy
result = regridder.regrid(source)
print(type(result.data)) # Dask array
# Save lazily
result.ugrid.to_zarr("regridded.zarr")
Chunking Strategies#
Unstructured grids have a single spatial dimension (faces), which affects chunking strategies:
# Good: chunk along non-spatial dimensions
uds = xu.open_dataset("model.nc", chunks={
"time": 10,
"layer": 5,
# Don't chunk the face dimension - it's the spatial dimension
})
# The face dimension is typically not chunked because:
# 1. Spatial operations need all faces
# 2. Grid topology connects faces in complex patterns
For very large grids, consider partitioning instead of chunking:
# Partition spatially (requires pymetis)
partitions = uds.ugrid.partition(n_part=4)
# Process partitions separately (process() is a placeholder for your own analysis)
results = []
for part in partitions:
    result = process(part)
    results.append(result)
# Merge back
combined = xu.merge_partitions(results)
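The loop above can also be dispatched through Dask so partitions are processed concurrently; a minimal sketch, assuming process() returns a UgridDataset for each partition:
import dask
# One delayed task per partition; process() is the same placeholder as above
delayed = [dask.delayed(process)(part) for part in partitions]
results = dask.compute(*delayed)
combined = xu.merge_partitions(list(results))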
Memory Management#
Monitor and control memory usage:
# Check chunk sizes
uds = xu.open_dataset("model.nc", chunks={"time": 10})
print(uds["temperature"].data) # Shows chunk sizes
# Rechunk if needed
rechunked = uds["temperature"].chunk({"time": 5})
# Persist intermediate results to cluster memory
from dask.distributed import Client
client = Client()
intermediate = expensive_computation(uds)
intermediate = client.persist(intermediate) # Keep in cluster memory
# Use for multiple downstream computations
result1 = analysis1(intermediate)
result2 = analysis2(intermediate)
Writing Large Results#
Write results efficiently:
# Zarr is best for Dask - writes chunks in parallel
result.ugrid.to_zarr("output.zarr")
# NetCDF works but is sequential
result.ugrid.to_netcdf("output.nc")
# For very large results, compute and write one slab of time steps at a time
# (n_times and chunk_size are placeholders you define)
for t in range(0, n_times, chunk_size):
    chunk = result.isel(time=slice(t, t + chunk_size)).compute()
    chunk.ugrid.to_netcdf(f"output_{t:04d}.nc")
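The per-timestep files written above can be reopened afterwards as one lazy dataset with xu.open_mfdataset; depending on the files' coordinates you may need to pass combine or concat_dim options:
# Reopen the chunked NetCDF output as a single lazy dataset
uds_out = xu.open_mfdataset("output_*.nc")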
Parallel I/O with Zarr#
Zarr enables parallel reads and writes:
# Write in parallel (each worker writes its chunks)
result.ugrid.to_zarr("output.zarr")
# Read in parallel (each worker reads needed chunks)
uds = xu.open_zarr("output.zarr")
result = uds["var"].mean().compute() # Parallel reduction
Debugging Dask Computations#
Visualize the task graph:
# Requires graphviz and its Python bindings
result.data.visualize("task_graph.png")
# Check task count
print(len(result.data.__dask_graph__()))
# Profile execution
from dask.diagnostics import ProgressBar
with ProgressBar():
    result.compute()
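For the local (non-distributed) schedulers, dask.diagnostics also provides profilers that record task timings and resource use; the HTML report needs bokeh installed:
from dask.diagnostics import Profiler, ResourceProfiler, visualize

with Profiler() as prof, ResourceProfiler() as rprof:
    result.compute()

# Render an interactive HTML profile report (requires bokeh)
visualize([prof, rprof])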
Common Pitfalls#
Accidental compute: Some operations trigger computation unexpectedly:
# These compute immediately:
float(uda.mean())        # Needs the scalar value
uda.values               # Converts to NumPy
bool((uda > 0).any())    # Needs the truth value
# These stay lazy:
uda.mean() # Returns lazy array
uda.data # Returns Dask array
Chunking spatial dimension: Avoid chunking the face dimension:
# Avoid this - breaks spatial operations
uds = xu.open_dataset("model.nc", chunks={"mesh2d_nFaces": 1000})
Too many tasks: Many small chunks create overhead:
# Too granular
uds = xu.open_dataset("model.nc", chunks={"time": 1})
# Better
uds = xu.open_dataset("model.nc", chunks={"time": 100})
Not using a scheduler: For large jobs, always use distributed:
from dask.distributed import Client
client = Client() # Use local cluster at minimum
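Using the client as a context manager ties the cluster's lifetime to the job and shuts it down cleanly; a minimal sketch:
from dask.distributed import Client

with Client() as client:  # starts a local cluster, closed on exit
    uds = xu.open_dataset("large_model.nc", chunks={"time": 10})
    result = uds["temperature"].mean(dim="time").compute()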