1 package fr.in2p3.jsaga.impl.job.instance;
2
3 import fr.in2p3.jsaga.EngineProperties;
4 import fr.in2p3.jsaga.adaptor.job.SubState;
5 import fr.in2p3.jsaga.adaptor.job.control.JobControlAdaptor;
6 import fr.in2p3.jsaga.adaptor.job.control.advanced.*;
7 import fr.in2p3.jsaga.adaptor.job.control.interactive.*;
8 import fr.in2p3.jsaga.adaptor.job.control.staging.StagingJobAdaptor;
9 import fr.in2p3.jsaga.adaptor.job.control.staging.StagingJobAdaptorTwoPhase;
10 import fr.in2p3.jsaga.adaptor.job.control.staging.StagingTransfer;
11 import fr.in2p3.jsaga.adaptor.job.monitor.*;
12 import fr.in2p3.jsaga.engine.job.monitor.JobMonitorCallback;
13 import fr.in2p3.jsaga.engine.job.monitor.JobMonitorService;
14 import fr.in2p3.jsaga.impl.attributes.ScalarAttributeImpl;
15 import fr.in2p3.jsaga.impl.attributes.VectorAttributeImpl;
16 import fr.in2p3.jsaga.impl.job.instance.stream.*;
17 import fr.in2p3.jsaga.impl.job.service.AbstractSyncJobServiceImpl;
18 import fr.in2p3.jsaga.impl.job.staging.mgr.*;
19 import fr.in2p3.jsaga.impl.job.streaming.GenericStreamableJobAdaptor;
20 import fr.in2p3.jsaga.impl.job.streaming.mgr.StreamingManagerThroughSandbox;
21 import fr.in2p3.jsaga.impl.job.streaming.mgr.StreamingManagerThroughSandboxTwoPhase;
22 import fr.in2p3.jsaga.impl.permissions.AbstractJobPermissionsImpl;
23 import fr.in2p3.jsaga.sync.job.SyncJob;
24 import org.apache.log4j.Logger;
25 import org.ogf.saga.SagaObject;
26 import org.ogf.saga.error.*;
27 import org.ogf.saga.file.Directory;
28 import org.ogf.saga.job.JobDescription;
29 import org.ogf.saga.session.Session;
30 import org.ogf.saga.task.State;
31 import org.ogf.saga.namespace.Flags;
32
33 import java.io.*;
34 import java.util.Date;
35 import org.ogf.saga.task.TaskMode;
36
37
38
39
40
41
42
43
44
45
46
47 public abstract class AbstractSyncJobImpl extends AbstractJobPermissionsImpl implements SyncJob, JobMonitorCallback {
48
49
50
51
52 private static final String MODEL = "JSAGA";
53
54
55
56 public static final String NATIVEJOBDESCRIPTION = "NativeJobDescription";
57
58
59
60 public static final String OUTPUTURL = "OutputURL";
61
62
63
64 private static Logger s_logger = Logger.getLogger(AbstractSyncJobImpl.class);
65 private JobControlAdaptor m_controlAdaptor;
66 private GenericStreamableJobAdaptor m_genericStreamableJobAdaptor;
67 private JobMonitorService m_monitorService;
68 private JobAttributes m_attributes;
69 private JobMetrics m_metrics;
70 private JobDescription m_jobDescription;
71 private DataStagingManager m_stagingMgr;
72 private String m_uniqId;
73 private String m_nativeJobId;
74 private JobIOHandler m_IOHandler;
75 private Stdin m_stdin;
76 private Stdout m_stdout;
77 private Stdout m_stderr;
78 private boolean m_willStartListening;
79 private String m_currentModelState;
80
81
82
83
84 protected AbstractSyncJobImpl(Session session, String nativeJobDesc, JobDescription jobDesc, DataStagingManager stagingMgr, String uniqId, AbstractSyncJobServiceImpl service) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, TimeoutException, NoSuccessException {
85 this(session, service, true);
86 m_attributes.m_NativeJobDescription.setObject(nativeJobDesc);
87 m_jobDescription = jobDesc;
88 m_stagingMgr = stagingMgr;
89 m_uniqId = uniqId;
90 m_nativeJobId = null;
91 m_currentModelState = null;
92 m_genericStreamableJobAdaptor = null;
93 }
94
95
96
97
98 protected AbstractSyncJobImpl(Session session, String nativeJobId, DataStagingManager stagingMgr, AbstractSyncJobServiceImpl service) throws NotImplementedException, BadParameterException, TimeoutException, NoSuccessException {
99 this(session, service, false);
100 m_attributes.m_NativeJobDescription.setObject(null);
101 m_jobDescription = null;
102 m_stagingMgr = stagingMgr;
103 m_uniqId = null;
104 m_nativeJobId = nativeJobId;
105 m_currentModelState = null;
106 m_genericStreamableJobAdaptor = null;
107 }
108
109
110
111
112 private AbstractSyncJobImpl(Session session, AbstractSyncJobServiceImpl service, boolean create) throws NotImplementedException, BadParameterException, TimeoutException, NoSuccessException {
113 super(session, create);
114 m_attributes = new JobAttributes(this);
115 m_attributes.m_ServiceUrl.setObject(service.m_resourceManager.getString());
116 m_metrics = new JobMetrics(this);
117 m_controlAdaptor = service.m_controlAdaptor;
118 m_monitorService = service.m_monitorService;
119 m_IOHandler = null;
120 m_stdin = null;
121 m_stdout = null;
122 m_stderr = null;
123 m_willStartListening = false;
124 m_currentModelState = null;
125 m_genericStreamableJobAdaptor = null;
126 }
127
128
129
130
131 public SagaObject clone() throws CloneNotSupportedException {
132 AbstractSyncJobImpl clone = (AbstractSyncJobImpl) super.clone();
133 clone.m_attributes = m_attributes.clone();
134 clone.m_metrics = m_metrics.clone();
135 clone.m_controlAdaptor = m_controlAdaptor;
136 clone.m_monitorService = m_monitorService;
137 clone.m_jobDescription = m_jobDescription;
138 clone.m_stagingMgr = m_stagingMgr;
139 clone.m_uniqId = m_uniqId;
140 clone.m_nativeJobId = m_nativeJobId;
141 clone.m_IOHandler = m_IOHandler;
142 clone.m_stdin = m_stdin;
143 clone.m_stdout = m_stdout;
144 clone.m_stderr = m_stderr;
145 clone.m_genericStreamableJobAdaptor = m_genericStreamableJobAdaptor;
146 return clone;
147 }
148
149 private static boolean s_checkMatch = EngineProperties.getBoolean(EngineProperties.JOB_CONTROL_CHECK_MATCH);
150
151 protected void doSubmit() throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException {
152 m_monitorService.checkState();
153 try {
154
155 String nativeJobDesc = m_attributes.m_NativeJobDescription.getObject();
156
157
158 m_metrics.m_StateDetail.setValue(MODEL + ":" + SubState.RUNNING_PRE_STAGING.toString());
159 if (m_stagingMgr instanceof DataStagingManagerThroughStream) {
160 ((DataStagingManagerThroughStream) m_stagingMgr).preStaging(this);
161 } else if (m_stagingMgr instanceof DataStagingManagerThroughSandboxOnePhase) {
162 ((DataStagingManagerThroughSandboxOnePhase) m_stagingMgr).preStaging(this, nativeJobDesc, m_uniqId);
163 }
164
165
166 if (this.isInteractive()) {
167 if (m_controlAdaptor instanceof StreamableJobInteractiveGet) {
168
169 JobIOGetterInteractive ioHandler = ((StreamableJobInteractiveGet) m_controlAdaptor).submitInteractive(nativeJobDesc, s_checkMatch);
170 if (ioHandler == null) {
171 throw new NotImplementedException("ADAPTOR ERROR: Method submitInteractive() must not return null: " + m_controlAdaptor.getClass().getName());
172 }
173
174
175 if (m_stdin == null) {
176 m_stdin = new JobStdinOutputStream(this);
177 }
178 m_stdin.openJobIOHandler(ioHandler);
179
180
181 if (m_stdout == null) {
182 m_stdout = new GetterInputStream(ioHandler.getStdout());
183 }
184 if (m_stderr == null) {
185 m_stderr = new GetterInputStream(ioHandler.getStderr());
186 }
187
188 m_IOHandler = ioHandler;
189 m_nativeJobId = m_IOHandler.getJobId();
190 } else if (m_controlAdaptor instanceof StreamableJobInteractiveSet) {
191
192 InputStream stdin = null;
193 if (m_stdin != null) {
194 stdin = ((PostconnectedStdinOutputStream) m_stdin).getInputStreamContainer();
195 }
196
197
198 if (m_stdout == null) {
199 m_stdout = new PreconnectedStdoutInputStream(this);
200 }
201 OutputStream stdout = ((PreconnectedStdoutInputStream) m_stdout).getOutputStreamContainer();
202 if (m_stderr == null) {
203 m_stderr = new PreconnectedStderrInputStream(this);
204 }
205 OutputStream stderr = ((PreconnectedStderrInputStream) m_stderr).getOutputStreamContainer();
206
207
208 m_nativeJobId = ((StreamableJobInteractiveSet) m_controlAdaptor).submitInteractive(
209 nativeJobDesc, s_checkMatch,
210 stdin, stdout, stderr);
211 } else if (m_controlAdaptor instanceof StreamableJobBatch) {
212
213 InputStream stdin;
214 if (m_stdin != null && m_stdin.getBuffer().length > 0) {
215 stdin = new ByteArrayInputStream(m_stdin.getBuffer());
216 } else {
217 stdin = null;
218 }
219
220
221 m_IOHandler = ((StreamableJobBatch) m_controlAdaptor).submit(nativeJobDesc, s_checkMatch, m_uniqId, stdin);
222 if (m_IOHandler == null) {
223 throw new NotImplementedException("ADAPTOR ERROR: Method submit() must not return null: " + m_controlAdaptor.getClass().getName());
224 }
225 m_nativeJobId = m_IOHandler.getJobId();
226 } else {
227 throw new NotImplementedException("Interactive jobs are not supported by this adaptor: " + m_controlAdaptor.getClass().getName());
228 }
229 } else {
230 if (m_stagingMgr instanceof StreamingManagerThroughSandboxTwoPhase) {
231 m_genericStreamableJobAdaptor = new GenericStreamableJobAdaptor((StagingJobAdaptor) m_controlAdaptor);
232
233 InputStream stdin;
234 if (m_stdin != null && m_stdin.getBuffer().length > 0) {
235 stdin = new ByteArrayInputStream(m_stdin.getBuffer());
236 } else {
237 stdin = null;
238 }
239
240
241 m_IOHandler = m_genericStreamableJobAdaptor.submit(nativeJobDesc, s_checkMatch, m_uniqId, stdin);
242 if (m_IOHandler == null) {
243 throw new NotImplementedException("ADAPTOR ERROR: Method submit() must not return null: " + m_genericStreamableJobAdaptor.getClass().getName());
244 }
245 m_nativeJobId = m_IOHandler.getJobId();
246 } else {
247 m_nativeJobId = m_controlAdaptor.submit(nativeJobDesc, s_checkMatch, m_uniqId);
248 }
249 }
250 String monitorUrl = m_monitorService.getURL().getString();
251 String sagaJobId = "[" + monitorUrl + "]-[" + m_nativeJobId + "]";
252 m_attributes.m_JobId.setObject(sagaJobId);
253
254
255 if (m_willStartListening) {
256 m_willStartListening = false;
257 this.startListening();
258 }
259
260
261 if (m_stagingMgr instanceof DataStagingManagerThroughSandboxTwoPhase) {
262 ((DataStagingManagerThroughSandboxTwoPhase) m_stagingMgr).preStaging(this, m_nativeJobId);
263 }
264
265
266 if (m_controlAdaptor instanceof StagingJobAdaptorTwoPhase) {
267 ((StagingJobAdaptorTwoPhase) m_controlAdaptor).start(m_nativeJobId);
268 }
269 } catch (AuthorizationFailedException e) {
270 throw new NoSuccessException(e);
271 } catch (AuthenticationFailedException e) {
272 throw new NoSuccessException(e);
273 } catch (PermissionDeniedException e) {
274 throw new NoSuccessException(e);
275 } catch (DoesNotExistException e) {
276 throw new NoSuccessException(e);
277 } catch (BadParameterException e) {
278 throw new NoSuccessException(e);
279 }
280 }
281
282 protected void doCancel() {
283 try {
284 m_monitorService.checkState();
285 } catch (SagaException e) {
286 throw new RuntimeException(e);
287 }
288 if (m_nativeJobId == null) {
289 throw new RuntimeException("INTERNAL ERROR: JobID not initialized");
290 }
291 try {
292 m_controlAdaptor.cancel(m_nativeJobId);
293 this.setState(State.CANCELED, "USER:Canceled", SubState.CANCEL_REQUESTED, new IncorrectStateException("Canceled by user"));
294 } catch (SagaException e) {
295
296 s_logger.warn("Could not cancel job " + m_nativeJobId + ": " + e.getMessage());
297 }
298 }
299
300 protected State queryState() throws NotImplementedException, TimeoutException, NoSuccessException {
301 m_monitorService.checkState();
302 JobStatus status = m_monitorService.getState(m_nativeJobId);
303
304 this.setJobState(status.getSagaState(), status.getStateDetail(), status.getSubState(), status.getCause());
305
306
307 this.closeStreamsIfDoneAndInteractive();
308
309
310 return status.getSagaState();
311 }
312
313 public boolean startListening() throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException {
314 m_monitorService.checkState();
315 if (m_nativeJobId == null) {
316 m_willStartListening = true;
317 } else {
318 m_monitorService.startListening(m_nativeJobId, this);
319 }
320 return true;
321 }
322
323 public void stopListening() throws NotImplementedException, TimeoutException, NoSuccessException {
324 m_monitorService.checkState();
325 if (m_nativeJobId == null) {
326 return;
327 }
328 m_monitorService.stopListening(m_nativeJobId);
329
330 if (!(m_stagingMgr instanceof StreamingManagerThroughSandbox)) {
331
332 this.closeStreamsIfDoneAndInteractive();
333 }
334
335 if (this.isFinalState()) {
336
337 try {
338 this.postStaging();
339 if (m_stagingMgr instanceof StreamingManagerThroughSandbox) {
340
341 this.closeStreamsIfDoneAndInteractive();
342 }
343 } catch (PermissionDeniedException e) {
344 throw new NoSuccessException(e);
345 } catch (IncorrectStateException e) {
346 throw new NoSuccessException(e);
347 }
348
349 try {
350 this.cleanUp();
351 } catch (SagaException e) {
352 s_logger.warn("Failed to cleanup job: " + m_nativeJobId, e);
353 }
354 }
355 }
356
357 private void closeStreamsIfDoneAndInteractive() {
358 if (this.isFinalState() && m_IOHandler != null) {
359 if (m_controlAdaptor instanceof StreamableJobInteractiveGet || m_controlAdaptor instanceof StreamableJobBatch || m_stagingMgr instanceof StreamingManagerThroughSandbox) {
360 try {
361 if (m_stdout == null) {
362 m_stdout = new JobStdoutInputStream(this, m_IOHandler);
363 }
364 m_stdout.closeJobIOHandler();
365 } catch (Exception e) {
366 s_logger.warn("Failed to get job output stream: " + m_nativeJobId, e);
367 }
368 try {
369 if (m_stderr == null) {
370 m_stderr = new JobStderrInputStream(this, m_IOHandler);
371 }
372 m_stderr.closeJobIOHandler();
373 } catch (Exception e) {
374 s_logger.warn("Failed to get job error stream: " + m_nativeJobId, e);
375 }
376 }
377 }
378 }
379
380 public void postStagingAndCleanup() throws NotImplementedException, PermissionDeniedException, IncorrectStateException, TimeoutException, NoSuccessException {
381 m_monitorService.checkState();
382 State state = this.getState();
383 if (this.isFinalState()) {
384
385 this.postStaging();
386
387 this.cleanUp();
388 } else {
389 throw new IncorrectStateException("Can not cleanup unfinished job: " + state, this);
390 }
391 }
392
393 private void postStaging() throws NotImplementedException, PermissionDeniedException, IncorrectStateException, TimeoutException, NoSuccessException {
394 if (this.isFinalState()) {
395
396 try {
397 m_stagingMgr.postStaging(this, m_nativeJobId);
398 } catch (AuthenticationFailedException e) {
399 throw new NoSuccessException(e);
400 } catch (AuthorizationFailedException e) {
401 throw new NoSuccessException(e);
402 } catch (BadParameterException e) {
403 throw new NoSuccessException(e);
404 } catch (DoesNotExistException e) {
405 throw new NoSuccessException(e);
406 }
407 }
408 }
409
410 private void cleanUp() throws NotImplementedException, PermissionDeniedException, IncorrectStateException, TimeoutException, NoSuccessException {
411
412 Directory dir = null;
413
414
415 try {
416 dir = m_stagingMgr.cleanup(this, m_nativeJobId);
417 } catch (AuthenticationFailedException e) {
418 throw new NoSuccessException(e);
419 } catch (AuthorizationFailedException e) {
420 throw new NoSuccessException(e);
421 } catch (BadParameterException e) {
422 throw new NoSuccessException(e);
423 } catch (DoesNotExistException e) {
424 throw new NoSuccessException(e);
425 } catch (NotImplementedException e){
426
427
428 s_logger.info("Could not clean staged files:" + e.getMessage());
429 }
430 try {
431
432 if (m_controlAdaptor instanceof CleanableJobAdaptor) {
433 try {
434 JobInfoAdaptor jia = getJobInfoAdaptor();
435 try {
436 setStaticValue(m_attributes.m_Created, jia.getCreated(m_nativeJobId));
437 } catch (Exception e) {
438 s_logger.warn(e.getMessage());
439 }
440 try {
441 setStaticValue(m_attributes.m_Started, jia.getStarted(m_nativeJobId));
442 } catch (Exception e) {
443 s_logger.warn(e.getMessage());
444 }
445 try {
446 setStaticValue(m_attributes.m_Finished, jia.getFinished(m_nativeJobId));
447 } catch (Exception e) {
448 s_logger.warn(e.getMessage());
449 }
450 try {
451 setStaticValue(m_attributes.m_ExitCode, jia.getExitCode(m_nativeJobId));
452 } catch (Exception e) {
453 s_logger.warn(e.getMessage());
454 }
455 try {
456 setStaticValues(m_attributes.m_ExecutionHosts, jia.getExecutionHosts(m_nativeJobId));
457 } catch (Exception e) {
458 s_logger.warn(e.getMessage());
459 }
460 } catch (NotImplementedException nie) {
461
462 }
463
464 ((CleanableJobAdaptor) m_controlAdaptor).clean(m_nativeJobId);
465 }
466
467
468 if (dir != null) {
469 try {
470 dir.remove();
471 } catch (AuthenticationFailedException e) {
472 throw new NoSuccessException(e);
473 } catch (AuthorizationFailedException e) {
474 throw new NoSuccessException(e);
475 } catch (BadParameterException e) {
476 throw new NoSuccessException(e);
477 }
478 }
479
480 } finally {
481 if (dir != null) {
482 dir.close();
483 }
484 }
485
486 if (!(m_controlAdaptor instanceof CleanableJobAdaptor)) {
487 throw new NotImplementedException("Cleanup is not supported by this adaptor: " + m_controlAdaptor.getClass().getName());
488 }
489 }
490
491
492 public JobDescription getJobDescriptionSync() throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, DoesNotExistException, TimeoutException, NoSuccessException {
493 return m_jobDescription;
494 }
495
496 public OutputStream getStdinSync() throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, DoesNotExistException, TimeoutException, IncorrectStateException, NoSuccessException {
497 m_monitorService.checkState();
498 if (this.isInteractive() || m_stagingMgr instanceof StreamingManagerThroughSandbox) {
499 if (m_stdin == null) {
500 if (m_controlAdaptor instanceof StreamableJobInteractiveSet) {
501 m_stdin = new PostconnectedStdinOutputStream(this);
502 } else {
503 m_stdin = new JobStdinOutputStream(this);
504 }
505 }
506 return m_stdin;
507 } else {
508 throw new IncorrectStateException("Method getStdin() is allowed on interactive jobs only", this);
509 }
510 }
511
512 public InputStream getStdoutSync() throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, DoesNotExistException, TimeoutException, IncorrectStateException, NoSuccessException {
513 m_monitorService.checkState();
514 if (this.isInteractive() || m_stagingMgr instanceof StreamingManagerThroughSandbox) {
515 if (m_stdout == null) {
516 m_stdout = new JobStdoutInputStream(this, m_IOHandler);
517 }
518 return m_stdout;
519 } else {
520 throw new IncorrectStateException("Method getStdout() is allowed on interactive jobs only", this);
521 }
522 }
523
524 public InputStream getStderrSync() throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, DoesNotExistException, TimeoutException, IncorrectStateException, NoSuccessException {
525 m_monitorService.checkState();
526 if (this.isInteractive() || m_stagingMgr instanceof StreamingManagerThroughSandbox) {
527 if (m_stderr == null) {
528 m_stderr = new JobStderrInputStream(this, m_IOHandler);
529 }
530 return m_stderr;
531 } else {
532 throw new IncorrectStateException("Method getStderr() is allowed on interactive jobs only", this);
533 }
534 }
535
536 public void suspendSync() throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, IncorrectStateException, TimeoutException, NoSuccessException {
537 m_monitorService.checkState();
538 if (m_nativeJobId == null) {
539 throw new IncorrectStateException("Can not suspend job in 'New' state", this);
540 }
541 if (m_controlAdaptor instanceof HoldableJobAdaptor && m_controlAdaptor instanceof SuspendableJobAdaptor) {
542 if (!((HoldableJobAdaptor) m_controlAdaptor).hold(m_nativeJobId)) {
543 if (!((SuspendableJobAdaptor) m_controlAdaptor).suspend(m_nativeJobId)) {
544 if (getJobState().equals(State.NEW) || getJobState().equals(State.RUNNING)) {
545 throw new NoSuccessException("Failed to hold/suspend job, the plugin returned False: " + m_nativeJobId);
546 } else {
547 throw new IncorrectStateException("Failed to hold/suspend job because it is neither queued nor active: " + m_nativeJobId);
548 }
549 }
550 }
551 } else if (m_controlAdaptor instanceof HoldableJobAdaptor) {
552 if (!((HoldableJobAdaptor) m_controlAdaptor).hold(m_nativeJobId)) {
553 if (!getJobState().equals(State.NEW)) {
554 throw new IncorrectStateException("Failed to hold job because it is not queued: " + m_nativeJobId);
555 } else {
556 throw new NoSuccessException("Failed to hold job; the plugin returned False");
557 }
558 }
559 } else if (m_controlAdaptor instanceof SuspendableJobAdaptor) {
560 if (!((SuspendableJobAdaptor) m_controlAdaptor).suspend(m_nativeJobId)) {
561 if (!getJobState().equals(State.RUNNING)) {
562 throw new IncorrectStateException("Failed to suspend job because if is not active: " + m_nativeJobId);
563 } else {
564 throw new NoSuccessException("Failed to suspend job; the plugin returned False");
565 }
566 }
567 } else {
568 throw new NotImplementedException("Suspend is not supported by this adaptor: " + m_controlAdaptor.getClass().getName());
569 }
570 }
571
572 public void resumeSync() throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, IncorrectStateException, TimeoutException, NoSuccessException {
573 m_monitorService.checkState();
574 if (m_nativeJobId == null) {
575 throw new IncorrectStateException("Can not resume job in 'New' state", this);
576 }
577 if (m_controlAdaptor instanceof HoldableJobAdaptor && m_controlAdaptor instanceof SuspendableJobAdaptor) {
578 if (!((HoldableJobAdaptor) m_controlAdaptor).release(m_nativeJobId)) {
579 if (!((SuspendableJobAdaptor) m_controlAdaptor).resume(m_nativeJobId)) {
580 if (!getState().equals(State.SUSPENDED)) {
581 throw new IncorrectStateException("Failed to release/resume job because it is neither held nor suspended: " + m_nativeJobId);
582 } else {
583 throw new NoSuccessException("Failed to release/resume job; the plugin returned False");
584 }
585 }
586 }
587 } else if (m_controlAdaptor instanceof HoldableJobAdaptor) {
588 if (!((HoldableJobAdaptor) m_controlAdaptor).release(m_nativeJobId)) {
589 if (!getState().equals(State.SUSPENDED)) {
590 throw new IncorrectStateException("Failed to release job because it is not held: " + m_nativeJobId);
591 } else {
592 throw new NoSuccessException("Failed to release job; the plugin returned False");
593 }
594 }
595 } else if (m_controlAdaptor instanceof SuspendableJobAdaptor) {
596 if (!((SuspendableJobAdaptor) m_controlAdaptor).resume(m_nativeJobId)) {
597 if (!getState().equals(State.SUSPENDED)) {
598 throw new IncorrectStateException("Failed to resume job because if is not suspended: " + m_nativeJobId);
599 } else {
600 throw new NoSuccessException("Failed to resume job; the plugin returned False");
601 }
602 }
603 } else {
604 throw new NotImplementedException("Resume is not supported by this adaptor: " + m_controlAdaptor.getClass().getName());
605 }
606 }
607
608 public void checkpointSync() throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, IncorrectStateException, TimeoutException, NoSuccessException {
609 m_monitorService.checkState();
610 if (m_nativeJobId == null) {
611 throw new IncorrectStateException("Can not checkpoint job in 'New' state", this);
612 }
613 if (m_controlAdaptor instanceof CheckpointableJobAdaptor) {
614 if (!((CheckpointableJobAdaptor) m_controlAdaptor).checkpoint(m_nativeJobId)) {
615 throw new NoSuccessException("Failed to checkpoint job: " + m_nativeJobId);
616 }
617 } else {
618 throw new NotImplementedException("Checkpoint is not supported by this adaptor: " + m_controlAdaptor.getClass().getName());
619 }
620 }
621
622 public void migrateSync(JobDescription jd) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, IncorrectStateException, TimeoutException, NoSuccessException {
623 m_monitorService.checkState();
624 if (m_nativeJobId == null) {
625 throw new IncorrectStateException("Can not migrate job in 'New' state", this);
626 }
627 throw new NotImplementedException("Not implemented yet...");
628
629 }
630
631 public void signalSync(int signum) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, IncorrectStateException, TimeoutException, NoSuccessException {
632 m_monitorService.checkState();
633 if (m_nativeJobId == null) {
634 throw new IncorrectStateException("Can not send signal to job in 'New' state", this);
635 }
636 if (m_controlAdaptor instanceof SignalableJobAdaptor) {
637 if (!((SignalableJobAdaptor) m_controlAdaptor).signal(m_nativeJobId, signum)) {
638 throw new NoSuccessException("Failed to signal job: " + m_nativeJobId);
639 }
640 } else {
641 throw new NotImplementedException("Signal is not supported by this adaptor: " + m_controlAdaptor.getClass().getName());
642 }
643 }
644
645
646 public State getJobState() {
647 return m_metrics.m_State.getValue();
648 }
649
650
651
652
653
654 public void setState(State state, String stateDetail, SubState subState, SagaException cause) {
655
656 if (m_currentModelState == null || !m_currentModelState.equals(stateDetail)) {
657 m_currentModelState = stateDetail;
658 m_monitorService.getStateLogger().debug("State changed to " + stateDetail + " for job " + m_attributes.m_JobId.getObject());
659 }
660
661 this.setJobState(state, stateDetail, subState, cause);
662
663 super.setState(state);
664 }
665
666
667
668
669 private synchronized void setJobState(State state, String stateDetail, SubState subState, SagaException cause) {
670
671 if (!this.isFinalState()) {
672
673 if (cause != null) {
674 super.setException(cause);
675 }
676
677 m_metrics.m_State.setValue(state);
678 m_metrics.m_StateDetail.setValue(stateDetail);
679 m_metrics.m_StateDetail.setValue(MODEL + ":" + subState.toString());
680 }
681 }
682
683
684 String getNativeJobId() {
685 return m_nativeJobId;
686 }
687
688 JobInfoAdaptor getJobInfoAdaptor() throws NotImplementedException {
689 JobMonitorAdaptor monitorAdaptor = m_monitorService.getAdaptor();
690 if (monitorAdaptor instanceof JobInfoAdaptor) {
691 return (JobInfoAdaptor) monitorAdaptor;
692 } else {
693 throw new NotImplementedException("Job attribute not supported by this adaptor: " + monitorAdaptor.getClass());
694 }
695 }
696
697 StagingTransfer[] getOutputStagingTransfer() throws PermissionDeniedException, TimeoutException, NoSuccessException {
698 if (m_stagingMgr instanceof DataStagingManagerThroughSandbox) {
699 return ((DataStagingManagerThroughSandbox) m_stagingMgr).getOutputStagingTransfer(m_nativeJobId);
700 }
701 return null;
702 }
703
704
705 private boolean isFinalState() {
706
707 State state = m_metrics.m_State.getValue();
708 if (state == null) {
709 state = State.RUNNING;
710 }
711
712
713 switch (state) {
714 case DONE:
715 case CANCELED:
716 case FAILED:
717 return true;
718 default:
719 return false;
720 }
721 }
722
723 private boolean isInteractive() throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, IncorrectStateException, TimeoutException, NoSuccessException {
724 try {
725 return "true".equalsIgnoreCase(m_jobDescription.getAttribute(JobDescription.INTERACTIVE));
726 } catch (DoesNotExistException e) {
727 return false;
728 }
729 }
730
731 private void setStaticValue(ScalarAttributeImpl<Date> attr, final Date value) {
732 attr = _addAttribute(new ScalarAttributeImpl<Date>(attr.getKey(), null, attr.getMode(), attr.getType(), new Date()) {
733
734 public String getValue() {
735 return value.toString();
736 }
737 });
738 }
739
740 private void setStaticValue(ScalarAttributeImpl<Integer> attr, final Integer value) {
741 attr = _addAttribute(new ScalarAttributeImpl<Integer>(attr.getKey(), null, attr.getMode(), attr.getType(), null) {
742
743 public String getValue() {
744 return value.toString();
745 }
746 });
747 }
748
749 private void setStaticValues(VectorAttributeImpl<String> attr, final String[] values) {
750 attr = _addVectorAttribute(new VectorAttributeImpl<String>(attr.getKey(), null, attr.getMode(), attr.getType(), null) {
751
752 public String[] getValues() {
753 return values;
754 }
755 });
756 }
757 }