说明
《Python 教程》 持续更新中,提供建议、纠错、催更等加作者微信: gairuo123(备注:pandas教程)和关注公众号「盖若」ID: gairuo。跟作者学习,请进入 Python学习课程。欢迎关注作者出版的书籍:《深入浅出Pandas》 和 《Python之光》。
Dask 是 Python 中用于大量数据并行计算的库,支持各种灵活的方法。Dask 可以轻松扩展您熟悉和喜爱的 Python 库,例如 NumPy、pandas 和 scikit-learn。
使用 Dask Futures 并行化任何 Python 代码,让您扩展任何函数和 for 循环,并在任何情况下为您提供解决能力。
Dask由两部分组成:
Dask 有以下优点:
以下是高级集合用于生成可由调度程序在单台机器或集群上执行的任务图:
Python 已经发展成为数据分析和通用编程领域的主导语言。 NumPy、pandas 和 scikit-learn 等计算库推动了这种增长。 然而,这些包的设计并不是为了扩展到单台机器之外。 Dask 被开发用于在数据集超过内存时将这些包和周围的生态系统本地扩展为多核机器和分布式集群。
以下是 Dask 的一些使用方法。
安装 Dask:
pip install dask # 仅安装 dask 核心功能
pip install "dask[complete]" # 安装所有功能
pip install "dask[array]" # array 支持
pip install "dask[dataframe]" # dataframe 支持
pip install "dask[diagnostics]" # 可视化dask诊断
pip install "dask[distributed]" # 分布式支持
conda install dask # conda 安装
常用导入模块及约定别名:
import numpy as np
import pandas as pd
import dask.dataframe as dd
import dask.array as da
import dask.bag as db
根据您使用的数据类型,导入适当的模块。
利用现有数据构造 Dask 对象。
# 构造 DataFrame
index = pd.date_range("2021-09-01", periods=2400, freq="1H")
df = pd.DataFrame({"a": np.arange(2400),
"b": list("abcaddbe" * 300)},
index=index)
ddf = dd.from_pandas(df, npartitions=10)
ddf
'''
Dask DataFrame Structure:
a b
npartitions=10
2021-09-01 00:00:00 int64 object
2021-09-11 00:00:00 ... ...
... ...
2021-11-30 00:00:00 ... ...
2021-12-09 23:00:00 ... ...
Dask Name: from_pandas, 10 tasks
'''
现在我们有一个包含 2 列和 2400 行的 DataFrame,由 10 个分区组成,每个分区有 240 行。 每个分区代表一段数据。
一些关键的属性:
# 每个分区覆盖的索引值
ddf.divisions
# 访问特定分区
ddf.partitions[1]
Array 结构:
data = np.arange(100_000).reshape(200, 500)
a = da.from_array(data, chunks=(100, 100))
a
'''
dask.array<array, shape=(200, 500),
dtype=int64, chunksize=(100, 100),
chunktype=numpy.ndarray>
'''
现在我们有一个形状为 (200, 500) 的二维数组,由 10 个块组成,其中每个块的形状为 (100, 100)。 每个块代表一段数据。
以下是数组的一些关键属性:
# 检查块
a.chunks
# ((100, 100), (100, 100, 100, 100, 100))
# 访问特定的数据块
a.blocks[1, 3]
索引 Dask 集合就像切片 numpy 数组或 pandas 的 DataFrame。
ddf.b
'''
Dask Series Structure:
npartitions=10
2021-09-01 00:00:00 object
2021-09-11 00:00:00 ...
...
2021-11-30 00:00:00 ...
2021-12-09 23:00:00 ...
Name: b, dtype: object
Dask Name: getitem, 20 tasks
'''
ddf["2021-10-01": "2021-10-09 5:00"]
'''
Dask DataFrame Structure:
a b
npartitions=1
2021-10-01 00:00:00.000000000 int64 object
2021-10-09 05:00:59.999999999 ... ...
Dask Name: loc, 11 tasks
'''
Array 结构的索引:
a[:50, 200]
'''
dask.array<getitem, shape=(50,),
dtype=int64, chunksize=(50,),
chunktype=numpy.ndarray>
'''
Dask 采用懒惰评估机制,直到你要求计算的结果才会被计算出来。相应地,会生成一个用于计算的 Dask 任务图。
任何时候你有一个 Dask 对象并且你想得到结果,调用 compute 计算:
ddf.a.mean()
# dd.Scalar<series-..., dtype=float64>
ddf.a.mean().compute()
# 1199.5
ddf.b.unique()
'''
Dask Series Structure:
npartitions=1
object
...
Name: b, dtype: object
Dask Name: unique-agg, 33 tasks
'''
ddf.b.unique().compute()
'''
0 a
1 b
2 c
3 d
4 e
Name: b, dtype: object
'''
方法可以像 pandas 一样链接在一起:
result = ddf["2021-10-01": "2021-10-09 5:00"].a.cumsum() - 100
result
'''
Dask Series Structure:
npartitions=1
2021-10-01 00:00:00.000000000 int64
2021-10-09 05:00:59.999999999 ...
Name: a, dtype: int64
Dask Name: sub, 16 tasks
'''
result.compute()
'''
2021-10-01 00:00:00 620
2021-10-01 01:00:00 1341
2021-10-01 02:00:00 2063
2021-10-01 03:00:00 2786
2021-10-01 04:00:00 3510
...
2021-10-09 01:00:00 158301
2021-10-09 02:00:00 159215
2021-10-09 03:00:00 160130
2021-10-09 04:00:00 161046
2021-10-09 05:00:00 161963
Freq: H, Name: a, Length: 198, dtype: int64
'''
Array 的计算:
a.mean()
'''
dask.array<mean_agg-aggregate, shape=(),
dtype=float64, chunksize=(), chunktype=numpy.ndarray>
'''
a.mean().compute()
# 49999.5
np.sin(a)
'''
dask.array<sin, shape=(200, 500),
dtype=float64, chunksize=(100, 100),
chunktype=numpy.ndarray>
'''
np.sin(a).compute()
'''
array([[ 0. , 0.84147098, 0.90929743, ..., 0.58781939,
0.99834363, 0.49099533],
[-0.46777181, -0.9964717 , -0.60902011, ..., -0.89796748,
-0.85547315, -0.02646075],
[ 0.82687954, 0.9199906 , 0.16726654, ..., 0.99951642,
0.51387502, -0.4442207 ],
...,
[-0.99720859, -0.47596473, 0.48287891, ..., -0.76284376,
0.13191447, 0.90539115],
[ 0.84645538, 0.00929244, -0.83641393, ..., 0.37178568,
-0.5802765 , -0.99883514],
[-0.49906936, 0.45953849, 0.99564877, ..., 0.10563876,
0.89383946, 0.86024828]])
'''
a.T
'''
dask.array<transpose, shape=(500, 200),
dtype=int64, chunksize=(100, 100),
chunktype=numpy.ndarray>
'''
a.T.compute()
'''
array([[ 0, 500, 1000, ..., 98500, 99000, 99500],
[ 1, 501, 1001, ..., 98501, 99001, 99501],
[ 2, 502, 1002, ..., 98502, 99002, 99502],
...,
[ 497, 997, 1497, ..., 98997, 99497, 99997],
[ 498, 998, 1498, ..., 98998, 99498, 99998],
[ 499, 999, 1499, ..., 98999, 99499, 99999]])
'''
方法可以像在NumPy中一样链接在一起
b = a.max(axis=1)[::-1] + 10
b
'''
dask.array<add, shape=(200,),
dtype=int64, chunksize=(100,),
chunktype=numpy.ndarray>
'''
b[:10].compute()
'''
array([100009, 99509, 99009, 98509, 98009,
97509, 97009, 96509,
96009, 95509])
'''
通常在并行化现有代码库或构建自定义算法时,您会遇到可并行化的代码,但不仅仅是大型 DataFrame 或数组。
Dask Delayed 允许您将单个函数调用包装到一个延迟构造的任务图中:
import dask
@dask.delayed
def inc(x):
return x + 1
@dask.delayed
def add(x, y):
return x + y
a = inc(1) # 没有开始计算
b = inc(2) # 没有开始计算
c = add(a, b) # 没有开始计算
c = c.compute() # 这会触发上述所有计算
与之前描述的接口不同,Futures 是即将执行的,提交函数后立即开始计算。
from dask.distributed import Client
client = Client()
def inc(x):
return x + 1
def add(x, y):
return x + y
a = client.submit(inc, 1) # 任务立即开始
b = client.submit(inc, 2) # 任务立即开始
c = client.submit(add, a, b) # 任务立即开始
c = c.result() # 阻塞直到任务完成,然后收集结果
生成任务图后,调度程序的工作就是执行它。
默认情况下,当您在 Dask 对象上调用 compute 时,Dask 使用计算机上的线程池并行运行计算。
如果您想要更多控制,请改用分布式调度程序。 尽管名称中有“分布式”,但分布式调度程序在单台和多台机器上都能很好地工作。 将其视为“高级调度程序”。
用自己的计算机的集群的方式:
from dask.distributed import Client
client = Client()
client
# <Client: 'tcp://127.0.0.1:41703'
# processes=4 threads=12, memory=31.08 GiB>
连接到已在运行的集群的方式。
from dask.distributed import Client
client = Client("<url-of-scheduler>")
client
# <Client: 'tcp://127.0.0.1:41703' processes=4
# threads=12, memory=31.08 GiB>
以上各方法更加详细的内容可以参考官方文档。
更新时间:2023-03-14 08:42:03 标签:python dask 大数据 并行