学习R语言:简单并行计算

目录

本文内容来自《R 语言编程艺术》(The Art of R Programming),有部分修改

不要对并行抱有太高期望,很多情况下并行版本实际上比串行版本运行速度更慢。

共同外链问题

mutual outlink

对于 n x n 矩阵

sum = 0
for i = 0...n-1
  for j = i+1...n-1
    for k = 0...n-1 sum = sum + a[i][k] * a[j][k]
mean = sum / (n*(n-1)/2)

snow 包简介

定义问题

上述问题的源代码

mtl <- function(ichunk, m) {
  n <- ncol(m)
  matches <- 0
  for (i in ichunk) {
    if (i < n) {
      rowi <- m[i,]
      matches <- matches + 
        sum(m[(i+1):n,] %*% rowi)
    }
  }
  return (matches)
}
mutlinks <- function(cls, m) {
  n <- nrow(m)
  nc <- length(cls)
  
  ichunks <- split(1:n, 1:nc)
  
  counts <- clusterApply(cls, ichunks, mtl, m)
  do.call(sum, counts) / (n*(n-1)/2)
}

其中 split() 函数用于将矩阵分块

split(1:4, 1:2)
$`1`
[1] 1 3

$`2`
[1] 2 4

运行 snow 代码

加载 snow 包

library(snow)

创建 snow 集群,在本机开启两个新进程,称为 worker。 原有 R 进程称为 manager。

所有 worker 形成一个集群 cluster。

snow 使用 分布/聚合 (scatter/gather) 范式:

  • manager 将数据分割为若干块,打包发送给 worker
  • 每个 worker 处理自己的分块
  • manager 从 worker 收集结果 (聚合阶段),并将其组合成应用需要的正确形式
cl <- makeCluster(
  type="SOCK",
  c("localhost", "localhost")
)
cl
socket cluster with 2 nodes on host ‘localhost’

建立相关的邻接矩阵

testm <- matrix(
  sample(
    0:1,
    16,
    replace=TRUE
  ),
  nrow=4
)
testm
     [,1] [,2] [,3] [,4]
[1,]    0    0    0    1
[2,]    0    0    0    0
[3,]    1    0    1    1
[4,]    1    1    1    0

在创建的集群上针对邻接矩阵运算代码

mutlinks(cl, testm)
[1] 0.5

借助于 C

扩展案例:利用 OpenMP 解决共同外链问题

注:以下代码在 Ubuntu 20.04 上运行

#include <omp.h>
#include <R.h>

int tot;

int procpairs(int i, int *m, int n) {
  int j, k, sum = 0;
  for(int j = i + 1; j < n; j++) {
    for(int k = 0; k < n; k++) {
      sum += m[n*k + i] * m[n*k + j];
    }
  }
  return sum;
}

void mutlinks(int *m, int *n, double *mlmean) {
  int nval = *n;
  tot = 0;
  #pragma omp parallel
  {
    int mysum = 0;
    int me = omp_get_thread_num();
    int nth = omp_get_num_threads();
    for(int i=me; i < nval; i += nth) {
      mysum += procpairs(i, m, nval);
    }
    #pragma omp atomic
    tot += mysum;
  }
  int divisor = nval * (nval - 1)/2;
  *mlmean = ((float) tot) / divisor;
}

运行 OpenMP 代码

R CMD SHLIB romp.c -fopenmp
gcc -I"/opt/R/4.0.3/lib/R/include" -DNDEBUG   -I/usr/local/include   -fpic  -g -O2  -c romp.c -o romp.o
gcc -shared -L/opt/R/4.0.3/lib/R/lib -L/usr/local/lib -o romp.so romp.o -fopenmp -L/opt/R/4.0.3/lib/R/lib -lR
dyn.load("romp.so")
Sys.setenv(OMP_NUM_THREADS=4)
n <- 1000
m <- matrix(sample(0:1, n^2, replace=TRUE), nrow=n)
system.time(z<-.C("mutlinks", as.integer(m), as.integer(n), result=double(1)))
   user  system elapsed
  1.428   0.010   1.439
z$result
[1] 250.5263

普遍的性能考虑

开销的主要来源

共享内存的机器

内存访问

缓存

潜伏期 (latency),尤其是主机和设备之间的传输

网络计算机群系统

网络传输

snow 中传输前将对象转为字符串 (序列化?)

简单并行程序,以及那些不简单的

“简单并行” embarrassingly parallel

静态和动态任务分配

静态任务分配

for (i = me; i < nval; i += nth) {
  mysum += procpairs(i, m, nval);
}

动态任务分配

int nexti = 0;
// ...skip...
for( ; myi < n;) {
  #pragma omp critical
  {
    nexti += 1;
    myi = nexti;
  }
  if (myi <n) {
    mysum += procpairs(myi, m, nval)
    // ...skip...
  }
}
// ...skip..

注意:因为有 critical 指令,动态分配可能比静态分配的版本要慢得多

软件炼金术:将一般的问题转化为简单并行问题

利用统计的性质,可以将某些非简单并行问题转化为简单并行问题

共同外链问题,w 个 worker 对链接矩阵 m 进行操作:

  • 将 m 的行分成 w 个块
  • 让每个 worker 计算对应的小块中所有定点配对的平均外链数
  • 对所有 worker 返回的结果取平均值

调试 R 语言并行计算的代码

非常困难

调试在 worker 上运行的函数

尝试使用 message() 函数,或者使用 cat() 将信息写入到文件中

参考

学习 R 语言系列文章

快速入门

向量

矩阵和数组

列表

数据框

因子和表

编程结构

数学运算与模拟

面向对象编程

输入与输出

字符串操作

基础绘图

性能提升——速度和内存

R与其他语言的接口

本文代码请访问如下项目:

https://github.com/perillaroc/the-art-of-r-programming