NWPC业务系统笔记:并行检查数据

目录

之前的一篇文章《NWPC业务系统笔记:检查输出数据》中详细介绍了目前NWPC数值预报业务系统检查输出数据的方法。

其中提到目前数据有两种检查方式:

  • 多个作业并发检测
  • 单个作业循环检测

两种方式各有利弊,详细分析查看该文档。

本文介绍一种使用单个作业并行检测数据文件的方法,综合上面两种方式的优点,寻找效率和资源占用的平衡。

原理

单任务并行检测数据文件的原理如下图所示。

主程序根据预报时效列表生成一系列的goroutine。

每个goroutine会循环查看某个时效的文件是否存在,如果存在会继续查看该文件大小是否变化。 如果文件大小没有变化,则向主程序的通道发送一条消息。 如果循环超过一定次数,则会发送未找到的消息。

主程序会循环从管道中接收消息,每读取一条消息,就会根据用户提供的命令模板生成对应时效的任务并执行。 如果接收到未找到文件的消息,程序则直接出错。

为了降低系统负载,可以设置设置延迟启动时间,每次等待一定时间间隔后再开始检测下一时效。

实现

主程序创建一个通道,接收子程序发送的消息:

type CheckResult struct {
  ForecastTime time.Duration
  FilePath     string
  Error        error
}

ch := make(chan CheckResult)

主程序为每个预报时效创建子程序,每个子程序会休眠指定时间再启动文件检测。

for index, oneTime := range forecastTimeList {
  go func(currentIndex int, forecastTime time.Duration) {
    sleepTime := delayTime * time.Duration(currentIndex)
    log.WithFields(log.Fields{
      "forecast_hour": forecastTime.Hours(),
    }).Infof("sleeping before check...%v", sleepTime)
    time.Sleep(sleepTime)

    log.WithFields(log.Fields{
      "forecast_hour": forecastTime.Hours(),
    }).Infof("checking begin...")
    checkForOneTime(ch, config, levels, forecastTime, checkDuration)
  }(index, oneTime)
}

主程序循环等待子程序发送的检查结果,如果找到文件则执行命令,如果没有找到文件则直接出错。

for _ = range forecastTimeList {
  result := <-ch
  if result.Error != nil {
    log.WithFields(log.Fields{
      "forecast_hour": int(result.ForecastTime.Hours()),
    }).Fatalf("check failed: %v", result.Error)
  } else {
    log.WithFields(log.Fields{
      "forecast_hour": int(result.ForecastTime.Hours()),
    }).Infof("file is available, run command...")

    if executeCommand == "" {
      continue
    }

    err = runCommand(commandTemplate, startTime, result.ForecastTime, result.FilePath)
    if err != nil {
    log.WithFields(log.Fields{
        "forecast_hour": int(result.ForecastTime.Hours()),
      }).Fatalf("run command failed: %v", err)
    } else {
      log.WithFields(log.Fields{
        "forecast_hour": int(result.ForecastTime.Hours()),
      }).Infof("run command success")
    }
  }
}

参考

程序源码参看nwpc-oper/nwpc-data-client