View Javadoc

1   package fr.in2p3.jsaga.impl.job.instance;
2   
3   import fr.in2p3.jsaga.EngineProperties;
4   import fr.in2p3.jsaga.adaptor.job.SubState;
5   import fr.in2p3.jsaga.adaptor.job.control.JobControlAdaptor;
6   import fr.in2p3.jsaga.adaptor.job.control.advanced.*;
7   import fr.in2p3.jsaga.adaptor.job.control.interactive.*;
8   import fr.in2p3.jsaga.adaptor.job.control.staging.StagingJobAdaptor;
9   import fr.in2p3.jsaga.adaptor.job.control.staging.StagingJobAdaptorTwoPhase;
10  import fr.in2p3.jsaga.adaptor.job.control.staging.StagingTransfer;
11  import fr.in2p3.jsaga.adaptor.job.monitor.*;
12  import fr.in2p3.jsaga.engine.job.monitor.JobMonitorCallback;
13  import fr.in2p3.jsaga.engine.job.monitor.JobMonitorService;
14  import fr.in2p3.jsaga.impl.attributes.ScalarAttributeImpl;
15  import fr.in2p3.jsaga.impl.attributes.VectorAttributeImpl;
16  import fr.in2p3.jsaga.impl.job.instance.stream.*;
17  import fr.in2p3.jsaga.impl.job.service.AbstractSyncJobServiceImpl;
18  import fr.in2p3.jsaga.impl.job.staging.mgr.*;
19  import fr.in2p3.jsaga.impl.job.streaming.GenericStreamableJobAdaptor;
20  import fr.in2p3.jsaga.impl.job.streaming.mgr.StreamingManagerThroughSandbox;
21  import fr.in2p3.jsaga.impl.job.streaming.mgr.StreamingManagerThroughSandboxTwoPhase;
22  import fr.in2p3.jsaga.impl.permissions.AbstractJobPermissionsImpl;
23  import fr.in2p3.jsaga.sync.job.SyncJob;
24  import org.apache.log4j.Logger;
25  import org.ogf.saga.SagaObject;
26  import org.ogf.saga.error.*;
27  import org.ogf.saga.file.Directory;
28  import org.ogf.saga.job.JobDescription;
29  import org.ogf.saga.session.Session;
30  import org.ogf.saga.task.State;
31  import org.ogf.saga.namespace.Flags;
32  
33  import java.io.*;
34  import java.util.Date;
35  import org.ogf.saga.task.TaskMode;
36  
37  /*
38   * ***************************************************
39   * *** Centre de Calcul de l'IN2P3 - Lyon (France) *** *** http://cc.in2p3.fr/
40   * *** *************************************************** File:
41   * AbstractSyncJobImpl Author: Sylvain Reynaud (sreynaud@in2p3.fr) Date: 26 oct.
42   * 2007 *************************************************** Description:
43   */
44  /**
45   *
46   */
47  public abstract class AbstractSyncJobImpl extends AbstractJobPermissionsImpl implements SyncJob, JobMonitorCallback {
48  
49      /**
50       * Job state detail engine model
51       */
52      private static final String MODEL = "JSAGA";
53      /**
54       * Job attribute (deviation from SAGA specification)
55       */
56      public static final String NATIVEJOBDESCRIPTION = "NativeJobDescription";
57      /**
58       * Job vector attribute (deviation from SAGA specification)
59       */
60      public static final String OUTPUTURL = "OutputURL";
61      /**
62       * logger
63       */
64      private static Logger s_logger = Logger.getLogger(AbstractSyncJobImpl.class);
65      private JobControlAdaptor m_controlAdaptor;
66      private GenericStreamableJobAdaptor m_genericStreamableJobAdaptor;
67      private JobMonitorService m_monitorService;
68      private JobAttributes m_attributes;
69      private JobMetrics m_metrics;
70      private JobDescription m_jobDescription;
71      private DataStagingManager m_stagingMgr;
72      private String m_uniqId;
73      private String m_nativeJobId;
74      private JobIOHandler m_IOHandler;
75      private Stdin m_stdin;
76      private Stdout m_stdout;
77      private Stdout m_stderr;
78      private boolean m_willStartListening;
79      private String m_currentModelState;
80  
81      /**
82       * constructor for submission
83       */
84      protected AbstractSyncJobImpl(Session session, String nativeJobDesc, JobDescription jobDesc, DataStagingManager stagingMgr, String uniqId, AbstractSyncJobServiceImpl service) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, TimeoutException, NoSuccessException {
85          this(session, service, true);
86          m_attributes.m_NativeJobDescription.setObject(nativeJobDesc);
87          m_jobDescription = jobDesc;
88          m_stagingMgr = stagingMgr;
89          m_uniqId = uniqId;
90          m_nativeJobId = null;
91          m_currentModelState = null;
92          m_genericStreamableJobAdaptor = null;
93      }
94  
95      /**
96       * constructor for control and monitoring only
97       */
98      protected AbstractSyncJobImpl(Session session, String nativeJobId, DataStagingManager stagingMgr, AbstractSyncJobServiceImpl service) throws NotImplementedException, BadParameterException, TimeoutException, NoSuccessException {
99          this(session, service, false);
100         m_attributes.m_NativeJobDescription.setObject(null);
101         m_jobDescription = null;
102         m_stagingMgr = stagingMgr;
103         m_uniqId = null;
104         m_nativeJobId = nativeJobId;
105         m_currentModelState = null;
106         m_genericStreamableJobAdaptor = null;
107     }
108 
109     /**
110      * common to all contructors
111      */
112     private AbstractSyncJobImpl(Session session, AbstractSyncJobServiceImpl service, boolean create) throws NotImplementedException, BadParameterException, TimeoutException, NoSuccessException {
113         super(session, create);
114         m_attributes = new JobAttributes(this);
115         m_attributes.m_ServiceUrl.setObject(service.m_resourceManager.getString());
116         m_metrics = new JobMetrics(this);
117         m_controlAdaptor = service.m_controlAdaptor;
118         m_monitorService = service.m_monitorService;
119         m_IOHandler = null;
120         m_stdin = null;
121         m_stdout = null;
122         m_stderr = null;
123         m_willStartListening = false;
124         m_currentModelState = null;
125         m_genericStreamableJobAdaptor = null;
126     }
127 
128     /**
129      * clone
130      */
131     public SagaObject clone() throws CloneNotSupportedException {
132         AbstractSyncJobImpl clone = (AbstractSyncJobImpl) super.clone();
133         clone.m_attributes = m_attributes.clone();
134         clone.m_metrics = m_metrics.clone();
135         clone.m_controlAdaptor = m_controlAdaptor;
136         clone.m_monitorService = m_monitorService;
137         clone.m_jobDescription = m_jobDescription;
138         clone.m_stagingMgr = m_stagingMgr;
139         clone.m_uniqId = m_uniqId;
140         clone.m_nativeJobId = m_nativeJobId;
141         clone.m_IOHandler = m_IOHandler;
142         clone.m_stdin = m_stdin;
143         clone.m_stdout = m_stdout;
144         clone.m_stderr = m_stderr;
145         clone.m_genericStreamableJobAdaptor = m_genericStreamableJobAdaptor;
146         return clone;
147     }
148     ////////////////////////////////////// implementation of AbstractTaskImpl //////////////////////////////////////
149     private static boolean s_checkMatch = EngineProperties.getBoolean(EngineProperties.JOB_CONTROL_CHECK_MATCH);
150 
151     protected void doSubmit() throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException {
152         m_monitorService.checkState();
153         try {
154             // get native job description
155             String nativeJobDesc = m_attributes.m_NativeJobDescription.getObject();
156 
157             // pre-staging (before job submit)
158             m_metrics.m_StateDetail.setValue(MODEL + ":" + SubState.RUNNING_PRE_STAGING.toString());
159             if (m_stagingMgr instanceof DataStagingManagerThroughStream) {
160                 ((DataStagingManagerThroughStream) m_stagingMgr).preStaging(this);
161             } else if (m_stagingMgr instanceof DataStagingManagerThroughSandboxOnePhase) {
162                 ((DataStagingManagerThroughSandboxOnePhase) m_stagingMgr).preStaging(this, nativeJobDesc, m_uniqId);
163             }
164 
165             // submit
166             if (this.isInteractive()) {
167                 if (m_controlAdaptor instanceof StreamableJobInteractiveGet) {
168                     // submit
169                     JobIOGetterInteractive ioHandler = ((StreamableJobInteractiveGet) m_controlAdaptor).submitInteractive(nativeJobDesc, s_checkMatch);
170                     if (ioHandler == null) {
171                         throw new NotImplementedException("ADAPTOR ERROR: Method submitInteractive() must not return null: " + m_controlAdaptor.getClass().getName());
172                     }
173 
174                     // set stdin
175                     if (m_stdin == null) {
176                         m_stdin = new JobStdinOutputStream(this);
177                     }
178                     m_stdin.openJobIOHandler(ioHandler);
179 
180                     // set stdout and stderr
181                     if (m_stdout == null) {
182                         m_stdout = new GetterInputStream(ioHandler.getStdout());
183                     }
184                     if (m_stderr == null) {
185                         m_stderr = new GetterInputStream(ioHandler.getStderr());
186                     }
187 
188                     m_IOHandler = ioHandler;
189                     m_nativeJobId = m_IOHandler.getJobId();
190                 } else if (m_controlAdaptor instanceof StreamableJobInteractiveSet) {
191                     // set stdin
192                     InputStream stdin = null;
193                     if (m_stdin != null) {
194                         stdin = ((PostconnectedStdinOutputStream) m_stdin).getInputStreamContainer();
195                     }
196 
197                     // set stdout and stderr
198                     if (m_stdout == null) {
199                         m_stdout = new PreconnectedStdoutInputStream(this);
200                     }
201                     OutputStream stdout = ((PreconnectedStdoutInputStream) m_stdout).getOutputStreamContainer();
202                     if (m_stderr == null) {
203                         m_stderr = new PreconnectedStderrInputStream(this);
204                     }
205                     OutputStream stderr = ((PreconnectedStderrInputStream) m_stderr).getOutputStreamContainer();
206 
207                     // submit
208                     m_nativeJobId = ((StreamableJobInteractiveSet) m_controlAdaptor).submitInteractive(
209                             nativeJobDesc, s_checkMatch,
210                             stdin, stdout, stderr);
211                 } else if (m_controlAdaptor instanceof StreamableJobBatch) {
212                     // set stdin
213                     InputStream stdin;
214                     if (m_stdin != null && m_stdin.getBuffer().length > 0) {
215                         stdin = new ByteArrayInputStream(m_stdin.getBuffer());
216                     } else {
217                         stdin = null;
218                     }
219 
220                     // submit
221                     m_IOHandler = ((StreamableJobBatch) m_controlAdaptor).submit(nativeJobDesc, s_checkMatch, m_uniqId, stdin);
222                     if (m_IOHandler == null) {
223                         throw new NotImplementedException("ADAPTOR ERROR: Method submit() must not return null: " + m_controlAdaptor.getClass().getName());
224                     }
225                     m_nativeJobId = m_IOHandler.getJobId();
226                 } else {
227                     throw new NotImplementedException("Interactive jobs are not supported by this adaptor: " + m_controlAdaptor.getClass().getName());
228                 }
229             } else {
230                 if (m_stagingMgr instanceof StreamingManagerThroughSandboxTwoPhase) {
231                     m_genericStreamableJobAdaptor = new GenericStreamableJobAdaptor((StagingJobAdaptor) m_controlAdaptor);
232                     // set stdin
233                     InputStream stdin;
234                     if (m_stdin != null && m_stdin.getBuffer().length > 0) {
235                         stdin = new ByteArrayInputStream(m_stdin.getBuffer());
236                     } else {
237                         stdin = null;
238                     }
239 
240                     // submit
241                     m_IOHandler = m_genericStreamableJobAdaptor.submit(nativeJobDesc, s_checkMatch, m_uniqId, stdin);
242                     if (m_IOHandler == null) {
243                         throw new NotImplementedException("ADAPTOR ERROR: Method submit() must not return null: " + m_genericStreamableJobAdaptor.getClass().getName());
244                     }
245                     m_nativeJobId = m_IOHandler.getJobId();
246                 } else {
247                     m_nativeJobId = m_controlAdaptor.submit(nativeJobDesc, s_checkMatch, m_uniqId);
248                 }
249             }
250             String monitorUrl = m_monitorService.getURL().getString();
251             String sagaJobId = "[" + monitorUrl + "]-[" + m_nativeJobId + "]";
252             m_attributes.m_JobId.setObject(sagaJobId);
253 
254             // start listening if a callback was registered
255             if (m_willStartListening) {
256                 m_willStartListening = false;
257                 this.startListening();
258             }
259 
260             // pre-staging (after job register)
261             if (m_stagingMgr instanceof DataStagingManagerThroughSandboxTwoPhase) {
262                 ((DataStagingManagerThroughSandboxTwoPhase) m_stagingMgr).preStaging(this, m_nativeJobId);
263             }
264 
265             // start job
266             if (m_controlAdaptor instanceof StagingJobAdaptorTwoPhase) {
267                 ((StagingJobAdaptorTwoPhase) m_controlAdaptor).start(m_nativeJobId);
268             }
269         } catch (AuthorizationFailedException e) {
270             throw new NoSuccessException(e);
271         } catch (AuthenticationFailedException e) {
272             throw new NoSuccessException(e);
273         } catch (PermissionDeniedException e) {
274             throw new NoSuccessException(e);
275         } catch (DoesNotExistException e) {
276             throw new NoSuccessException(e);
277         } catch (BadParameterException e) {
278             throw new NoSuccessException(e);
279         }
280     }
281 
282     protected void doCancel() {
283         try {
284             m_monitorService.checkState();
285         } catch (SagaException e) {
286             throw new RuntimeException(e);
287         }
288         if (m_nativeJobId == null) {
289             throw new RuntimeException("INTERNAL ERROR: JobID not initialized");
290         }
291         try {
292             m_controlAdaptor.cancel(m_nativeJobId);
293             this.setState(State.CANCELED, "USER:Canceled", SubState.CANCEL_REQUESTED, new IncorrectStateException("Canceled by user"));
294         } catch (SagaException e) {
295             // do nothing (failed to cancel task)
296             s_logger.warn("Could not cancel job " + m_nativeJobId + ": " + e.getMessage());
297         }
298     }
299 
300     protected State queryState() throws NotImplementedException, TimeoutException, NoSuccessException {
301         m_monitorService.checkState();
302         JobStatus status = m_monitorService.getState(m_nativeJobId);
303         // set job state
304         this.setJobState(status.getSagaState(), status.getStateDetail(), status.getSubState(), status.getCause());
305 
306         // close job output and error streams
307         this.closeStreamsIfDoneAndInteractive();
308 
309         // return task state (may trigger finish task)
310         return status.getSagaState();
311     }
312 
313     public boolean startListening() throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException {
314         m_monitorService.checkState();
315         if (m_nativeJobId == null) {
316             m_willStartListening = true;
317         } else {
318             m_monitorService.startListening(m_nativeJobId, this);
319         }
320         return true;    // a job task is always listening (either with notification, or with polling)
321     }
322 
323     public void stopListening() throws NotImplementedException, TimeoutException, NoSuccessException {
324         m_monitorService.checkState();
325         if (m_nativeJobId == null) {
326             return;
327         }
328         m_monitorService.stopListening(m_nativeJobId);
329 
330         if (!(m_stagingMgr instanceof StreamingManagerThroughSandbox)) {
331             // close job output and error streams
332             this.closeStreamsIfDoneAndInteractive();
333         }
334 
335         if (this.isFinalState()) {
336             // post-staging
337             try {
338                 this.postStaging();
339                 if (m_stagingMgr instanceof StreamingManagerThroughSandbox) {
340                     // close job output and error streams after data staging
341                     this.closeStreamsIfDoneAndInteractive();
342                 }
343             } catch (PermissionDeniedException e) {
344                 throw new NoSuccessException(e);
345             } catch (IncorrectStateException e) {
346                 throw new NoSuccessException(e);
347             }
348             // cleanup
349             try {
350                 this.cleanUp();
351             } catch (SagaException e) {
352                 s_logger.warn("Failed to cleanup job: " + m_nativeJobId, e);
353             }
354         }
355     }
356 
357     private void closeStreamsIfDoneAndInteractive() {
358         if (this.isFinalState() && m_IOHandler != null) {  //if job is done and interactive
359             if (m_controlAdaptor instanceof StreamableJobInteractiveGet || m_controlAdaptor instanceof StreamableJobBatch || m_stagingMgr instanceof StreamingManagerThroughSandbox) {
360                 try {
361                     if (m_stdout == null) {
362                         m_stdout = new JobStdoutInputStream(this, m_IOHandler);
363                     }
364                     m_stdout.closeJobIOHandler();
365                 } catch (Exception e) {
366                     s_logger.warn("Failed to get job output stream: " + m_nativeJobId, e);
367                 }
368                 try {
369                     if (m_stderr == null) {
370                         m_stderr = new JobStderrInputStream(this, m_IOHandler);
371                     }
372                     m_stderr.closeJobIOHandler();
373                 } catch (Exception e) {
374                     s_logger.warn("Failed to get job error stream: " + m_nativeJobId, e);
375                 }
376             }
377         }
378     }
379 
380     public void postStagingAndCleanup() throws NotImplementedException, PermissionDeniedException, IncorrectStateException, TimeoutException, NoSuccessException {
381         m_monitorService.checkState();
382         State state = this.getState();  // force state refresh
383         if (this.isFinalState()) {
384             // post-staging
385             this.postStaging();
386             // cleanup
387             this.cleanUp();
388         } else {
389             throw new IncorrectStateException("Can not cleanup unfinished job: " + state, this);
390         }
391     }
392 
393     private void postStaging() throws NotImplementedException, PermissionDeniedException, IncorrectStateException, TimeoutException, NoSuccessException {
394         if (this.isFinalState()) {
395             // post-staging
396             try {
397                 m_stagingMgr.postStaging(this, m_nativeJobId);
398             } catch (AuthenticationFailedException e) {
399                 throw new NoSuccessException(e);
400             } catch (AuthorizationFailedException e) {
401                 throw new NoSuccessException(e);
402             } catch (BadParameterException e) {
403                 throw new NoSuccessException(e);
404             } catch (DoesNotExistException e) {
405                 throw new NoSuccessException(e);
406             }
407         }
408     }
409 
410     private void cleanUp() throws NotImplementedException, PermissionDeniedException, IncorrectStateException, TimeoutException, NoSuccessException {
411         // cleanup staged files
412         Directory dir = null;
413 
414         // remove staging files
415         try {
416             dir = m_stagingMgr.cleanup(this, m_nativeJobId);
417         } catch (AuthenticationFailedException e) {
418             throw new NoSuccessException(e);
419         } catch (AuthorizationFailedException e) {
420             throw new NoSuccessException(e);
421         } catch (BadParameterException e) {
422             throw new NoSuccessException(e);
423         } catch (DoesNotExistException e) {
424             throw new NoSuccessException(e);
425         } catch (NotImplementedException e){
426         	// In case of read-only data adaptor for example.
427         	// continue to be able to clean job
428         	s_logger.info("Could not clean staged files:" + e.getMessage());
429         }
430         try {
431             // adaptor's cleanup
432             if (m_controlAdaptor instanceof CleanableJobAdaptor) {
433                 try {
434                     JobInfoAdaptor jia = getJobInfoAdaptor();
435                     try {
436                         setStaticValue(m_attributes.m_Created, jia.getCreated(m_nativeJobId));
437                     } catch (Exception e) {
438                         s_logger.warn(e.getMessage());
439                     }
440                     try {
441                         setStaticValue(m_attributes.m_Started, jia.getStarted(m_nativeJobId));
442                     } catch (Exception e) {
443                         s_logger.warn(e.getMessage());
444                     }
445                     try {
446                         setStaticValue(m_attributes.m_Finished, jia.getFinished(m_nativeJobId));
447                     } catch (Exception e) {
448                         s_logger.warn(e.getMessage());
449                     }
450                     try {
451                         setStaticValue(m_attributes.m_ExitCode, jia.getExitCode(m_nativeJobId));
452                     } catch (Exception e) {
453                         s_logger.warn(e.getMessage());
454                     }
455                     try {
456                         setStaticValues(m_attributes.m_ExecutionHosts, jia.getExecutionHosts(m_nativeJobId));
457                     } catch (Exception e) {
458                         s_logger.warn(e.getMessage());
459                     }
460                 } catch (NotImplementedException nie) {
461                     // Do not cache
462                 }
463 
464                 ((CleanableJobAdaptor) m_controlAdaptor).clean(m_nativeJobId);
465             }
466 
467             // remove staging directory
468             if (dir != null) {
469                 try {
470                     dir.remove();
471                 } catch (AuthenticationFailedException e) {
472                     throw new NoSuccessException(e);
473                 } catch (AuthorizationFailedException e) {
474                     throw new NoSuccessException(e);
475                 } catch (BadParameterException e) {
476                     throw new NoSuccessException(e);
477                 }
478             }
479 
480         } finally {
481             if (dir != null) {
482                 dir.close();
483             }
484         }
485         // throw NotImplementedException if adaptor not instance of CleanableJobAdaptor
486         if (!(m_controlAdaptor instanceof CleanableJobAdaptor)) {
487             throw new NotImplementedException("Cleanup is not supported by this adaptor: " + m_controlAdaptor.getClass().getName());
488         }
489     }
490 
491     ////////////////////////////////////// implementation of Job //////////////////////////////////////
492     public JobDescription getJobDescriptionSync() throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, DoesNotExistException, TimeoutException, NoSuccessException {
493         return m_jobDescription;
494     }
495 
496     public OutputStream getStdinSync() throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, DoesNotExistException, TimeoutException, IncorrectStateException, NoSuccessException {
497         m_monitorService.checkState();
498         if (this.isInteractive() || m_stagingMgr instanceof StreamingManagerThroughSandbox) {
499             if (m_stdin == null) {
500                 if (m_controlAdaptor instanceof StreamableJobInteractiveSet) {
501                     m_stdin = new PostconnectedStdinOutputStream(this);
502                 } else {
503                     m_stdin = new JobStdinOutputStream(this);
504                 }
505             }
506             return m_stdin;
507         } else {
508             throw new IncorrectStateException("Method getStdin() is allowed on interactive jobs only", this);
509         }
510     }
511 
512     public InputStream getStdoutSync() throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, DoesNotExistException, TimeoutException, IncorrectStateException, NoSuccessException {
513         m_monitorService.checkState();
514         if (this.isInteractive() || m_stagingMgr instanceof StreamingManagerThroughSandbox) {
515             if (m_stdout == null) {
516                 m_stdout = new JobStdoutInputStream(this, m_IOHandler);
517             }
518             return m_stdout;
519         } else {
520             throw new IncorrectStateException("Method getStdout() is allowed on interactive jobs only", this);
521         }
522     }
523 
524     public InputStream getStderrSync() throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, DoesNotExistException, TimeoutException, IncorrectStateException, NoSuccessException {
525         m_monitorService.checkState();
526         if (this.isInteractive() || m_stagingMgr instanceof StreamingManagerThroughSandbox) {
527             if (m_stderr == null) {
528                 m_stderr = new JobStderrInputStream(this, m_IOHandler);
529             }
530             return m_stderr;
531         } else {
532             throw new IncorrectStateException("Method getStderr() is allowed on interactive jobs only", this);
533         }
534     }
535 
536     public void suspendSync() throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, IncorrectStateException, TimeoutException, NoSuccessException {
537         m_monitorService.checkState();
538         if (m_nativeJobId == null) {
539             throw new IncorrectStateException("Can not suspend job in 'New' state", this);
540         }
541         if (m_controlAdaptor instanceof HoldableJobAdaptor && m_controlAdaptor instanceof SuspendableJobAdaptor) {
542             if (!((HoldableJobAdaptor) m_controlAdaptor).hold(m_nativeJobId)) {
543                 if (!((SuspendableJobAdaptor) m_controlAdaptor).suspend(m_nativeJobId)) {
544                     if (getJobState().equals(State.NEW) || getJobState().equals(State.RUNNING)) {
545                         throw new NoSuccessException("Failed to hold/suspend job, the plugin returned False: " + m_nativeJobId);
546                     } else {
547                         throw new IncorrectStateException("Failed to hold/suspend job because it is neither queued nor active: " + m_nativeJobId);
548                     }
549                 }
550             }
551         } else if (m_controlAdaptor instanceof HoldableJobAdaptor) {
552             if (!((HoldableJobAdaptor) m_controlAdaptor).hold(m_nativeJobId)) {
553                 if (!getJobState().equals(State.NEW)) {
554                     throw new IncorrectStateException("Failed to hold job because it is not queued: " + m_nativeJobId);
555                 } else {
556                     throw new NoSuccessException("Failed to hold job; the plugin returned False");
557                 }
558             }
559         } else if (m_controlAdaptor instanceof SuspendableJobAdaptor) {
560             if (!((SuspendableJobAdaptor) m_controlAdaptor).suspend(m_nativeJobId)) {
561                 if (!getJobState().equals(State.RUNNING)) {
562                     throw new IncorrectStateException("Failed to suspend job because if is not active: " + m_nativeJobId);
563                 } else {
564                     throw new NoSuccessException("Failed to suspend job; the plugin returned False");
565                 }
566             }
567         } else {
568             throw new NotImplementedException("Suspend is not supported by this adaptor: " + m_controlAdaptor.getClass().getName());
569         }
570     }
571 
572     public void resumeSync() throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, IncorrectStateException, TimeoutException, NoSuccessException {
573         m_monitorService.checkState();
574         if (m_nativeJobId == null) {
575             throw new IncorrectStateException("Can not resume job in 'New' state", this);
576         }
577         if (m_controlAdaptor instanceof HoldableJobAdaptor && m_controlAdaptor instanceof SuspendableJobAdaptor) {
578             if (!((HoldableJobAdaptor) m_controlAdaptor).release(m_nativeJobId)) {
579                 if (!((SuspendableJobAdaptor) m_controlAdaptor).resume(m_nativeJobId)) {
580                     if (!getState().equals(State.SUSPENDED)) {
581                         throw new IncorrectStateException("Failed to release/resume job because it is neither held nor suspended: " + m_nativeJobId);
582                     } else {
583                         throw new NoSuccessException("Failed to release/resume job; the plugin returned False");
584                     }
585                 }
586             }
587         } else if (m_controlAdaptor instanceof HoldableJobAdaptor) {
588             if (!((HoldableJobAdaptor) m_controlAdaptor).release(m_nativeJobId)) {
589                 if (!getState().equals(State.SUSPENDED)) {
590                     throw new IncorrectStateException("Failed to release job because it is not held: " + m_nativeJobId);
591                 } else {
592                     throw new NoSuccessException("Failed to release job; the plugin returned False");
593                 }
594             }
595         } else if (m_controlAdaptor instanceof SuspendableJobAdaptor) {
596             if (!((SuspendableJobAdaptor) m_controlAdaptor).resume(m_nativeJobId)) {
597                 if (!getState().equals(State.SUSPENDED)) {
598                     throw new IncorrectStateException("Failed to resume job because if is not suspended: " + m_nativeJobId);
599                 } else {
600                     throw new NoSuccessException("Failed to resume job; the plugin returned False");
601                 }
602             }
603         } else {
604             throw new NotImplementedException("Resume is not supported by this adaptor: " + m_controlAdaptor.getClass().getName());
605         }
606     }
607 
608     public void checkpointSync() throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, IncorrectStateException, TimeoutException, NoSuccessException {
609         m_monitorService.checkState();
610         if (m_nativeJobId == null) {
611             throw new IncorrectStateException("Can not checkpoint job in 'New' state", this);
612         }
613         if (m_controlAdaptor instanceof CheckpointableJobAdaptor) {
614             if (!((CheckpointableJobAdaptor) m_controlAdaptor).checkpoint(m_nativeJobId)) {
615                 throw new NoSuccessException("Failed to checkpoint job: " + m_nativeJobId);
616             }
617         } else {
618             throw new NotImplementedException("Checkpoint is not supported by this adaptor: " + m_controlAdaptor.getClass().getName());
619         }
620     }
621 
622     public void migrateSync(JobDescription jd) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, IncorrectStateException, TimeoutException, NoSuccessException {
623         m_monitorService.checkState();
624         if (m_nativeJobId == null) {
625             throw new IncorrectStateException("Can not migrate job in 'New' state", this);
626         }
627         throw new NotImplementedException("Not implemented yet..."); //todo: implement method migrate()
628 //        if (super.cancel(true)) {   //synchronous cancel (not the SAGA cancel)
629     }
630 
631     public void signalSync(int signum) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, IncorrectStateException, TimeoutException, NoSuccessException {
632         m_monitorService.checkState();
633         if (m_nativeJobId == null) {
634             throw new IncorrectStateException("Can not send signal to job in 'New' state", this);
635         }
636         if (m_controlAdaptor instanceof SignalableJobAdaptor) {
637             if (!((SignalableJobAdaptor) m_controlAdaptor).signal(m_nativeJobId, signum)) {
638                 throw new NoSuccessException("Failed to signal job: " + m_nativeJobId);
639             }
640         } else {
641             throw new NotImplementedException("Signal is not supported by this adaptor: " + m_controlAdaptor.getClass().getName());
642         }
643     }
644 
645     /////////////////////////////////////////// implementation of AbstractSyncJobImpl ////////////////////////////////////////////
646     public State getJobState() {
647         return m_metrics.m_State.getValue();
648     }
649 
650     ////////////////////////////////////// implementation of JobMonitorCallback //////////////////////////////////////
651     /**
652      * Set job and task state (may finish task)
653      */
654     public void setState(State state, String stateDetail, SubState subState, SagaException cause) {
655         // log
656         if (m_currentModelState == null || !m_currentModelState.equals(stateDetail)) {
657             m_currentModelState = stateDetail;
658             m_monitorService.getStateLogger().debug("State changed to " + stateDetail + " for job " + m_attributes.m_JobId.getObject());
659         }
660         // set job state
661         this.setJobState(state, stateDetail, subState, cause);
662         // set task state (may finish task)
663         super.setState(state);
664     }
665 
666     /**
667      * Set job state only
668      */
669     private synchronized void setJobState(State state, String stateDetail, SubState subState, SagaException cause) {
670         // if not already in a final state
671         if (!this.isFinalState()) {
672             // update cause
673             if (cause != null) {
674                 super.setException(cause);
675             }
676             // update metrics
677             m_metrics.m_State.setValue(state);
678             m_metrics.m_StateDetail.setValue(stateDetail);
679             m_metrics.m_StateDetail.setValue(MODEL + ":" + subState.toString());
680         }
681     }
682 
683     /////////////////////////////////////// friend methods //////////////////////////////////////
684     String getNativeJobId() {
685         return m_nativeJobId;
686     }
687 
688     JobInfoAdaptor getJobInfoAdaptor() throws NotImplementedException {
689         JobMonitorAdaptor monitorAdaptor = m_monitorService.getAdaptor();
690         if (monitorAdaptor instanceof JobInfoAdaptor) {
691             return (JobInfoAdaptor) monitorAdaptor;
692         } else {
693             throw new NotImplementedException("Job attribute not supported by this adaptor: " + monitorAdaptor.getClass());
694         }
695     }
696 
697     StagingTransfer[] getOutputStagingTransfer() throws PermissionDeniedException, TimeoutException, NoSuccessException {
698         if (m_stagingMgr instanceof DataStagingManagerThroughSandbox) {
699             return ((DataStagingManagerThroughSandbox) m_stagingMgr).getOutputStagingTransfer(m_nativeJobId);
700         }
701         return null;
702     }
703 
704     ////////////////////////////////////// private methods //////////////////////////////////////
705     private boolean isFinalState() {
706         // do not use getState_fromCache() because it may lead to infinite recursion
707         State state = m_metrics.m_State.getValue();
708         if (state == null) {
709             state = State.RUNNING;
710         }
711 
712         // if state is terminal
713         switch (state) {
714             case DONE:
715             case CANCELED:
716             case FAILED:
717                 return true;
718             default:
719                 return false;
720         }
721     }
722 
723     private boolean isInteractive() throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, IncorrectStateException, TimeoutException, NoSuccessException {
724         try {
725             return "true".equalsIgnoreCase(m_jobDescription.getAttribute(JobDescription.INTERACTIVE));
726         } catch (DoesNotExistException e) {
727             return false;
728         }
729     }
730 
731     private void setStaticValue(ScalarAttributeImpl<Date> attr, final Date value) {
732         attr = _addAttribute(new ScalarAttributeImpl<Date>(attr.getKey(), null, attr.getMode(), attr.getType(), new Date()) {
733 
734             public String getValue() {
735                 return value.toString();
736             }
737         });
738     }
739 
740     private void setStaticValue(ScalarAttributeImpl<Integer> attr, final Integer value) {
741         attr = _addAttribute(new ScalarAttributeImpl<Integer>(attr.getKey(), null, attr.getMode(), attr.getType(), null) {
742 
743             public String getValue() {
744                 return value.toString();
745             }
746         });
747     }
748 
749     private void setStaticValues(VectorAttributeImpl<String> attr, final String[] values) {
750         attr = _addVectorAttribute(new VectorAttributeImpl<String>(attr.getKey(), null, attr.getMode(), attr.getType(), null) {
751 
752             public String[] getValues() {
753                 return values;
754             }
755         });
756     }
757 }