View Javadoc

1   package fr.in2p3.jsaga.command;
2   
3   import org.apache.commons.cli.*;
4   import org.ogf.saga.error.BadParameterException;
5   import org.ogf.saga.error.SagaException;
6   import org.ogf.saga.job.*;
7   import org.ogf.saga.session.Session;
8   import org.ogf.saga.session.SessionFactory;
9   import org.ogf.saga.task.State;
10  import org.ogf.saga.url.URL;
11  import org.ogf.saga.url.URLFactory;
12  
13  import java.io.*;
14  import java.util.Iterator;
15  import java.util.Properties;
16  
17  /* ***************************************************
18  * *** Centre de Calcul de l'IN2P3 - Lyon (France) ***
19  * ***             http://cc.in2p3.fr/             ***
20  * ***************************************************
21  * File:   JobRun
22  * Author: Sylvain Reynaud (sreynaud@in2p3.fr)
23  * Date:   4 avr. 2007
24  * ***************************************************
25  * Description:                                      */
26  /**
27   * -r local://localhost -Executable job.sh -FileTransfer input>input,output<<output
28   */
29  public class JobRun extends AbstractCommand {
30      private static final String OPT_HELP = "h", LONGOPT_HELP = "help";
31      // required arguments
32      private static final String OPT_RESOURCE = "r", LONGOPT_RESOURCE = "resource";
33      // optional arguments
34      private static final String OPT_FILE = "f", LONGOPT_FILE = "file";
35      private static final String OPT_DESCRIPTION = "d", LONGOPT_DESCRIPTION = "description";
36      private static final String OPT_JOBID = "i", LONGOPT_JOBID = "jobid";
37      private static final String OPT_BATCH = "b", LONGOPT_BATCH = "batch";
38  
39      protected JobRun() {
40          super("jsaga-job-run", null, null, new GnuParser());
41      }
42  
43      public static void main(String[] args) throws Exception {
44          JobRun command = new JobRun();
45          CommandLine line = command.parse(args);
46          if (line.hasOption(OPT_HELP))
47          {
48              command.printHelpAndExit(null);
49          }
50          else
51          {
52              // get arguments
53              URL serviceURL = URLFactory.createURL(line.getOptionValue(OPT_RESOURCE));
54              String file = line.getOptionValue(OPT_FILE);
55  
56              // create the job description
57              Properties prop = new Properties();
58              if (file != null) {
59                  prop.load(new FileInputStream(file));
60              }
61              for (Iterator it=line.iterator(); it.hasNext(); ) {
62                  Option opt = (Option) it.next();
63                  if (opt.getValue() != null) {
64                      prop.setProperty(opt.getOpt(), opt.getValue());
65                  } else {
66                      prop.setProperty(opt.getOpt(), Boolean.toString(true));
67                  }
68              }
69              JobDescription desc = createJobDescription(prop);
70              boolean isStreamRedirected = prop.containsKey(JobDescription.INPUT) || prop.containsKey(JobDescription.OUTPUT) || prop.containsKey(JobDescription.ERROR);
71              if (!line.hasOption(OPT_BATCH) && !isStreamRedirected) {
72                  desc.setAttribute(JobDescription.INTERACTIVE, "true");
73              }
74  
75              // create the job
76              Session session = SessionFactory.createSession(true);
77              JobService service = JobFactory.createJobService(session, serviceURL);
78              final Job job = service.createJob(desc);
79  
80              if (line.hasOption(OPT_DESCRIPTION)) {
81                  // dump job description
82                  String nativeDesc = job.getAttribute("NativeJobDescription");
83                  System.out.println(nativeDesc);
84              } else {
85                  // submit
86                  job.run();
87  
88                  // print job identier
89                  if (line.hasOption(OPT_JOBID) || line.hasOption(OPT_BATCH)) {
90                      String jobId = job.getAttribute(Job.JOBID);
91                      System.out.println(jobId);
92                  }
93  
94                  // monitor
95                  if (! line.hasOption(OPT_BATCH)) {
96                      // add shutdown hook
97                      Thread hook = new Thread(){
98                          public void run() {
99                              // cancel the job
100                             try {
101                                 System.out.println("Canceling job: "+job.getAttribute(Job.JOBID));
102                                 job.cancel();
103                             } catch (SagaException e) {
104                                 e.printStackTrace();
105                             }
106                             // give it a change to display final job state
107                             try {sleep(1000);} catch(InterruptedException e){e.printStackTrace();}
108                         }
109                     };
110                     Runtime.getRuntime().addShutdownHook(hook);
111 
112                     // wait
113                     job.waitFor();
114 
115                     // display final state
116                     State state = job.getState();
117                     if (State.CANCELED.compareTo(state) == 0) {
118                         System.out.println("Job canceled.");
119                     } else {
120                         Runtime.getRuntime().removeShutdownHook(hook);
121                         if (State.DONE.compareTo(state) == 0) {
122                             try {
123                                 if ("true".equalsIgnoreCase(desc.getAttribute(JobDescription.INTERACTIVE))) {
124                                     copyStream(job.getStdout(), System.out);
125                                 } else {
126                                     System.out.println("Job done.");
127                                 }
128                             } catch(SagaException e) {
129                                 System.out.println("Job done.");
130                             }
131                         } else if (State.FAILED.compareTo(state) == 0) {
132                             try {
133                                 if ("true".equalsIgnoreCase(desc.getAttribute(JobDescription.INTERACTIVE))) {
134                                     copyStream(job.getStderr(), System.err);
135                                 }
136                                 String exitCode = job.getAttribute(Job.EXITCODE);
137                                 System.out.println("Job failed with exit code: "+exitCode);
138                             } catch(SagaException e) {
139                                 System.out.println("Job failed.");
140                                 job.rethrow();
141                             }
142                         } else {
143                             throw new Exception("Unexpected state: "+ state);
144                         }
145                     }
146                 }
147             }
148             System.exit(0);
149         }
150     }
151 
152     protected Options createOptions() {
153         Options opt = new Options();
154 
155         // command
156         opt.addOption(OptionBuilder.withDescription("Display this help and exit")
157                 .withLongOpt(LONGOPT_HELP)
158                 .create(OPT_HELP));
159 
160         // required arguments
161         opt.addOption(OptionBuilder.withDescription("the URL of the job service")
162                 .isRequired(true)
163                 .hasArg()
164                 .withArgName("URL")
165                 .withLongOpt(LONGOPT_RESOURCE)
166                 .create(OPT_RESOURCE));
167 
168         // optional arguments
169         opt.addOption(OptionBuilder.withDescription("generate the job description in the targeted grid language " +
170                 "and exit (do not submit the job)")
171                 .withLongOpt(LONGOPT_DESCRIPTION)
172                 .create(OPT_DESCRIPTION));
173 
174         // optional group
175         OptionGroup optGroup = new OptionGroup();
176         optGroup.addOption(OptionBuilder.withDescription("print the job identifier as soon as it is submitted, " +
177                 "and wait for it to be finished")
178                 .withLongOpt(LONGOPT_JOBID)
179                 .create(OPT_JOBID));
180         optGroup.addOption(OptionBuilder.withDescription("print the job identifier as soon as it is submitted, " +
181                 "and exit immediatly.")
182                 .withLongOpt(LONGOPT_BATCH)
183                 .create(OPT_BATCH));
184         optGroup.setRequired(false);
185         opt.addOptionGroup(optGroup);
186 
187         // required group
188         OptionGroup reqGroup = new OptionGroup();
189         reqGroup.addOption(OptionBuilder.withDescription("read job description from file <path>")
190                 .hasArg()
191                 .withArgName("path")
192                 .withLongOpt(LONGOPT_FILE)
193                 .create(OPT_FILE));
194         reqGroup.addOption(o("command to execute").hasArg().create(JobDescription.EXECUTABLE));
195         reqGroup.setRequired(true);
196         opt.addOptionGroup(reqGroup);
197 
198         // job description
199         opt.addOption(o("positional parameters for the command").hasArgs().create(JobDescription.ARGUMENTS));
200         opt.addOption(o("SPMD job type and startup mechanism").hasArg().create(JobDescription.SPMDVARIATION));
201         opt.addOption(o("total number of cpus requested for this job").hasArg().create(JobDescription.TOTALCPUCOUNT));
202         opt.addOption(o("number of process instances to start").hasArg().create(JobDescription.NUMBEROFPROCESSES));
203         opt.addOption(o("number of processes to start per host").hasArg().create(JobDescription.PROCESSESPERHOST));
204         opt.addOption(o("expected number of threads per process").hasArg().create(JobDescription.THREADSPERPROCESS));
205         opt.addOption(o("set of environment variables for the job").hasArgs().withValueSeparator().create(JobDescription.ENVIRONMENT));
206         opt.addOption(o("working directory for the job").hasArg().create(JobDescription.WORKINGDIRECTORY));
207         opt.addOption(o("run the job in interactive mode").create(JobDescription.INTERACTIVE));
208         opt.addOption(o("pathname of the standard input file").hasArg().create(JobDescription.INPUT));
209         opt.addOption(o("pathname of the standard output file").hasArg().create(JobDescription.OUTPUT));
210         opt.addOption(o("pathname of the standard error file").hasArg().create(JobDescription.ERROR));
211         opt.addOption(o("a list of file transfer directives").hasArgs().create(JobDescription.FILETRANSFER));
212         opt.addOption(o("defines if output files get removed after the job finishes").hasArg().create(JobDescription.CLEANUP));
213         opt.addOption(o("time at which a job should be scheduled").hasArg().create(JobDescription.JOBSTARTTIME));
214         opt.addOption(o("hard limit for the total job runtime").hasArg().create(JobDescription.WALLTIMELIMIT));
215         opt.addOption(o("estimated total number of CPU seconds which the job will require").hasArg().create(JobDescription.TOTALCPUTIME));
216         opt.addOption(o("estimated amount of memory the job requires").hasArg().create(JobDescription.TOTALPHYSICALMEMORY));
217         opt.addOption(o("compatible processor for job submission").hasArg().create(JobDescription.CPUARCHITECTURE));
218         opt.addOption(o("compatible operating system for job submission").hasArg().create(JobDescription.OPERATINGSYSTEMTYPE));
219         opt.addOption(o("list of host names which are to be considered by the resource manager as candidate targets").hasArgs().create(JobDescription.CANDIDATEHOSTS));
220         opt.addOption(o("name of a queue to place the job into").hasArg().create(JobDescription.QUEUE));
221         opt.addOption(o("name of an account or project name").hasArg().create(JobDescription.JOBPROJECT));
222         opt.addOption(o("set of endpoints describing where to report").hasArgs().create(JobDescription.JOBCONTACT));
223 
224         // returns
225         return opt;
226     }
227     private static OptionBuilder o(String description) {
228         return OptionBuilder.withDescription(description);
229     }
230 
231     private static JobDescription createJobDescription(Properties prop) throws Exception {
232         JobDescription desc = JobFactory.createJobDescription();
233         setRequired(desc, prop, JobDescription.EXECUTABLE);
234         setOptMulti(desc, prop, JobDescription.ARGUMENTS);
235         setOptional(desc, prop, JobDescription.SPMDVARIATION);
236         setOptional(desc, prop, JobDescription.TOTALCPUCOUNT);
237         setOptional(desc, prop, JobDescription.NUMBEROFPROCESSES);
238         setOptional(desc, prop, JobDescription.PROCESSESPERHOST);
239         setOptional(desc, prop, JobDescription.THREADSPERPROCESS);
240         setOptMulti(desc, prop, JobDescription.ENVIRONMENT);
241         setOptional(desc, prop, JobDescription.WORKINGDIRECTORY);
242         setOptional(desc, prop, JobDescription.INTERACTIVE);
243         setOptional(desc, prop, JobDescription.INPUT);
244         setOptional(desc, prop, JobDescription.OUTPUT);
245         setOptional(desc, prop, JobDescription.ERROR);
246         setOptMulti(desc, prop, JobDescription.FILETRANSFER);
247         setOptional(desc, prop, JobDescription.CLEANUP);
248         setOptional(desc, prop, JobDescription.JOBSTARTTIME);
249         setOptional(desc, prop, JobDescription.WALLTIMELIMIT);
250         setOptional(desc, prop, JobDescription.TOTALCPUTIME);
251         setOptional(desc, prop, JobDescription.TOTALPHYSICALMEMORY);
252         setOptional(desc, prop, JobDescription.CPUARCHITECTURE);
253         setOptional(desc, prop, JobDescription.OPERATINGSYSTEMTYPE);
254         setOptMulti(desc, prop, JobDescription.CANDIDATEHOSTS);
255         setOptional(desc, prop, JobDescription.QUEUE);
256         setOptional(desc, prop, JobDescription.JOBPROJECT);
257         setOptMulti(desc, prop, JobDescription.JOBCONTACT);
258         return desc;
259     }
260     private static void setRequired(JobDescription desc, Properties prop, String name) throws Exception {
261         String value = prop.getProperty(name);
262         if (value != null) {
263             desc.setAttribute(name, value);
264         } else {
265             throw new BadParameterException("Missing required attribute: "+name);
266         }
267     }
268     private static void setOptional(JobDescription desc, Properties prop, String name) throws Exception {
269         String value = prop.getProperty(name);
270         if (value != null) {
271             desc.setAttribute(name, value);
272         }
273     }
274     private static void setOptMulti(JobDescription desc, Properties prop, String name) throws Exception {
275         String values = prop.getProperty(name);
276         if (values != null) {
277             desc.setVectorAttribute(name, values.split(","));
278         }
279     }
280 
281     private static void copyStream(InputStream in, OutputStream out) throws IOException {
282         byte[] buffer = new byte[1024];
283         for (int len; (len=in.read(buffer))>0; ) {
284             out.write(buffer, 0, len);
285         }
286     }
287 }