# wrapper around condor_submit_dag, activates basic throttles
# This file or a portion of this file is licensed under the terms of
# the Globus Toolkit Public License, found in file GTPL, or at
# http://www.globus.org/toolkit/download/license.html. This notice must
# appear in redistributions of this file, with or without modification.
# Redistributions of this Software, with or without modification, must
# reproduce the GTPL in: (1) the Software, or (2) the Documentation or
# some other similar material which is provided with the Software (if
# any).
# Copyright 1999-2004 University of Chicago and The University of
# Southern California. All rights reserved.
# Author: Jens-S. Vöckler voeckler@cs.uchicago.edu
# Author: Gaurang Mehta gmehta at isi dot edu
# Revision : $Revision$
use 5.006;
use strict;
use File::Spec;
use File::Copy;
use File::Basename qw(basename dirname);
use Getopt::Long qw(:config bundling no_ignore_case);
use Data::Dumper;
my $pegasus_config = File::Spec->catfile( dirname($0), 'pegasus-config' );
eval `$pegasus_config --perl-dump`;
die("Unable to eval pegasus-config output. $@") if $@;
# load our own local modules, see PERL5LIB settings
use Pegasus::Properties; # parses -Dprop=val from @ARGV
use Pegasus::Common;
# some reasonable defaults
my $maxpre = 20;
my $maxpost = 20;
my $maxjobs = 0;
my $maxidle = 0;
my $dagman;
my $submit=1;
my $notify = 'NEVER';
my $verbose;
my $conffile;
my %props = ();
my $grid=0; #Grid checks enabled
$main::DEBUG = 0; # for now
$main::revision = 'unknown';
$_ = '$Revision$'; # don't edit, automatically updated by CVS
$main::revision=$1 if /Revision:\s+([0-9.]+)/o;
sub myversion() {
my $version = version();
print "Pegasus $version, @{[basename($0)]} $main::revision\n";
exit 0;
sub usage(;$) {
my $msg = shift;
print "ERROR: $msg\n" if defined $msg && lc($msg) ne 'help';
print << "EOF";
Usage: @{[basename($0)]} [-Dprops] [options] dagfile
-Dprop=val Commandline overwrite for properties, must be initial args!
-c|--conf fn Read properties from given filename instead of rundir.
-d|--debug lv Initializes the level lv of verbosity, default $main::DEBUG
-e|--dagman fn Specify an alternative dagman binary to use.
-P|--maxpre N Maximum number of pre-scripts to run, default $maxpre
-p|--maxpost N Maximum number of post-scripts to run, default $maxpost
-j|--maxjobs N Maximum number of jobs to submit to Condor, default $maxjobs
-i|--maxidle N Maximum number of idle jobs, default $maxidle
-n|--notify x When to notify: Never, Error, Complete; default $notify
-v|--verbose Enter DAGMan verbose mode, default is not
-V|--version Print version number and exit.
--grid | --nogrid Enable checks for grid proxy and GLOBUS LOCATION (Default is enabled)
A maximum number of 0 means unlimited.
sub proxy_duration {
# purpose: determine remaining time on grid user proxy
# returns: undef in case of error, or remaining time.
my $gpi = File::Spec->catfile( $ENV{'GLOBUS_LOCATION'}, 'bin',
'grid-proxy-info' );
die "ERROR: Unable to find GLOBUS_LOCATION, please check your setup\n"
die "ERROR: Unable to execute grid-proxy-info\n"
unless -x $gpi;
my $left = 0;
chomp($left=`$gpi -timeleft`);
$? == 0 ? $left + 0 : undef;
sub salvage_logfile($) {
# purpose: salvage Condor common log file from truncation
# paramtr: $dagfn (IN): Name of dag filename
# returns: -
my $dagfn = shift;
my $result = undef;
if ( open( DAG, "<$dagfn" ) ) {
# read to to figure out submit files
my @x;
my %submit = ();
while ( <DAG> ) {
next unless /^\s*job/i;
s/[\r\n]+$//; # safe chomp
@x = split;
$submit{$x[1]} = $x[2]; # dagjobid -> subfn
close DAG;
if ( $main::DEBUG > 2 ) {
print STDERR "# found the following associations:\n";
local $Data::Dumper::Indent = 1;
local $Data::Dumper::Pad = "# ";
print STDERR Data::Dumper->Dump( [\%submit], [qw(config)] );
# read two submit files to figure out condor common log file
foreach my $subfn ( values %submit ) {
if ( open( SUB, "<$subfn" ) ) {
my $logfile = undef;
while ( <SUB> ) {
next unless /^log(=|\s)/i;
s/[\r\n]+$//; # safe chomp
@x = split /\s*=\s*/, $_, 2;
$logfile = ( substr( $x[1], 0, 1 ) =~ /[''""]/ ?
substr( $x[1], 1, -1 ) : $x[1] );
close SUB;
print STDERR "# $subfn points to $logfile\n"
if ( $main::DEBUG > 1 );
if ( ! defined $result ) {
$result = $logfile;
} else {
last if $result eq $logfile;
warn "# Using distinct, different log files, skipping preservation.\n";
return undef;
} else {
warn "Unable to read sub file $subfn: $!\n";
# try to preserve log file
if ( defined $result && -s $result ) {
my $newfn;
print STDERR "# log $result exists, rescuing from DAGMan.\n"
if $main::DEBUG;
for ( my $i=0; $i<1000; ++$i ) {
$newfn = sprintf "%s.%03d", $result, $i;
if ( open( LOG, "<$newfn" ) ) {
# file exists
close LOG;
} else {
# file does not exist, use that
my $newresult=$result;
#check if the file is a smylink then dereference it.
if ( -l $result ) {
print STDOUT "Rescued $result as $newfn\n"
if copy( $newresult, $newfn ) or warn "Could not rescue the log file $newresult to $newfn\n $! \nTrying to continue\n";
} else {
print STDERR "# log $result does not yet exist (good)\n"
if ( $main::DEBUG );
} else {
die "ERROR: Unable to read dag file $dagfn: $!\n";
GetOptions( "debug|d=i" => \$main::DEBUG,
"maxpre|P=i" => \$maxpre,
"maxpost|p=i" => \$maxpost,
'dagman|e=s' => \$dagman,
"submit!" => \$submit,
'conf|c=s' => \$conffile,
"maxjob|maxjobs|j=i" => \$maxjobs,
"maxidle|i=i" => \$maxidle,
"notify|n=s" => \$notify,
"version|V" => \&myversion,
"verbose|v" => \$verbose,
"help|h|?" => \&usage );
#check grid stuff only if $grid enabled
# check Globus proxy lifetime?
my $left = proxy_duration() ||
die "ERROR: Problems with grid-proxy-info. Check your user proxy\n";
if ( $left <= 0 ) {
die "ERROR: Your grid user proxy has expired, please refresh now.\n";
} elsif ( $left < 7200 ) {
warn "Warning: There is little time remaining on your proxy. You need to refresh soon!\n";
my $dag = shift || usage("Need the name of a .dag file\n");
my $c_s_d = find_exec( 'condor_submit_dag' ) ||
die "Unable to locate condor_submit_dag\n";
my $c_s = find_exec('condor_submit') || die "Unable to locate condor_submit\n";
my $run=dirname($dag);
my %config = slurp_braindb( $run ) or die "ERROR: open braindb: $!\n";
# pre-condition: The planner writes all properties per workflow into the DAG dir.
my $props = Pegasus::Properties->new( $conffile, File::Spec->catfile($run,$config{properties} ));
# set true defaults from properties
$maxpre = $props->property('dagman.maxpre') || $maxpre;
$maxpost = $props->property('dagman.maxpost') || $maxpost;
$maxjobs = $props->property('dagman.maxjobs') || $maxjobs;
$maxidle = $props->property('dagman.maxidle') || $maxidle;
$notify = $props->property('dagman.notify') || $notify;
$verbose = 1 if lc($props->property('dagman.verbose')) =~ /(true|on|1)/ || $verbose;
# construct commandline
my @arg = ( $c_s_d );
#push( @arg, '-dagman', $dagman ) if $dagman;
push( @arg, '-MaxPre', $maxpre ) if $maxpre > 0;
push( @arg, '-MaxPost', $maxpost ) if $maxpost > 0;
push( @arg, '-maxjobs', $maxjobs ) if $maxjobs > 0;
push( @arg, '-maxidle', $maxidle ) if $maxidle > 0;
push( @arg, '-notification', $notify );
push( @arg, '-verbose' ) if $verbose;
push( @arg, '-append', 'executable='.$dagman ) if $dagman;
push( @arg, '-append', '+pegasus_wf_uuid="'.$config{'wf_uuid'}.'"' );
push( @arg, '-append', '+pegasus_root_wf_uuid="'.$config{'root_wf_uuid'}.'"' );
push( @arg, '-append', '+pegasus_wf_name="'.$config{'pegasus_wf_name'}.'"' );
push( @arg, '-append', '+pegasus_wf_time="'.$config{timestamp}.'"' );
push( @arg, '-append', '+pegasus_version="'.$config{'planner_version'}.'"' );
push( @arg, '-append', '+pegasus_job_class=11' );
push( @arg, '-append', '+pegasus_cluster_size=1' );
push( @arg, '-append', '+pegasus_site="local"' );
push( @arg, '-append', '+pegasus_wf_xformation="pegasus::dagman"' );
#push( @arg, '-no_submit') if $submit==0;
push( @arg, $dag );
print STDERR "# @arg\n" if $main::DEBUG;
#my $csdresult=`@arg`;
exec { $arg[0] } @arg or die "Cannot execute @arg: $! \n";
exit 127;