/usr/bin/pegasus-dagman is in pegasus-wms 4.4.0+dfsg-7.
This file is owned by root:root, with mode 0o755.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 | #!/usr/bin/env python
"""
pegasus-dagman
This program is to be run as a replacement for condor_dagman inside
of a submit file. The dag can be submitted by running the command
condor_submit_dag -dagman /path/to/pegasus-dagman my.dag
Usage: pegasus-dagman [options]
"""
##
# Copyright 2007-2010 University Of Southern California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
# Author : gmehta at isi dot edu
# Revision : $Revision$
# Both authors are listed; two separate assignments would keep only the last.
__author__ = "Gaurang Mehta, Mats Rynge"
import os, sys, signal, subprocess
import logging
import time
import math
import shutil
def find_prog(prog, dir=None):
    '''Locate an executable program.

    prog -- program name, or a path that contains a directory component.
    dir  -- optional list of directories searched before the PATH entries.
            (The name shadows the builtin but is kept for keyword callers.)

    If prog contains a directory component, it is returned unchanged when it
    is an executable regular file, otherwise None. A bare name is looked up
    in each directory of dir, then in each PATH entry. When PATH is unset,
    os.defpath is used instead. Returns the first match, or None.
    '''
    def is_prog(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    if os.path.dirname(prog):
        return prog if is_prog(prog) else None

    # Build a fresh list; the caller's list is never modified.
    search_dirs = list(dir or [])
    search_dirs.extend(os.environ.get("PATH", os.defpath).split(os.pathsep))
    for path in search_dirs:
        exe_file = os.path.join(path, prog)
        if is_prog(exe_file):
            return exe_file
    return None
# Use pegasus-config to find our lib path
bin_dir = os.path.normpath(os.path.dirname(sys.argv[0]))
pegasus_config = find_prog("pegasus-config", [bin_dir])
if pegasus_config is None:
    # Without pegasus-config the Pegasus Python libraries cannot be located.
    # Fail with a clear message instead of letting Popen choke on a None
    # program name. 127 matches the "not found" code dagman_launch uses.
    sys.stderr.write("pegasus-config not found in %s or on PATH\n" % bin_dir)
    sys.exit(127)
lib_dir = subprocess.Popen([pegasus_config, "--noeoln", "--python"],
                           stdout=subprocess.PIPE, shell=False,
                           universal_newlines=True).communicate()[0]
lib_ext_dir = subprocess.Popen([pegasus_config, "--noeoln", "--python-externals"],
                               stdout=subprocess.PIPE, shell=False,
                               universal_newlines=True).communicate()[0]
print("Pegasus LIB %s" % lib_dir)
print("Pegasus LIB_EXT %s" % lib_ext_dir)
print("Pegasus BIN_DIR %s" % bin_dir)
print("Pegasus DAGMAN is %s" % sys.argv[0])
# Put these directories at the front of the module search path so the
# Pegasus imports that follow resolve to this installation; lib_dir ends up first.
sys.path.insert(0, lib_ext_dir)
sys.path.insert(0, lib_dir)
import Pegasus.common
from Pegasus.tools import utils
# Root logger; its handlers are set up in Pegasus.common, here we only
# raise the level to INFO.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Seconds to sleep between polls of the dagman and monitord children.
SLEEP_TIME = 15
# A monitord that exits sooner than this many seconds after its last launch
# increments the restart counter used for the backoff.
DIED_TOO_QUICKLY_TIME = 120
# Seconds after pegasus-remove is called (SIGUSR1) before monitord is sent
# the kill signal.
MONITORD_KILL_TIME = 100

# Handles of the two child processes; assigned by the main section.
dagman = monitord = None

# monitord restart bookkeeping (timestamps from time.time(), and a counter).
monitord_last_start = monitord_next_start = monitord_current_restarts = 0

# PM-767 shutdown state, set by sighandler when SIGUSR1 arrives.
monitord_shutdown_mode = False
monitord_shutdown_time = 0
def dagman_launch(dagman_bin, arguments=None):
    '''Launch the condor_dagman program with the arguments passed to pegasus-dagman.

    dagman_bin -- path of the condor_dagman executable, or None if it was not
                  found. In that case an error is logged and the program
                  exits with status 127.
    arguments  -- argument list for condor_dagman. It is copied, so neither
                  the caller's list nor a shared default is ever modified.

    The child's argv[0] is "condor_scheduniv_exec.<CONDOR_ID>"; the real
    binary is selected with Popen's executable= parameter. The child writes
    to this process's stdout and stderr. If the process cannot be started,
    an error is logged and the program exits with status 1.

    Returns the subprocess.Popen handle of the launched condor_dagman.
    '''
    if dagman_bin is None:
        logger.error("Condor Dagman not found")
        sys.exit(127)

    argv = ["condor_scheduniv_exec." + os.getenv("CONDOR_ID")]
    argv.extend(arguments or [])
    try:
        dagman_proc = subprocess.Popen(argv,
                                       stdout=sys.stdout,
                                       stderr=sys.stderr,
                                       executable=dagman_bin)
    except OSError as err:
        # The message needs a placeholder; passing err as a bare extra
        # argument makes logging fail to format the record.
        logger.error("Could not launch Dagman: %s", err)
        sys.exit(1)
    logger.info("Launched Dagman with Pid %d" % dagman_proc.pid)
    return dagman_proc
def monitord_launch(monitord_bin, arguments=None):
    '''Launch pegasus-monitord in condor foreground mode ("-N").

    monitord_bin -- path of pegasus-monitord, or None if it was not found.
    arguments    -- unused; kept so existing callers keep working.

    monitord is pointed at the log named by the _CONDOR_DAGMAN_LOG
    environment variable. An existing monitord.log in the current directory
    is first passed to utils.rotate_log_file (PM-688). The file is then
    opened in append mode, and receives the child's stdout and stderr.

    Returns the subprocess.Popen handle. Returns None (after logging an
    error) if the binary is missing, or if the log file or the process
    cannot be opened or started.
    '''
    if monitord_bin is None:
        logger.error("pegasus-monitord not found")
        return None

    logfile = "monitord.log"
    try:
        # Rotate log file, if it exists
        # PM-688
        utils.rotate_log_file(logfile)
        # The child gets its own copy of the descriptor, so the parent can
        # close it right away. Keeping it open would leak one descriptor per
        # monitord relaunch.
        with open(logfile, 'a') as log:
            monitord_proc = subprocess.Popen(
                [monitord_bin,
                 "-N",
                 os.getenv('_CONDOR_DAGMAN_LOG')],
                stdout=log,
                stderr=subprocess.STDOUT)
    except (IOError, OSError) as err:
        # IOError is listed separately because it is not an OSError on Python 2.
        logger.error("Could not launch Monitord: %s", err)
        return None

    logger.info("Launched Monitord with Pid %d" % monitord_proc.pid)
    return monitord_proc
def is_dagman_copy_to_spool():
    '''Return True if the HTCondor setting DAGMAN_COPY_TO_SPOOL is "true".

    The value is read with condor_config_val, and compared ignoring case and
    surrounding whitespace. When it is true, the main section copies
    condor_dagman into the current working directory before launching it.

    If condor_config_val cannot be found, a warning is logged and False is
    returned; previously this crashed inside subprocess.Popen.
    '''
    condor_config_val = find_prog("condor_config_val")
    if condor_config_val is None:
        logger.warning("condor_config_val not found; assuming DAGMAN_COPY_TO_SPOOL is False")
        return False
    copy_to_spool = subprocess.Popen([condor_config_val, "DAGMAN_COPY_TO_SPOOL"],
                                     stdout=subprocess.PIPE, shell=False,
                                     universal_newlines=True).communicate()[0]
    logger.info("DAGMAN_COPY_TO_SPOOL is set to %s" % copy_to_spool)
    return copy_to_spool.strip().lower() == "true"
def _signal_child(proc, signum):
    '''Send signum to the child process proc if it has not been reaped yet.

    proc is a subprocess.Popen handle, or None. Returns True when the signal
    was sent. A child whose exit status has already been collected
    (proc.returncode is not None) is skipped, so a pid that may have been
    reused by an unrelated process is never signalled. ESRCH (the child has
    vanished) is ignored; any other OSError propagates.
    '''
    import errno
    if proc is None or proc.returncode is not None:
        return False
    try:
        os.kill(proc.pid, signum)
    except OSError as err:
        if err.errno != errno.ESRCH:
            raise
        return False
    return True


def sighandler(signum, frame):
    '''Forward caught signals to condor_dagman and pegasus-monitord.

    Installed by the main section for SIGTERM, SIGINT, SIGABRT, SIGUSR1 and
    SIGUSR2. Every caught signal is passed to condor_dagman as is.

    PM-767: when pegasus-remove is called, condor_rm sends SIGUSR1 to
    pegasus-dagman. That signal is NOT passed to monitord, because we want
    monitord to exit gracefully after reaching the end of the dagman log
    file, so that the stampede database records the workflow as failed.
    Instead, monitord shutdown mode is entered, and the main loop sends
    monitord SIGINT once MONITORD_KILL_TIME seconds have passed. All
    signals other than SIGUSR1 are passed to monitord as is.
    '''
    global monitord_shutdown_mode, monitord_shutdown_time
    logger.info("pegasus-dagman caught SIGNAL %s" % signum)
    _signal_child(dagman, signum)
    if signum == signal.SIGUSR1:
        # Recorded even if monitord is not running right now, so that a
        # monitord (re)started later is still shut down on schedule.
        monitord_shutdown_mode = True
        monitord_shutdown_time = time.time()
    elif _signal_child(monitord, signum):
        logger.info("pegasus-dagman sent signal %s to monitord" % signum)
def merge_returncodes(dagman_rc, monitord_rc):
    '''Combine the condor_dagman and pegasus-monitord exit codes into one status.

    A non-zero dagman code is returned unchanged, so a failed DAG is reported
    with DAGMan's own exit code. Otherwise the monitord code is returned. A
    monitord that could not be launched (None) counts as a failure and
    yields 1.

    Bitwise AND is deliberately not used: 1 & 0 == 0 would report a failed
    DAG as success.
    '''
    if dagman_rc:
        return dagman_rc
    if monitord_rc is None:
        return 1
    return monitord_rc


#-- main--------------------------------------------------------------
if __name__ == "__main__":
    # Make this process the leader of its own process group.
    os.setpgid(0, 0)

    # These signals are forwarded to the children by sighandler.
    signal.signal(signal.SIGTERM, sighandler)
    signal.signal(signal.SIGINT, sighandler)
    signal.signal(signal.SIGABRT, sighandler)
    signal.signal(signal.SIGUSR1, sighandler)
    signal.signal(signal.SIGUSR2, sighandler)

    copy_to_spool = is_dagman_copy_to_spool()

    # Find dagman Binary
    dagman_bin = find_prog("condor_dagman")
    if dagman_bin is not None and copy_to_spool:
        # If copy_to_spool is set copy dagman binary to dag submit directory
        old_dagman_bin = dagman_bin
        dagman_bin = os.path.join(os.getcwd(),
                                  "condor_scheduniv_exec." + os.getenv("CONDOR_ID"))
        shutil.copy2(old_dagman_bin, dagman_bin)
        logger.info("Copied condor_dagman from %s to %s" % (old_dagman_bin, dagman_bin))

    # Launch DAGMAN (dagman_launch exits the program if this fails)
    dagman = dagman_launch(dagman_bin, sys.argv[1:])

    # Find and launch monitord. monitord_launch may return None (binary
    # missing or launch failure). The loop below treats that as "monitord is
    # not running" and retries with backoff, instead of crashing on None.poll().
    monitord_bin = find_prog("pegasus-monitord", [bin_dir])
    monitord = monitord_launch(monitord_bin)
    # Record the first launch too, so an immediate death counts as "too quick".
    monitord_last_start = time.time()

    dagman.poll()
    if monitord is not None:
        monitord.poll()

    while True:
        monitord_running = monitord is not None and monitord.returncode is None
        if dagman.returncode is not None and not monitord_running:
            break

        if dagman.returncode is None and not monitord_running:
            # monitord is not running
            t = time.time()
            if monitord_next_start == 0:
                logger.error("monitord is not running")
                # did the process die too quickly?
                if t - monitord_last_start < DIED_TOO_QUICKLY_TIME:
                    monitord_current_restarts += 1
                else:
                    monitord_current_restarts = 0
                # Exponential backoff with an upper limit. The exponent is
                # capped, so math.exp cannot overflow after many restarts
                # (exp(10) * 10 already exceeds the 3600 s ceiling).
                backoff = min(math.exp(min(monitord_current_restarts, 10)) * 10, 3600)
                logger.info("next monitord launch scheduled in about %d seconds" % (backoff))
                monitord_next_start = t + backoff - 1
            # time to restart yet?
            if monitord_next_start <= t:
                monitord_next_start = 0
                monitord_last_start = t
                monitord = monitord_launch(monitord_bin)

        # PM-767 if in shutdown mode, check to see if we need to kill monitord.
        # Only signal a monitord whose exit has not been collected yet; once
        # reaped, its pid may belong to an unrelated process.
        if (monitord_shutdown_mode and monitord is not None
                and monitord.returncode is None):
            if time.time() - monitord_shutdown_time > MONITORD_KILL_TIME:
                logger.info("monitord shutdown time expired. Sending SIGINT to process %d" % monitord.pid)
                os.kill(monitord.pid, signal.SIGINT)

        # sleep in between polls
        time.sleep(SLEEP_TIME)
        if monitord is not None:
            monitord.poll()
        dagman.poll()

    # Dagman and Monitord have exited. Exit pegasus-dagman with a merged
    # return code (see merge_returncodes).
    logger.info("Dagman exited with code %d" % dagman.returncode)
    if monitord is not None:
        monitord_rc = monitord.returncode
        logger.info("Monitord exited with code %d" % monitord_rc)
    else:
        monitord_rc = None
        logger.error("Monitord could not be launched")
    if copy_to_spool:
        logger.info("Removing copied condor_dagman from submit directory %s" % dagman_bin)
        os.remove(dagman_bin)
    sys.exit(merge_returncodes(dagman.returncode, monitord_rc))
|