GRAPES MESO模式学习笔记08-2 —— SMS中使用Python脚本
现状
SMS通过SMSCMD变量设置的命令提交作业,默认是直接运行作业脚本
%SMSJOB% 1> %SMSJOBOUT% 2>&1 &
目前我们使用SMS执行作业有两种方式:
- 直接运行
- 通过loadleveler提交
我们使用两个脚本实现上述功能 - 直接运行
SMSCMD设置为
lsubmit %SMSJOB% %SMSJOBOUT%脚本lsbumit为
#!/bin/ksh
set -ex
jobname=$1
outname=$2
dir=$(dirname $outname)
[ ! -d $dir ] && mkdir -p $dir
$jobname 1> $outname 2>&1 &先创建SMSJOBOUT的目录(SMS不会自动创建这个目录?),在执行默认的SMSCMD命令。
2. LoadLeveler提交
SMSCMD设置为
llsubmit2 %SMSJOB% %SMSNAME%llsubmit2脚本为
#!/bin/ksh
# llsubmit2 jobname taskname
set -ex
SUBMITLOG=$WORKDIR/sublog/submit.log
test -d $WORKDIR/sublog ||mkdir -p $WORKDIR/sublog
export SUBMITLOG
if [ $# -ne 2 ] ; then
echo ...
exit 1
fi
jobname=$1
taskname=$2
nameofsms=nwpc_$USER
# 提交作业并获取作业号
name=$(llsubmit $jobname 2>>$SUBMITLOG)
rid=$(echo $name | cut -d '"' -f 2)
# 将作业号赋给SMSRID,用于取消作业
cdp << EOF
login $nameofsms $USER 1
alter -v $taskname SMSRID $rid
exit
EOF该脚本主要将作业号赋给SMSRID变量,便于之后使用llcancel命令取消作业。
SMSKILL变量设置为
llcancel2 %SMSRID% %SMSNAME%llcancel2脚本为
#!/bin/ksh
# llcancel2 SMSRID taskname ## host
set -ex
if [ $# -ne 2 ] ; then
echo ...
exit 1
fi
smsrid=$1
taskname=$2
nameofsms=nwpc_$USER
# 取消loadleveler作业
llcancel $smsrid
# 修改作业状态为aborted
cdp << EOF
login $nameofsms $USER 1
force aborted $taskname
exit
EOF目前我们使用Shell脚本作为SMS作业,Shell脚本可以直接运行,也可以在loadleveler中提交。同样Python脚本也可以在命令行直接运行,所以SMS应该支持Python脚本。
SMS作业脚本的简单说明
使用Python改写SMS作业脚本前,先简单说明下SMS中作业的提交机制。
SMS首先对作业脚本(.sms)进行预处理,生成任务文件(.job)。该步骤识别预处理宏语句(默认以%开头,可以自定义),并进行文本替换。例如引入头文件{shell}%include {/shell},将该语句用head.h文件的内容替换;变量替换{shell}%SMSNAME%{/shell},将该字符串替换为SMSNAME变量的值。
再根据SMSCMD设置的命令,运行任务文件。
任务通过RPC方式与SMS服务器交互。脚本调用{shell}smsinit{/shell},{shell}smscomplete{/shell},{shell}smsabort{/shell}等程序,通知服务器自身状态的变化。
通常,任务的基本流程相同,所以对以上程序的调用放在起始头文件head.h和结尾头文件tail.h中。系统开发时一般不需要考虑与SMS服务器交互的细节,直接使用包含头文件即可。
但因为需要用Python改写Shell脚本,原有的Shell头文件无法使用,必须将其转换为Python文件,所以得搞清除这两个头文件到底有什么用。
SMS作业头文件
请参考ecFlow的官网:《Understanding Includes》
通用的head.h
#!/bin/ksh
set -e # stop the shell on first error
set -u # fail when using an undefined variable
set -x # echo script lines as they are executed
date
# Defines the three variables that are needed for any
# communication with SMS
export SMS_PROG=%SMS_PROG% # SMS Remote Procedure Call number
export SMSNODE=%SMSNODE% # The name sms that issued this task
export SMSNAME=%SMSNAME% # The name of this current task
export SMSPASS=%SMSPASS% # A unique password
export SMSTRYNO=%SMSTRYNO% # Current try number of the task
PATH=$PATH:$HOME/bin
# Tell SMS we have stated
# The SMS variable SMSRID will be set to parameter of smsinit
# Here we give the current PID.
RID=$( echo ${LOADL_JOB_NAME:-0.0} | cut -d "." -f2 )
if [ $RID -eq 0 ] ; then
RID=$$
fi
export SMSRID=$RID
smsinit $RID
# Defined a error hanlder
ERROR() {
set +e # Clear -e flag, so we don't fail
smsabort # Notify SMS that something went wrong
trap 0 # Remove the trap
exit 0 # End the script
}
# Trap any calls to exit and errors caught by the -e flag
trap ERROR 0
# Trap any signal that may cause the script to fail
#trap ' echo "Killed by a signal"; ERROR ; ' 1 2 3 4 5 6 7 8 10 12 13 15
#trap '{ echo "Killed by a signal"; ERROR ; }' 1 2 3 4 5 6 7 8 10 12 13 15
trap '{ echo "Killed by a signal";trap 0;ERROR; }' 1 2 3 4 5 6 7 8 10 12 13 15脚本做了三项工作:
- 为与服务器的通信准备环境变量
设置RPC需要的各项环境变量。 - 初始化任务
调用smsinit通知sms服务器任务开始执行,并设置SMSRID。直接运行的任务使用pid号,LoadLeveler提交则用LoadLeveler作业号。 - 设置信号捕获
当脚本非正常结束时,执行函数ERROR,调用smsabort通知服务器任务运行中止。
通用的tail.h
date smscomplete # Notify SMS of a normal end trap 0 # Remove all traps exit 0 # End the shell
脚本主要执行一项工作:
调用smscomplete通知服务器任务运行结束。
Python版的头文件也需要实现上述功能。
Python版简单头文件
参考SMS升级版——ecFlow——官网上的两篇文章
ecFlow@ECMWF
该文章使用子程序调用方式与服务器通信。
Native Python Tasks
该文章利用ecFlow的Python扩展与服务器的通信。
说明
与服务器通讯
SMS中只能选择第一篇文章的方法,通过{python}os.system{/python}调用子程序。
宏标识
另外,SMS预处理器的宏字符为{shell}%{/shell},而Python中{shell}%{/shell}是操作符,所以需要重新设置宏字符。第二篇文章使用{shell}${/shell}作为宏字符。
提交命令
尽管Python脚本可以在命令行直接运行,但SMS中无法使用默认SMSCMD设置提交作业,需要修改SMSCMD为
python $SMSJOB$ > $SMSJOBOUT$ 2>&1 &
header.py
#############
# START of header.py
#############
import os
import sys
import signal
SMS_PROG="$SMS_PROG$"
SMSNODE="$SMSNODE$" # The name sms that issued this task
SMSNAME="$SMSNAME$" # The name of this current task
SMSPASS="$SMSPASS$" # A unique password
SMSTRYNO="$SMSTRYNO$"
def SigHandler(signum, frame):
print "caught signal " + signum
xabort()
sys.exit(0);
return
print SMS_PROG
# set -eux ?
# trap 0 ?
# time stamp per executed line
import atexit
early_exit = True
@atexit.register
def goodbye():
if early_exit:
print "too early"
xabort()
else:
xcomplete()
os.environ['SMS_PROG'] = "$SMS_PROG$"
os.environ['SMSNAME'] = "$SMSNAME$"
os.environ['SMSNODE'] = "$SMSNODE$"
os.environ['SMSPASS'] = "$SMSPASS$"
os.environ['SMSTRYNO'] = "$SMSTRYNO$"
def xinit():
print os.getpid()
os.system('smsinit ' + str(os.getpid()))
def xabort():
os.system('smsabort')
def xcomplete():
os.system('smscomplete')
def xmeter(name, step):
os.system("smsmeter " + name + " " + str(step))
def xevent(name):
os.system("smsevent " + name)
def xlabel(name, msg):
os.system("smslabel " + name + " '%s'" % msg)
xinit()
signal.signal(signal.SIGINT, SigHandler)
signal.signal(signal.SIGHUP, SigHandler)
signal.signal(signal.SIGQUIT, SigHandler)
signal.signal(signal.SIGILL, SigHandler)
signal.signal(signal.SIGTRAP, SigHandler)
signal.signal(signal.SIGIOT, SigHandler)
signal.signal(signal.SIGBUS, SigHandler)
signal.signal(signal.SIGFPE, SigHandler)
signal.signal(signal.SIGUSR1, SigHandler)
signal.signal(signal.SIGUSR2, SigHandler)
signal.signal(signal.SIGPIPE, SigHandler)
signal.signal(signal.SIGTERM, SigHandler)
signal.signal(signal.SIGXCPU, SigHandler)
signal.signal(signal.SIGPWR, SigHandler)
#############
# END of header.py
#############
环境设置
使用{python}os.environ{/python}设置环境变量
信号捕捉
使用{python}signal.signal{/python}设置信号捕捉。
使用atexit模块设置程序退出时调用的参数。
初始化任务
使用{python}os.system{/python}调用{shell}smsinit{/shell}初始化任务。
tail.py
#############
# START of tail.py
#############
os.system('smscomplete')
early_exit = False
#############
# END of tail.py
#############
结束任务
使用{python}os.system{/python}调用{shell}smscomplete{/shell}结束任务。
task定义
用作测试的task定义
task test1
edit SMSMICRO '$'
edit SMSCMD 'python $SMSJOB$ > $SMSJOBOUT$ 2>&1 &
meter step 0 100 100
label info ""
endtask
Python任务脚本
测试脚本test1.sms
#!/usr/bin/env python
$include
for step in range(0,101):
print step
xmeter("step", step)
else:
print 'the loop is over'
xlabel("info", "news from pure python world")
$include
测试结果
任务运行成功,XCDP中显示如下结果:
更进一步:使用LoadLeveler提交SMS作业
业务任务都由LoadLeveler提交,上面的任务定义和头文件仅考虑直接运行的情况,下面加上针对LoadLeveler的特殊处理。
任务文件
task test1
time 00:00
edit SMSMICRO '$'
edit SMSCMD 'llsubmit2 $SMSJOB$ $SMSNAME$'
edit SMSKILL 'llcancel2 $SMSRID$ $SMSNAME$'
edit SMSTRIES 1
meter step 0 100 100
label info ""
endtask使用llsubmit2提交任务,使用llcancel2中止任务。
头文件(节选)
添加判断是否是LoadLeveler作业并设置SMSRID。
os.environ['SMS_PROG'] = "$SMS_PROG$"
os.environ['SMSNAME'] = "$SMSNAME$"
os.environ['SMSNODE'] = "$SMSNODE$"
os.environ['SMSPASS'] = "$SMSPASS$"
os.environ['SMSTRYNO'] = "$SMSTRYNO$"
RID = os.getpid()
if os.environ.has_key('LOADL_JOB_NAME'):
RID = os.environ['LOADL_JOB_NAME']
os.environ['SMSRID'] = RID
def xinit():
print os.getpid()
os.system('smsinit ' + str(RID))
def xabort():
os.system('smsabort')
def xcomplete():
os.system('smscomplete')
def xmeter(name, step):
os.system("smsmeter " + name + " " + str(step))
def xevent(name):
os.system("smsevent " + name)
def xlabel(name, msg):
os.system("smslabel " + name + " '%s'" % msg)
xinit()由LOADL_JOB_NAME判断是否是LoadLeveler作业,如果是,则用作业号启动,否则使用进程号。
任务脚本(节选)
在开头加上LoadLeveler作业脚本即可。
#!/usr/bin/env python # @ comment = GRAPES # @ job_type = serial # @ environment = COPY_ALL;PYTHONUNBUFFERED=1 # @ input = /dev/null # @ output = $SMSJOBOUT$ # @ error = $SMSJOBOUT$.err # @ initialdir = ./ ## @ notify_user = wangdp@cma19n03 # @ class = normal # @ notification = complete # @ checkpoint = no # @ restart = no # @ queue $include <header.py>
测试结果
可以提交作业,也可以中止作业。
总结
SMS完全支持Python脚本,可以使用Python脚本代替Shell脚本(如果真有这个必要的话)。
