在Slurm中使用Dask
目录
本文介绍如何在中国气象局高性能计算机 CMA-PI 的并行节点中运行 Dask 程序。
准备环境
dask-mpi 库支持在诸如 Slurm 等 MPI 环境中部署 Dask 负载。
CMA-PI 上的 Python 环境尚未安装 dask-mpi 库。 我使用自己安装的 Anaconda 环境,通过代理在线安装 dask-mpi 库。 HPC 用户安装 Python 的具体方法请参考文章《HPC用户安装Python解决方案》。
注:未来可以连接 HPC 的工作电脑将无法访问互联网,上述文章中的部分方法将无法使用。
Python 脚本
初始化 MPI 环境仅需要一行语句。注意:
- CMA-PI 使用 Infiniband 网卡,需要将
interface
参数设置为 IB 网卡名"ib0"
- 并行节点与外界不连通,无需启动面板,可以用
dashboard=False
关闭面板
from dask_mpi import initialize
initialize(
interface="ib0",
dashboard=False
)
后续与原有脚本相同,比如创建 Dask 客户端:
from dask.distributed import Client
client = Client()
运行脚本
使用 mpirun
运行脚本,类似如下语句
mpirun python ${script_dir}/bth/grib2_bth_dask.py \
--start-time ${START_TIME} \
--grib-orig-path ${GRIB_ORIG_PATH} \
--output-path ${OUTPUT_PATH}
注意,为了使用 click 库并实时更新输出,还需要在运行 Python 脚本前额外设置环境变量:
export LC_ALL=en_US.UTF-8
export LANG=en_US.UTF-8
export PYTHONUNBUFFERED=1
问题
使用 client.close()
关闭 Dask 集群时会报错,但不影响程序结束。报错信息如下:
distributed.scheduler - INFO - Lost all workers
distributed.batched - INFO - Batched Comm Closed <TCP (closed) Worker->Scheduler local=tcp://188.2.212.70:47287 remote=tcp://188.2.212.70:41176>
Traceback (most recent call last):
File "/g2/nwp_sp/app/anaconda3/envs/py39-op/lib/python3.9/site-packages/distributed/batched.py", line 93, in _background_send
nbytes = yield self.comm.write(
File "/g2/nwp_sp/app/anaconda3/envs/py39-op/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/g2/nwp_sp/app/anaconda3/envs/py39-op/lib/python3.9/site-packages/distributed/comm/tcp.py", line 241, in write
raise CommClosedError()
distributed.comm.core.CommClosedError
参考
Dask 相关文章: