polars_cloud.LazyFrameRemote.distributed

LazyFrameRemote.distributed(
*,
shuffle_compression: ShuffleCompression = 'auto',
shuffle_format: ShuffleFormat = 'auto',
shuffle_compression_level: int | None = None,
sort_partitioned: bool = True,
pre_aggregation: bool = True,
expression_extraction: bool = False,
equi_join_broadcast_limit: int = 268435456,
partitions_per_worker: int | None = None,
single_worker_ops: SingleWorkerOps = 'auto',
) -> ExecuteRemote

Run the query in a distributed fashion.

Parameters:
shuffle_compression : {‘auto’, ‘lz4’, ‘zstd’, ‘uncompressed’}

Compress files before shuffling them. Compression reduces disk and network IO, but disables memory mapping. Choose “zstd” for good compression performance. Choose “lz4” for fast compression/decompression. Choose “uncompressed” for memory mapped access at the expense of file size.
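The tradeoff can be illustrated with a stand-in codec from the standard library (zlib here, since lz4 and zstd bindings are not in the stdlib): compression shrinks the bytes written to disk and shipped between workers, at the cost of CPU time and of losing memory-mapped access.

```python
import zlib

# A repetitive payload standing in for a shuffle file's contents.
payload = b"user_id,amount\n" + b"42,100.0\n" * 10_000

# Compression trades CPU time for less disk and network IO.
compressed = zlib.compress(payload, level=6)
print(len(payload), len(compressed))
```

With highly repetitive data like this, the compressed size is a small fraction of the original, which is why compressed shuffles are often the right default on network-bound clusters.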

shuffle_format : {‘auto’, ‘ipc’, ‘parquet’}

File format to use for shuffles.

shuffle_compression_level

Compression level used for shuffle files. If set to None, the level is chosen by the optimizer.

sort_partitioned

Whether sort operations can be executed on multiple workers.

pre_aggregation

Whether group-by and selected aggregations are pre-aggregated on worker nodes if possible.
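Conceptually (this is plain Python, not the polars_cloud API), pre-aggregation computes partial group-by results on each worker so that only small per-group summaries, rather than all the rows, are shuffled between workers:

```python
from collections import Counter

# Rows held by two workers; pre-aggregation sums per group locally ...
worker_a = [("x", 1), ("y", 2), ("x", 3)]
worker_b = [("y", 4), ("x", 5)]

def partial_sum(rows):
    acc = Counter()
    for key, value in rows:
        acc[key] += value
    return acc

# ... and only the small partial results are shuffled and merged.
merged = partial_sum(worker_a) + partial_sum(worker_b)
print(dict(merged))  # {'x': 9, 'y': 6}
```

The merged result equals a group-by sum over all rows, but each worker only shipped one value per group instead of its full partition.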

expression_extraction

Whether sub-expressions are extracted into a form amenable to distributed processing. For example, in filter(pl.col.x < pl.col.y.mean()), the mean would be pre-aggregated if this option is set to True.

Warning

This functionality is experimental. It may be changed at any point without it being considered a breaking change.
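A plain-Python sketch of the idea (not polars_cloud code): the mean inside the filter is split into partial (sum, count) aggregates per worker, combined once, and the resulting scalar is then used by each worker to filter its own partition locally:

```python
# Each worker holds part of columns x and y.
worker_parts = [
    {"x": [1.0, 5.0], "y": [2.0, 4.0]},
    {"x": [3.0, 9.0], "y": [6.0, 8.0]},
]

# Extracted sub-expression: y.mean() as pre-aggregated (sum, count).
total = sum(sum(p["y"]) for p in worker_parts)
count = sum(len(p["y"]) for p in worker_parts)
y_mean = total / count  # 5.0

# Each worker can now apply filter(x < y_mean) on its own partition.
kept = [x for p in worker_parts for x in p["x"] if x < y_mean]
print(kept)  # [1.0, 3.0]
```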

equi_join_broadcast_limit

Maximum size in bytes up to which an equi join may be converted from a partitioned join to a broadcast join; the default, 268435456 bytes, is 256 MiB. Set to 0 to disable broadcasting.

partitions_per_worker

Into how many parts to split the data when distributing work over workers. A higher number means lower peak memory usage, but may make execution slightly slower.
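As a rough back-of-the-envelope model (the numbers and the formula are illustrative, not the planner's actual cost model), more partitions per worker means each worker materializes a smaller slice of the data at any one time:

```python
data_size_bytes = 64 * 1024**3  # 64 GiB of input to distribute
workers = 10

# Illustrative model: peak memory per worker scales with partition size.
for partitions_per_worker in (1, 4, 16):
    partition_bytes = data_size_bytes / (workers * partitions_per_worker)
    print(partitions_per_worker, round(partition_bytes / 1024**3, 2))
```

Going from 1 to 16 partitions per worker cuts the per-partition footprint sixteenfold, at the cost of more scheduling and shuffle overhead.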

single_worker_ops

Whether to allow memory-intensive operations to execute on a single worker. This can lead to faster execution, but it also increases the risk of running out of RAM.

This option affects individual operations (joins, group-bys, etc.). For executing the whole query on a single worker, use single_node() instead of distributed().

  • Set to auto to let the planner decide which operations are safe to execute on a single worker. This option strikes a balance between safety and performance, and should be preferred for most queries.

  • Set to allow if a single worker has enough RAM to execute the whole query. This gives the planner the freedom to execute any part of the query on a single worker, if it leads to a faster plan, at the risk of the query failing if the worker runs out of RAM.

  • Set to forbid to force the planner to always execute memory-intensive operations in partitions. This is the safest option, but can make the plan slower.

Warning

This functionality is experimental. It may be changed at any point without it being considered a breaking change.

Examples

>>> import polars_cloud as pc
>>> ctx = pc.ComputeContext(cluster_size=10)
>>> query.remote(ctx).distributed().sink_parquet(...)