polars_cloud.LazyFrameRemote.distributed
LazyFrameRemote.distributed(
    *,
    shuffle_compression: ShuffleCompression = 'auto',
    shuffle_format: ShuffleFormat = 'auto',
    shuffle_compression_level: int | None = None,
    sort_partitioned: bool = True,
    pre_aggregation: bool = True,
    expression_extraction: bool = False,
    equi_join_broadcast_limit: int = 268435456,
    partitions_per_worker: int | None = None,
    single_worker_ops: SingleWorkerOps = 'auto',
)
Whether the query should run in a distributed fashion.
Parameters:
- shuffle_compression : {'auto', 'lz4', 'zstd', 'uncompressed'}
  Compress files before shuffling them. Compression reduces disk and network IO, but disables memory mapping. Choose 'zstd' for good compression performance, 'lz4' for fast compression/decompression, or 'uncompressed' for memory-mapped access at the expense of file size.
- shuffle_format : {'auto', 'ipc', 'parquet'}
  File format to use for shuffles.
- shuffle_compression_level
  Compression level of the shuffle. If set to None, it is decided by the optimizer.
- sort_partitioned
  Whether sort operations can be executed on multiple workers.
- pre_aggregation
  Whether group-by and selected aggregations are pre-aggregated on worker nodes where possible.
- expression_extraction
  Whether sub-expressions are extracted into a form amenable to distributed processing. For example, in filter(pl.col.x < pl.col.y.mean()), the mean would be pre-aggregated if this option is set to True.

  Warning
  This functionality is experimental. It may be changed at any point without it being considered a breaking change.
- equi_join_broadcast_limit
  Whether equi-joins are allowed to be converted from partitioned to broadcast joins. The passed value is the maximum size in bytes that may be broadcast. Set to 0 to disable broadcasting.
- partitions_per_worker
  Into how many parts to split the data when distributing work over workers. A higher number lowers peak memory usage, but may make execution slightly less performant.
- single_worker_ops
  Whether to allow memory-intensive operations to execute on a single worker. This can lead to faster execution, but it also increases the risk of running out of RAM.

  This option affects individual operations (joins, group-bys, etc.). To execute the whole query on a single worker, use single_node() instead of distributed().

  Set to 'auto' to let the planner decide which operations are safe to execute on a single worker. This option strikes a balance between safety and performance, and should be preferred for most queries.

  Set to 'allow' if a single worker has enough RAM to execute the whole query. This gives the planner the freedom to execute any part of the query on a single worker if it leads to a faster plan, at the risk of the query failing if the worker runs out of RAM.

  Set to 'forbid' to force the planner to always execute memory-intensive operations in partitions. This is the safest option, but can make the plan slower.
Warning
This functionality is experimental. It may be changed at any point without it being considered a breaking change.
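The size check described under equi_join_broadcast_limit can be sketched locally. This is a hypothetical illustration only: the function name and the exact comparison are assumptions, not the engine's actual code; the 268435456-byte default is simply 2**28, i.e. 256 MiB.

```python
def may_broadcast(table_size_bytes: int, equi_join_broadcast_limit: int = 2**28) -> bool:
    """Sketch of the broadcast rule: a side of an equi-join may be
    broadcast if its size fits under the limit; a limit of 0 disables
    broadcasting entirely. (Hypothetical, for illustration.)"""
    if equi_join_broadcast_limit == 0:
        return False
    return table_size_bytes <= equi_join_broadcast_limit


# The default limit is 2**28 bytes == 268435456 bytes == 256 MiB.
assert 2**28 == 268_435_456 == 256 * 1024**2
assert may_broadcast(10 * 1024**2)         # 10 MiB fits under the default
assert not may_broadcast(10 * 1024**2, 0)  # 0 disables broadcasting
assert not may_broadcast(512 * 1024**2)    # 512 MiB exceeds the default
```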
Examples
>>> ctx = pc.ComputeContext(cluster_size=10)
>>> query.remote(ctx).distributed().sink_parquet(...)
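For quick reference, the keyword defaults from the signature above, collected into a plain dict. The dict itself is only a local sketch for illustration, not part of the API; the keys and values mirror the parameters of distributed().

```python
# Defaults of LazyFrameRemote.distributed(), per the signature above.
# 'auto' values are resolved by the planner/optimizer at runtime.
DISTRIBUTED_DEFAULTS = {
    "shuffle_compression": "auto",
    "shuffle_format": "auto",
    "shuffle_compression_level": None,   # None: chosen by the optimizer
    "sort_partitioned": True,
    "pre_aggregation": True,
    "expression_extraction": False,      # experimental
    "equi_join_broadcast_limit": 2**28,  # 268435456 bytes = 256 MiB
    "partitions_per_worker": None,       # None: chosen by the planner
    "single_worker_ops": "auto",
}
assert DISTRIBUTED_DEFAULTS["equi_join_broadcast_limit"] == 268_435_456
```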