GRAPES MESO模式学习笔记08-2 —— SMS中使用Python脚本

目录

现状

SMS通过SMSCMD变量设置的命令提交作业,默认是直接运行作业脚本

%SMSJOB% 1> %SMSJOBOUT% 2>&1 &

目前我们使用SMS执行作业有两种方式:
1. 直接运行
2. 通过loadleveler提交
我们使用两个脚本实现上述功能
1. 直接运行
SMSCMD设置为

lsubmit %SMSJOB% %SMSJOBOUT%

脚本lsbumit为

#!/bin/ksh
set -ex
jobname=$1
outname=$2
dir=$(dirname $outname)
[ ! -d $dir ] && mkdir -p $dir
$jobname 1> $outname 2>&1 &

先创建SMSJOBOUT的目录(SMS不会自动创建这个目录?),在执行默认的SMSCMD命令。
2. LoadLeveler提交
SMSCMD设置为

llsubmit2 %SMSJOB% %SMSNAME%

llsubmit2脚本为

#!/bin/ksh
# llsubmit2 jobname taskname
set -ex
SUBMITLOG=$WORKDIR/sublog/submit.log
test -d $WORKDIR/sublog ||mkdir -p $WORKDIR/sublog
export SUBMITLOG
if [ $# -ne 2 ] ; then
  echo ...
  exit 1
fi
jobname=$1
taskname=$2
nameofsms=nwpc_$USER
# 提交作业并获取作业号
name=$(llsubmit $jobname 2>>$SUBMITLOG)
rid=$(echo $name | cut -d '"' -f 2)
# 将作业号赋给SMSRID,用于取消作业
cdp << EOF
  login $nameofsms $USER 1
  alter -v $taskname SMSRID $rid
  exit
EOF

该脚本主要将作业号赋给SMSRID变量,便于之后使用llcancel命令取消作业。
SMSKILL变量设置为

llcancel2 %SMSRID% %SMSNAME%

llcancel2脚本为

#!/bin/ksh
# llcancel2 SMSRID taskname  ## host
set -ex
if [ $# -ne 2 ] ; then
  echo ...
  exit 1
fi
smsrid=$1
taskname=$2
nameofsms=nwpc_$USER
# 取消loadleveler作业
llcancel $smsrid
# 修改作业状态为aborted
cdp << EOF
  login $nameofsms $USER 1
  force aborted $taskname
  exit
EOF

目前我们使用Shell脚本作为SMS作业,Shell脚本可以直接运行,也可以在loadleveler中提交。同样Python脚本也可以在命令行直接运行,所以SMS应该支持Python脚本。

SMS作业脚本的简单说明

使用Python改写SMS作业脚本前,先简单说明下SMS中作业的提交机制。
SMS首先对作业脚本(.sms)进行预处理,生成任务文件(.job)。该步骤识别预处理宏语句(默认以%开头,可以自定义),并进行文本替换。例如引入头文件{shell}%include {/shell},将该语句用head.h文件的内容替换;变量替换{shell}%SMSNAME%{/shell},将该字符串替换为SMSNAME变量的值。
再根据SMSCMD设置的命令,运行任务文件。
任务通过RPC方式与SMS服务器交互。脚本调用{shell}smsinit{/shell},{shell}smscomplete{/shell},{shell}smsabort{/shell}等程序,通知服务器自身状态的变化。
通常,任务的基本流程相同,所以对以上程序的调用放在起始头文件head.h和结尾头文件tail.h中。系统开发时一般不需要考虑与SMS服务器交互的细节,直接使用包含头文件即可。
但因为需要用Python改写Shell脚本,原有的Shell头文件无法使用,必须将其转换为Python文件,所以得搞清除这两个头文件到底有什么用。

SMS作业头文件

请参考ecFlow的官网:《Understanding Includes

通用的head.h

#!/bin/ksh
set -e # stop the shell on first error
set -u # fail when using an undefined variable
set -x # echo script lines as they are executed
date
# Defines the three variables that are needed for any
# communication with SMS
export SMS_PROG=%SMS_PROG%  # SMS Remote Procedure Call number
export SMSNODE=%SMSNODE%    # The name sms that issued this task
export SMSNAME=%SMSNAME%    # The name of this current task
export SMSPASS=%SMSPASS%    # A unique password
export SMSTRYNO=%SMSTRYNO%  # Current try number of the task
PATH=$PATH:$HOME/bin
# Tell SMS we have stated
# The SMS variable SMSRID will be set to parameter of smsinit
# Here we give the current PID.
RID=$( echo ${LOADL_JOB_NAME:-0.0} | cut -d "." -f2 )
if [ $RID -eq 0 ] ; then
  RID=$$
fi
export SMSRID=$RID
smsinit $RID
# Defined a error hanlder
ERROR() {
    set +e        # Clear -e flag, so we don't fail
    smsabort      # Notify SMS that something went wrong
    trap 0        # Remove the trap
    exit 0        # End the script
}
# Trap any calls to exit and errors caught by the -e flag
trap ERROR 0
# Trap any signal that may cause the script to fail
#trap ' echo "Killed by a signal"; ERROR ; ' 1 2 3 4 5 6 7 8 10 12 13 15
#trap '{ echo "Killed by a signal"; ERROR ; }' 1 2 3 4 5 6 7 8 10 12 13 15
trap '{ echo "Killed by a signal";trap 0;ERROR; }' 1 2 3 4 5 6 7 8 10 12 13 15

脚本做了三项工作:
1. 为与服务器的通信准备环境变量
设置RPC需要的各项环境变量。
2. 初始化任务
调用smsinit通知sms服务器任务开始执行,并设置SMSRID。直接运行的任务使用pid号,LoadLeveler提交则用LoadLeveler作业号。
3. 设置信号捕获
当脚本非正常结束时,执行函数ERROR,调用smsabort通知服务器任务运行中止。
通用的tail.h

date
smscomplete  # Notify SMS of a normal end
trap 0       # Remove all traps
exit 0       # End the shell

脚本主要执行一项工作:
调用smscomplete通知服务器任务运行结束。
Python版的头文件也需要实现上述功能。

Python版简单头文件

参考SMS升级版——ecFlow——官网上的两篇文章
ecFlow@ECMWF
该文章使用子程序调用方式与服务器通信。
Native Python Tasks
该文章利用ecFlow的Python扩展与服务器的通信。

说明

与服务器通讯
SMS中只能选择第一篇文章的方法,通过{python}os.system{/python}调用子程序。
宏标识
另外,SMS预处理器的宏字符为{shell}%{/shell},而Python中{shell}%{/shell}是操作符,所以需要重新设置宏字符。第二篇文章使用{shell}${/shell}作为宏字符。
提交命令
尽管Python脚本可以在命令行直接运行,但SMS中无法使用默认SMSCMD设置提交作业,需要修改SMSCMD为

python $SMSJOB$ > $SMSJOBOUT$ 2>&1 &

header.py

#############
# START of header.py
#############
import os
import sys
import signal
SMS_PROG="$SMS_PROG$"
SMSNODE="$SMSNODE$"    # The name sms that issued this task
SMSNAME="$SMSNAME$"    # The name of this current task
SMSPASS="$SMSPASS$"    # A unique password
SMSTRYNO="$SMSTRYNO$"
def SigHandler(signum, frame):
    print "caught signal " + signum
    xabort()
    sys.exit(0);
    return
print SMS_PROG
# set -eux ?
# trap 0 ?
# time stamp per executed line
import atexit
early_exit = True
@atexit.register
def goodbye():
    if early_exit:
        print "too early"
        xabort()
    else:
        xcomplete()
os.environ['SMS_PROG'] = "$SMS_PROG$"
os.environ['SMSNAME'] = "$SMSNAME$"
os.environ['SMSNODE'] = "$SMSNODE$"
os.environ['SMSPASS'] = "$SMSPASS$"
os.environ['SMSTRYNO'] = "$SMSTRYNO$"
def xinit():
    print os.getpid()
    os.system('smsinit ' + str(os.getpid()))
def xabort():
    os.system('smsabort')
def xcomplete():
    os.system('smscomplete')
def xmeter(name, step):
    os.system("smsmeter " + name + " " + str(step))
def xevent(name):
    os.system("smsevent " + name)
def xlabel(name, msg):
    os.system("smslabel " + name + " '%s'" % msg)
xinit()
signal.signal(signal.SIGINT,  SigHandler)
signal.signal(signal.SIGHUP,  SigHandler)
signal.signal(signal.SIGQUIT, SigHandler)
signal.signal(signal.SIGILL,  SigHandler)
signal.signal(signal.SIGTRAP, SigHandler)
signal.signal(signal.SIGIOT,  SigHandler)
signal.signal(signal.SIGBUS,  SigHandler)
signal.signal(signal.SIGFPE,  SigHandler)
signal.signal(signal.SIGUSR1, SigHandler)
signal.signal(signal.SIGUSR2, SigHandler)
signal.signal(signal.SIGPIPE, SigHandler)
signal.signal(signal.SIGTERM, SigHandler)
signal.signal(signal.SIGXCPU, SigHandler)
signal.signal(signal.SIGPWR,  SigHandler)
#############
# END of header.py
#############

环境设置
使用{python}os.environ{/python}设置环境变量
信号捕捉
使用{python}signal.signal{/python}设置信号捕捉。
使用atexit模块设置程序退出时调用的参数。
初始化任务
使用{python}os.system{/python}调用{shell}smsinit{/shell}初始化任务。

tail.py

#############
# START of tail.py
#############
os.system('smscomplete')
early_exit = False
#############
# END of tail.py
#############

结束任务
使用{python}os.system{/python}调用{shell}smscomplete{/shell}结束任务。

task定义

用作测试的task定义

task test1
      edit SMSMICRO '$'
      edit SMSCMD 'python $SMSJOB$ > $SMSJOBOUT$ 2>&1 &
      meter step 0 100 100
      label info ""
endtask

Python任务脚本

测试脚本test1.sms

#!/usr/bin/env python
$include
for step in range(0,101):
    print step
    xmeter("step", step)
else:
    print 'the loop is over'
xlabel("info", "news from pure python world")
$include

测试结果

任务运行成功,XCDP中显示如下结果:

更进一步:使用LoadLeveler提交SMS作业

业务任务都由LoadLeveler提交,上面的任务定义和头文件仅考虑直接运行的情况,下面加上针对LoadLeveler的特殊处理。

任务文件

task test1
     time 00:00
     edit SMSMICRO '$'
     edit SMSCMD 'llsubmit2 $SMSJOB$ $SMSNAME$'
     edit SMSKILL 'llcancel2 $SMSRID$ $SMSNAME$'
     edit SMSTRIES 1
     meter step 0 100 100
     label info ""
endtask

使用llsubmit2提交任务,使用llcancel2中止任务。

头文件(节选)

添加判断是否是LoadLeveler作业并设置SMSRID。

os.environ['SMS_PROG'] = "$SMS_PROG$"
os.environ['SMSNAME'] = "$SMSNAME$"
os.environ['SMSNODE'] = "$SMSNODE$"
os.environ['SMSPASS'] = "$SMSPASS$"
os.environ['SMSTRYNO'] = "$SMSTRYNO$"
RID = os.getpid()
if os.environ.has_key('LOADL_JOB_NAME'):
    RID = os.environ['LOADL_JOB_NAME']
os.environ['SMSRID'] = RID
def xinit():
    print os.getpid()
    os.system('smsinit ' + str(RID))
def xabort():
    os.system('smsabort')
def xcomplete():
    os.system('smscomplete')
def xmeter(name, step):
    os.system("smsmeter " + name + " " + str(step))
def xevent(name):
    os.system("smsevent " + name)
def xlabel(name, msg):
    os.system("smslabel " + name + " '%s'" % msg)
xinit()

由LOADL_JOB_NAME判断是否是LoadLeveler作业,如果是,则用作业号启动,否则使用进程号。

任务脚本(节选)

在开头加上LoadLeveler作业脚本即可。

#!/usr/bin/env python
# @ comment = GRAPES
# @ job_type = serial
# @ environment = COPY_ALL;PYTHONUNBUFFERED=1
# @ input = /dev/null
# @ output = $SMSJOBOUT$
# @ error = $SMSJOBOUT$.err
# @ initialdir = ./
## @ notify_user = wangdp@cma19n03
# @ class = normal
# @ notification = complete
# @ checkpoint = no
# @ restart = no
# @ queue
$include <header.py>

测试结果

可以提交作业,也可以中止作业。

总结

SMS完全支持Python脚本,可以使用Python脚本代替Shell脚本(如果真有这个必要的话)。