1 package fr.in2p3.jsaga.command;
2
3 import org.apache.commons.cli.*;
4 import org.ogf.saga.error.BadParameterException;
5 import org.ogf.saga.error.SagaException;
6 import org.ogf.saga.job.*;
7 import org.ogf.saga.session.Session;
8 import org.ogf.saga.session.SessionFactory;
9 import org.ogf.saga.task.State;
10 import org.ogf.saga.url.URL;
11 import org.ogf.saga.url.URLFactory;
12
13 import java.io.*;
14 import java.util.Iterator;
15 import java.util.Properties;
16
17
18
19
20
21
22
23
24
25
26
27
28
29 public class JobRun extends AbstractCommand {
30 private static final String OPT_HELP = "h", LONGOPT_HELP = "help";
31
32 private static final String OPT_RESOURCE = "r", LONGOPT_RESOURCE = "resource";
33
34 private static final String OPT_FILE = "f", LONGOPT_FILE = "file";
35 private static final String OPT_DESCRIPTION = "d", LONGOPT_DESCRIPTION = "description";
36 private static final String OPT_JOBID = "i", LONGOPT_JOBID = "jobid";
37 private static final String OPT_BATCH = "b", LONGOPT_BATCH = "batch";
38
39 protected JobRun() {
40 super("jsaga-job-run", null, null, new GnuParser());
41 }
42
43 public static void main(String[] args) throws Exception {
44 JobRun command = new JobRun();
45 CommandLine line = command.parse(args);
46 if (line.hasOption(OPT_HELP))
47 {
48 command.printHelpAndExit(null);
49 }
50 else
51 {
52
53 URL serviceURL = URLFactory.createURL(line.getOptionValue(OPT_RESOURCE));
54 String file = line.getOptionValue(OPT_FILE);
55
56
57 Properties prop = new Properties();
58 if (file != null) {
59 prop.load(new FileInputStream(file));
60 }
61 for (Iterator it=line.iterator(); it.hasNext(); ) {
62 Option opt = (Option) it.next();
63 if (opt.getValue() != null) {
64 prop.setProperty(opt.getOpt(), opt.getValue());
65 } else {
66 prop.setProperty(opt.getOpt(), Boolean.toString(true));
67 }
68 }
69 JobDescription desc = createJobDescription(prop);
70 boolean isStreamRedirected = prop.containsKey(JobDescription.INPUT) || prop.containsKey(JobDescription.OUTPUT) || prop.containsKey(JobDescription.ERROR);
71 if (!line.hasOption(OPT_BATCH) && !isStreamRedirected) {
72 desc.setAttribute(JobDescription.INTERACTIVE, "true");
73 }
74
75
76 Session session = SessionFactory.createSession(true);
77 JobService service = JobFactory.createJobService(session, serviceURL);
78 final Job job = service.createJob(desc);
79
80 if (line.hasOption(OPT_DESCRIPTION)) {
81
82 String nativeDesc = job.getAttribute("NativeJobDescription");
83 System.out.println(nativeDesc);
84 } else {
85
86 job.run();
87
88
89 if (line.hasOption(OPT_JOBID) || line.hasOption(OPT_BATCH)) {
90 String jobId = job.getAttribute(Job.JOBID);
91 System.out.println(jobId);
92 }
93
94
95 if (! line.hasOption(OPT_BATCH)) {
96
97 Thread hook = new Thread(){
98 public void run() {
99
100 try {
101 System.out.println("Canceling job: "+job.getAttribute(Job.JOBID));
102 job.cancel();
103 } catch (SagaException e) {
104 e.printStackTrace();
105 }
106
107 try {sleep(1000);} catch(InterruptedException e){e.printStackTrace();}
108 }
109 };
110 Runtime.getRuntime().addShutdownHook(hook);
111
112
113 job.waitFor();
114
115
116 State state = job.getState();
117 if (State.CANCELED.compareTo(state) == 0) {
118 System.out.println("Job canceled.");
119 } else {
120 Runtime.getRuntime().removeShutdownHook(hook);
121 if (State.DONE.compareTo(state) == 0) {
122 try {
123 if ("true".equalsIgnoreCase(desc.getAttribute(JobDescription.INTERACTIVE))) {
124 copyStream(job.getStdout(), System.out);
125 } else {
126 System.out.println("Job done.");
127 }
128 } catch(SagaException e) {
129 System.out.println("Job done.");
130 }
131 } else if (State.FAILED.compareTo(state) == 0) {
132 try {
133 if ("true".equalsIgnoreCase(desc.getAttribute(JobDescription.INTERACTIVE))) {
134 copyStream(job.getStderr(), System.err);
135 }
136 String exitCode = job.getAttribute(Job.EXITCODE);
137 System.out.println("Job failed with exit code: "+exitCode);
138 } catch(SagaException e) {
139 System.out.println("Job failed.");
140 job.rethrow();
141 }
142 } else {
143 throw new Exception("Unexpected state: "+ state);
144 }
145 }
146 }
147 }
148 System.exit(0);
149 }
150 }
151
152 protected Options createOptions() {
153 Options opt = new Options();
154
155
156 opt.addOption(OptionBuilder.withDescription("Display this help and exit")
157 .withLongOpt(LONGOPT_HELP)
158 .create(OPT_HELP));
159
160
161 opt.addOption(OptionBuilder.withDescription("the URL of the job service")
162 .isRequired(true)
163 .hasArg()
164 .withArgName("URL")
165 .withLongOpt(LONGOPT_RESOURCE)
166 .create(OPT_RESOURCE));
167
168
169 opt.addOption(OptionBuilder.withDescription("generate the job description in the targeted grid language " +
170 "and exit (do not submit the job)")
171 .withLongOpt(LONGOPT_DESCRIPTION)
172 .create(OPT_DESCRIPTION));
173
174
175 OptionGroup optGroup = new OptionGroup();
176 optGroup.addOption(OptionBuilder.withDescription("print the job identifier as soon as it is submitted, " +
177 "and wait for it to be finished")
178 .withLongOpt(LONGOPT_JOBID)
179 .create(OPT_JOBID));
180 optGroup.addOption(OptionBuilder.withDescription("print the job identifier as soon as it is submitted, " +
181 "and exit immediatly.")
182 .withLongOpt(LONGOPT_BATCH)
183 .create(OPT_BATCH));
184 optGroup.setRequired(false);
185 opt.addOptionGroup(optGroup);
186
187
188 OptionGroup reqGroup = new OptionGroup();
189 reqGroup.addOption(OptionBuilder.withDescription("read job description from file <path>")
190 .hasArg()
191 .withArgName("path")
192 .withLongOpt(LONGOPT_FILE)
193 .create(OPT_FILE));
194 reqGroup.addOption(o("command to execute").hasArg().create(JobDescription.EXECUTABLE));
195 reqGroup.setRequired(true);
196 opt.addOptionGroup(reqGroup);
197
198
199 opt.addOption(o("positional parameters for the command").hasArgs().create(JobDescription.ARGUMENTS));
200 opt.addOption(o("SPMD job type and startup mechanism").hasArg().create(JobDescription.SPMDVARIATION));
201 opt.addOption(o("total number of cpus requested for this job").hasArg().create(JobDescription.TOTALCPUCOUNT));
202 opt.addOption(o("number of process instances to start").hasArg().create(JobDescription.NUMBEROFPROCESSES));
203 opt.addOption(o("number of processes to start per host").hasArg().create(JobDescription.PROCESSESPERHOST));
204 opt.addOption(o("expected number of threads per process").hasArg().create(JobDescription.THREADSPERPROCESS));
205 opt.addOption(o("set of environment variables for the job").hasArgs().withValueSeparator().create(JobDescription.ENVIRONMENT));
206 opt.addOption(o("working directory for the job").hasArg().create(JobDescription.WORKINGDIRECTORY));
207 opt.addOption(o("run the job in interactive mode").create(JobDescription.INTERACTIVE));
208 opt.addOption(o("pathname of the standard input file").hasArg().create(JobDescription.INPUT));
209 opt.addOption(o("pathname of the standard output file").hasArg().create(JobDescription.OUTPUT));
210 opt.addOption(o("pathname of the standard error file").hasArg().create(JobDescription.ERROR));
211 opt.addOption(o("a list of file transfer directives").hasArgs().create(JobDescription.FILETRANSFER));
212 opt.addOption(o("defines if output files get removed after the job finishes").hasArg().create(JobDescription.CLEANUP));
213 opt.addOption(o("time at which a job should be scheduled").hasArg().create(JobDescription.JOBSTARTTIME));
214 opt.addOption(o("hard limit for the total job runtime").hasArg().create(JobDescription.WALLTIMELIMIT));
215 opt.addOption(o("estimated total number of CPU seconds which the job will require").hasArg().create(JobDescription.TOTALCPUTIME));
216 opt.addOption(o("estimated amount of memory the job requires").hasArg().create(JobDescription.TOTALPHYSICALMEMORY));
217 opt.addOption(o("compatible processor for job submission").hasArg().create(JobDescription.CPUARCHITECTURE));
218 opt.addOption(o("compatible operating system for job submission").hasArg().create(JobDescription.OPERATINGSYSTEMTYPE));
219 opt.addOption(o("list of host names which are to be considered by the resource manager as candidate targets").hasArgs().create(JobDescription.CANDIDATEHOSTS));
220 opt.addOption(o("name of a queue to place the job into").hasArg().create(JobDescription.QUEUE));
221 opt.addOption(o("name of an account or project name").hasArg().create(JobDescription.JOBPROJECT));
222 opt.addOption(o("set of endpoints describing where to report").hasArgs().create(JobDescription.JOBCONTACT));
223
224
225 return opt;
226 }
227 private static OptionBuilder o(String description) {
228 return OptionBuilder.withDescription(description);
229 }
230
231 private static JobDescription createJobDescription(Properties prop) throws Exception {
232 JobDescription desc = JobFactory.createJobDescription();
233 setRequired(desc, prop, JobDescription.EXECUTABLE);
234 setOptMulti(desc, prop, JobDescription.ARGUMENTS);
235 setOptional(desc, prop, JobDescription.SPMDVARIATION);
236 setOptional(desc, prop, JobDescription.TOTALCPUCOUNT);
237 setOptional(desc, prop, JobDescription.NUMBEROFPROCESSES);
238 setOptional(desc, prop, JobDescription.PROCESSESPERHOST);
239 setOptional(desc, prop, JobDescription.THREADSPERPROCESS);
240 setOptMulti(desc, prop, JobDescription.ENVIRONMENT);
241 setOptional(desc, prop, JobDescription.WORKINGDIRECTORY);
242 setOptional(desc, prop, JobDescription.INTERACTIVE);
243 setOptional(desc, prop, JobDescription.INPUT);
244 setOptional(desc, prop, JobDescription.OUTPUT);
245 setOptional(desc, prop, JobDescription.ERROR);
246 setOptMulti(desc, prop, JobDescription.FILETRANSFER);
247 setOptional(desc, prop, JobDescription.CLEANUP);
248 setOptional(desc, prop, JobDescription.JOBSTARTTIME);
249 setOptional(desc, prop, JobDescription.WALLTIMELIMIT);
250 setOptional(desc, prop, JobDescription.TOTALCPUTIME);
251 setOptional(desc, prop, JobDescription.TOTALPHYSICALMEMORY);
252 setOptional(desc, prop, JobDescription.CPUARCHITECTURE);
253 setOptional(desc, prop, JobDescription.OPERATINGSYSTEMTYPE);
254 setOptMulti(desc, prop, JobDescription.CANDIDATEHOSTS);
255 setOptional(desc, prop, JobDescription.QUEUE);
256 setOptional(desc, prop, JobDescription.JOBPROJECT);
257 setOptMulti(desc, prop, JobDescription.JOBCONTACT);
258 return desc;
259 }
260 private static void setRequired(JobDescription desc, Properties prop, String name) throws Exception {
261 String value = prop.getProperty(name);
262 if (value != null) {
263 desc.setAttribute(name, value);
264 } else {
265 throw new BadParameterException("Missing required attribute: "+name);
266 }
267 }
268 private static void setOptional(JobDescription desc, Properties prop, String name) throws Exception {
269 String value = prop.getProperty(name);
270 if (value != null) {
271 desc.setAttribute(name, value);
272 }
273 }
274 private static void setOptMulti(JobDescription desc, Properties prop, String name) throws Exception {
275 String values = prop.getProperty(name);
276 if (values != null) {
277 desc.setVectorAttribute(name, values.split(","));
278 }
279 }
280
281 private static void copyStream(InputStream in, OutputStream out) throws IOException {
282 byte[] buffer = new byte[1024];
283 for (int len; (len=in.read(buffer))>0; ) {
284 out.write(buffer, 0, len);
285 }
286 }
287 }