Dask 延迟对象（delayed）
官方文档地址：
Dask Delayed — Dask documentation（https://docs.dask.org/en/stable/delayed.html）
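总结为：
1. 输出函数延迟：用 `dask.delayed` 把写文件的函数包装成延迟函数；
2. DataFrame 延迟：用 `to_delayed()` 把 Dask DataFrame 的每个分区转换成一个延迟对象；
3. 统一 compute：最后只调用一次 `dask.compute`，一次性执行所有写入任务。
代码：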
import dask.dataframe as dd
import fsspec
import pandas as pd
from dask.delayed import delayed
import os
# 创建一个模拟的 _write_csv 函数
def _write_csv(df, fil, *, depend_on=None, **kwargs):
print(df)
print(fil)
print(kwargs)
with fil as f:
df.to_csv(f, **kwargs)
return os.path.normpath(fil.path)
if __name__ == '__main__':
    # Demo: write each partition of a Dask DataFrame to its own CSV file by
    # wrapping the writer in dask.delayed and running every write in one
    # dask.compute call.
    import dask

    # Test data: column A holds 0..9, column B holds 10..19.
    data = {
        'A': range(10),
        'B': range(10, 20),
    }
    df = pd.DataFrame(data)

    # Convert the pandas DataFrame to a Dask DataFrame with 2 partitions.
    print('dd.from_pandas(df, npartitions=2)')
    dask_df = dd.from_pandas(df, npartitions=2)
    print(dask_df)
    print('dd.from_pandas(df, npartitions=2)')

    # One Delayed object per partition.
    print('dfs = dask_df.to_delayed()')
    dfs = dask_df.to_delayed()
    print(dfs)
    print('dfs = dask_df.to_delayed()')

    # One output file per partition: output1.csv, output2.csv, ...
    # The list is derived from `dfs` so the two sequences always have the
    # same length. With a hard-coded list, zip() below would silently drop
    # any partition that has no matching file.
    files = [fsspec.open(f'output{i}.csv', 'w') for i in range(1, len(dfs) + 1)]

    # pure=False: writing a file is a side effect, so each call gets its own
    # task key rather than being deduplicated by argument hashing.
    to_csv_chunk = delayed(_write_csv, pure=False)
    print(to_csv_chunk)

    kwargs = {}  # extra options forwarded to DataFrame.to_csv
    # Every chunk uses the same kwargs, so there is no need to treat the
    # first partition separately from the rest.
    values = [to_csv_chunk(d, f, **kwargs) for d, f in zip(dfs, files)]
    print(values)

    # Execute all delayed writes at once. Each result is the normalised
    # output path returned by _write_csv.
    compute_kwargs = {}  # options forwarded to dask.compute
    aa = list(dask.compute(*values, **compute_kwargs))
    print(aa)