在Slurm中使用Dask

目录

本文介绍如何在中国气象局高性能计算机 CMA-PI 的并行节点中运行 Dask 程序。

准备环境

dask-mpi 库支持在诸如 Slurm 等 MPI 环境中部署 Dask 负载。

CMA-PI 上的 Python 环境尚未安装 dask-mpi 库。 我使用自己安装的 Anaconda 环境,通过代理在线安装 dask-mpi 库。 HPC 用户安装 Python 的具体方法请参考文章《HPC用户安装Python解决方案》。

注:未来可以连接 HPC 的工作电脑将无法访问互联网,上述文章中的部分方法将无法使用。

Python 脚本

初始化 MPI 环境仅需要一行语句。注意:

  • CMA-PI 使用 Infiniband 网卡,需要将 interface 参数设置为 IB 网卡名 "ib0"
  • 并行节点与外界不连通,无需启动面板,可以用 dashboard=False 关闭面板
from dask_mpi import initialize

initialize(
    interface="ib0",
    dashboard=False
)

后续与原有脚本相同,比如创建 Dask 客户端:

from dask.distributed import Client

client = Client()

运行脚本

使用 mpirun 运行脚本,类似如下语句

mpirun python ${script_dir}/bth/grib2_bth_dask.py \
    --start-time ${START_TIME} \
    --grib-orig-path ${GRIB_ORIG_PATH} \
    --output-path ${OUTPUT_PATH}

注意,为了使用 click 库并实时更新输出,还需要在运行 Python 脚本前额外设置环境变量:

export LC_ALL=en_US.UTF-8
export LANG=en_US.UTF-8
export PYTHONUNBUFFERED=1

问题

使用 client.close() 关闭 Dask 集群时会报错,但不影响程序结束。报错信息如下:

distributed.scheduler - INFO - Lost all workers
distributed.batched - INFO - Batched Comm Closed <TCP (closed) Worker->Scheduler local=tcp://188.2.212.70:47287 remote=tcp://188.2.212.70:41176>
Traceback (most recent call last):
  File "/g2/nwp_sp/app/anaconda3/envs/py39-op/lib/python3.9/site-packages/distributed/batched.py", line 93, in _background_send
    nbytes = yield self.comm.write(
  File "/g2/nwp_sp/app/anaconda3/envs/py39-op/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/g2/nwp_sp/app/anaconda3/envs/py39-op/lib/python3.9/site-packages/distributed/comm/tcp.py", line 241, in write
    raise CommClosedError()
distributed.comm.core.CommClosedError

参考

dask-mpi

Dask 相关文章: