polars_cloud.LazyFrameRemote.distributed

LazyFrameRemote.distributed(
*,
shuffle_compression: ShuffleCompression = 'auto',
shuffle_format: ShuffleFormat = 'auto',
shuffle_compression_level: int | None = None,
sort_partitioned: bool = True,
pre_aggregation: bool = True,
equi_join_broadcast_limit: int = 268435456,
) -> ExecuteRemote

Run the query in a distributed fashion.

Parameters:
shuffle_compression : {‘auto’, ‘lz4’, ‘zstd’, ‘uncompressed’}

Compress files before shuffling them. Compression reduces disk and network IO, but disables memory mapping. Choose “zstd” for good compression performance. Choose “lz4” for fast compression/decompression. Choose “uncompressed” for memory mapped access at the expense of file size.

shuffle_format : {‘auto’, ‘ipc’, ‘parquet’}

File format to use for shuffles.

shuffle_compression_level

Compression level to use for shuffle files. If set to None, the optimizer chooses the level.

sort_partitioned

Whether sorts are performed in a partitioned fashion across worker nodes.

pre_aggregation

Whether group-by and selected aggregations are pre-aggregated on worker nodes if possible.

equi_join_broadcast_limit

Whether equi joins may be converted from partitioned joins to broadcast joins. The passed value is the maximum size in bytes to broadcast. Set to 0 to disable broadcasting.
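The default of 268435456 bytes corresponds to 256 MiB. As a minimal sketch of how other limits might be computed, the snippet below builds the byte value from binary units. The names `MIB`, `larger_limit` and `kwargs` are illustrative only and are not part of the polars_cloud API.

```python
# Byte sizes expressed in binary units (1 MiB = 1024 * 1024 bytes).
MIB = 1024 * 1024

default_limit = 256 * MIB   # equals the documented default, 268435456
larger_limit = 1024 * MIB   # hypothetical 1 GiB limit, chosen for illustration
no_broadcast = 0            # 0 disables broadcasting, per the description above

# Keyword arguments that could be passed on as .distributed(**kwargs).
kwargs = {"equi_join_broadcast_limit": larger_limit}
print(default_limit, larger_limit)
```

Writing the limit as a multiple of `MIB` keeps the intended size readable at the call site.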

Examples

>>> import polars_cloud as pc
>>> ctx = pc.ComputeContext(cluster_size=10)
>>> query.remote(ctx).distributed().sink_parquet(...)
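
Non-default options can be gathered and checked before the call. The sketch below is not part of polars_cloud: `distributed_options` is a hypothetical helper that validates values against the choices listed under Parameters and returns them as keyword arguments. The check that the broadcast limit is non-negative is an assumption, not documented behaviour.

```python
# Allowed values copied from the parameter list above.
SHUFFLE_COMPRESSION = {"auto", "lz4", "zstd", "uncompressed"}
SHUFFLE_FORMAT = {"auto", "ipc", "parquet"}


def distributed_options(
    shuffle_compression="auto",
    shuffle_format="auto",
    equi_join_broadcast_limit=268435456,
):
    """Hypothetical helper: validate options and return them as kwargs."""
    if shuffle_compression not in SHUFFLE_COMPRESSION:
        raise ValueError(f"unknown shuffle_compression: {shuffle_compression!r}")
    if shuffle_format not in SHUFFLE_FORMAT:
        raise ValueError(f"unknown shuffle_format: {shuffle_format!r}")
    # Assumption: a negative byte limit is never meaningful.
    if equi_join_broadcast_limit < 0:
        raise ValueError("equi_join_broadcast_limit must be >= 0")
    return {
        "shuffle_compression": shuffle_compression,
        "shuffle_format": shuffle_format,
        "equi_join_broadcast_limit": equi_join_broadcast_limit,
    }


# Use zstd for shuffle files and disable broadcast joins.
opts = distributed_options(shuffle_compression="zstd", equi_join_broadcast_limit=0)

# With a real query and compute context, these could be passed on as:
#   query.remote(ctx).distributed(**opts).sink_parquet(...)
```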