NWPC业务系统笔记:并行检查数据
目录
之前的一篇文章《NWPC业务系统笔记:检查输出数据》中详细介绍了目前NWPC数值预报业务系统检查输出数据的方法。
其中提到目前数据有两种检查方式:
- 多个作业并发检测
- 单个作业循环检测
两种方式各有利弊,详细分析查看该文档。
本文介绍一种使用单个作业并行检测数据文件的方法,综合上面两种方式的优点,寻找效率和资源占用的平衡。
原理
单任务并行检测数据文件的原理如下图所示。
主程序根据预报时效列表生成一系列的goroutine。
每个goroutine会循环查看某个时效的文件是否存在,如果存在会继续查看该文件大小是否变化。 如果文件大小没有变化,则向主程序的通道发送一条消息。 如果循环超过一定次数,则会发送未找到的消息。
主程序会循环从管道中接收消息,每读取一条消息,就会根据用户提供的命令模板生成对应时效的任务并执行。 如果接收到未找到文件的消息,程序则直接出错。
为了降低系统负载,可以设置设置延迟启动时间,每次等待一定时间间隔后再开始检测下一时效。
实现
主程序创建一个通道,接收子程序发送的消息:
type CheckResult struct {
ForecastTime time.Duration
FilePath string
Error error
}
ch := make(chan CheckResult)
主程序为每个预报时效创建子程序,每个子程序会休眠指定时间再启动文件检测。
for index, oneTime := range forecastTimeList {
go func(currentIndex int, forecastTime time.Duration) {
sleepTime := delayTime * time.Duration(currentIndex)
log.WithFields(log.Fields{
"forecast_hour": forecastTime.Hours(),
}).Infof("sleeping before check...%v", sleepTime)
time.Sleep(sleepTime)
log.WithFields(log.Fields{
"forecast_hour": forecastTime.Hours(),
}).Infof("checking begin...")
checkForOneTime(ch, config, levels, forecastTime, checkDuration)
}(index, oneTime)
}
主程序循环等待子程序发送的检查结果,如果找到文件则执行命令,如果没有找到文件则直接出错。
for _ = range forecastTimeList {
result := <-ch
if result.Error != nil {
log.WithFields(log.Fields{
"forecast_hour": int(result.ForecastTime.Hours()),
}).Fatalf("check failed: %v", result.Error)
} else {
log.WithFields(log.Fields{
"forecast_hour": int(result.ForecastTime.Hours()),
}).Infof("file is available, run command...")
if executeCommand == "" {
continue
}
err = runCommand(commandTemplate, startTime, result.ForecastTime, result.FilePath)
if err != nil {
log.WithFields(log.Fields{
"forecast_hour": int(result.ForecastTime.Hours()),
}).Fatalf("run command failed: %v", err)
} else {
log.WithFields(log.Fields{
"forecast_hour": int(result.ForecastTime.Hours()),
}).Infof("run command success")
}
}
}