View Javadoc

1   package fr.in2p3.jsaga.impl.task;
2   
3   import fr.in2p3.jsaga.EngineProperties;
4   import fr.in2p3.jsaga.impl.monitoring.*;
5   import org.apache.log4j.Logger;
6   import org.ogf.saga.SagaObject;
7   import org.ogf.saga.error.*;
8   import org.ogf.saga.session.Session;
9   import org.ogf.saga.task.State;
10  import org.ogf.saga.task.Task;
11  
12  import java.util.concurrent.ExecutionException;
13  import java.util.concurrent.TimeUnit;
14  
15  /* ***************************************************
16  * *** Centre de Calcul de l'IN2P3 - Lyon (France) ***
17  * ***             http://cc.in2p3.fr/             ***
18  * ***************************************************
19  * File:   AbstractTaskImpl
20  * Author: Sylvain Reynaud (sreynaud@in2p3.fr)
21  * Date:   18 sept. 2007
22  * ***************************************************
23  * Description:                                      */
24  /**
25   *
26   */
27  public abstract class AbstractTaskImpl<T,E> extends AbstractMonitorableImpl implements Task<T,E>, TaskCallback<E> {
28      // metrics
29      private TaskStateMetricImpl<State> m_metric_TaskState;
30      // internal
31      protected T m_object;
32      private E m_result;
33      protected SagaException m_exception;
34      private boolean m_isWaitingFor;
35      /** logger */
36      private static Logger s_logger = Logger.getLogger(AbstractTaskImpl.class);
37  
38      /** constructor */
39      public AbstractTaskImpl(Session session, T object, boolean create) throws NotImplementedException {
40          super(session);
41  
42          // set metrics
43          m_metric_TaskState = new TaskStateMetricFactoryImpl<State>(this).createAndRegister(
44                  Task.TASK_STATE,
45                  "fires on task state change, and has the literal value of the task enum.",
46                  MetricMode.ReadOnly,
47                  "1",
48                  MetricType.Enum,
49                  create ? State.NEW : null);
50  
51          // internal
52          m_object = object;
53          m_result = null;
54          m_exception = null;
55          m_isWaitingFor = false;
56      }
57  
58      /** clone */
59      public SagaObject clone() throws CloneNotSupportedException {
60          AbstractTaskImpl clone = (AbstractTaskImpl) super.clone();
61          clone.m_metric_TaskState = m_metric_TaskState;
62          clone.m_object = m_object;
63          clone.m_result = m_result;
64          clone.m_exception = m_exception;
65          clone.m_isWaitingFor = m_isWaitingFor;
66          return clone;
67      }
68  
69      //////////////////////////////////////////// abstract methods ////////////////////////////////////////////
70  
71      /**
72       * start the task (e.g. start the thread, submit the job...) and returns immediatly
73       */
74      protected abstract void doSubmit() throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException;
75  
76      /**
77       * cancel the task (and update its state)
78       */
79      protected abstract void doCancel();
80  
81      /**
82       * query the task state
83       * return the new state if it has been queried, else null
84       */
85      protected abstract State queryState() throws NotImplementedException, TimeoutException, NoSuccessException;
86  
87      /**
88       * start listening to changes of task state
89       * @return true if the task is listening, else false
90       */
91      public abstract boolean startListening() throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException;
92  
93      /**
94       * stop listening to changes of task state
95       */
96      public abstract void stopListening() throws NotImplementedException, TimeoutException, NoSuccessException;
97  
98      //////////////////////////////////////////// interface Task ////////////////////////////////////////////
99  
100     public void run() throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException {
101         if (!this.isCancelled()) {
102             this.doSubmit();
103         }
104     }
105 
106     public void waitFor() throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException {
107         this.waitFor(WAIT_FOREVER);
108     }
109 
110     public boolean waitFor(float timeoutInSeconds) throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException {
111         // returns immediatly if already finished
112         if (this.isDone_fromCache()) {
113             return true;
114         }
115 
116         // start listening
117         boolean isListening = this.startListening(); //WARN: do not breakpoint here (because this may change behavior)
118         this.setWaitingFor(isListening);
119         // fixme: the code below avoids querying the status of finished tasks, but it makes test_simultaneousShortJob hanging with personal-gatekeeper...  :-(
120         /*int cookie;
121         try {
122             cookie = m_metric_TaskState.addCallback(new Callback(){
123                 public boolean cb(Monitorable mt, Metric metric, Context ctx) throws NotImplementedException, AuthorizationFailedException {
124                     boolean stayRegistered;
125                     MetricImpl<State> m = (MetricImpl<State>) metric;
126                     switch(m.getValue()) {
127                         case DONE:
128                         case CANCELED:
129                         case FAILED:
130                             stayRegistered = false;
131                             break;
132                         default:
133                             stayRegistered = true;
134                             break;
135                     }
136                     return stayRegistered;
137                 }
138             });
139         }
140         catch (AuthenticationFailedException e) {throw new NoSuccessException(e);}
141         catch (AuthorizationFailedException e) {throw new NoSuccessException(e);}
142         catch (PermissionDeniedException e) {throw new NoSuccessException(e);}*/
143 
144         // loop until task is finished (done, canceled or failed)
145         try {
146             boolean forever;
147             long endTime;
148             if (timeoutInSeconds == WAIT_FOREVER) {
149                 forever = true;
150                 endTime = -1;
151             } else if (timeoutInSeconds == NO_WAIT) {
152                 forever = false;
153                 endTime = -1;
154             } else {
155                 forever = false;
156                 endTime = System.currentTimeMillis() + (long) (timeoutInSeconds*1000f);
157             }
158             // read notified status, else query status
159             while(!this.isDone() && (forever || System.currentTimeMillis()<endTime)) {
160                 Thread.currentThread().sleep(100);
161             }
162         } catch (InterruptedException e) {/*ignore*/}
163 
164         // stop listening
165         this.stopListening();
166         this.setWaitingFor(false);
167         /*{
168             // callback may have not been removed
169             try {
170                 m_metric_TaskState.removeCallback(cookie);
171             }
172             catch (BadParameterException e2) {throw new NoSuccessException(e);}
173             catch (AuthenticationFailedException e2) {throw new NoSuccessException(e);}
174             catch (AuthorizationFailedException e2) {throw new NoSuccessException(e);}
175             catch (PermissionDeniedException e2) {throw new NoSuccessException(e);}
176         }*/
177 
178         // returns
179         return this.isDone_fromCache();
180     }
181 
182     // exit immediatly
183     private static final int NB_CANCEL_TRY = 3;
184     public synchronized void cancel() throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException {
185         switch (this.getState_fromCache(State.RUNNING)) {
186             case NEW:
187                 try {
188                     // if task can not be monitored, it may stay in NEW state forever. Then we need to free resources.
189                     this.stopListening();
190                 } catch (SagaException e){
191                     s_logger.warn("Failed to stop listening", e);
192                 }
193                 throw new IncorrectStateException("Can not cancel task in 'New' state", this); //as specified in SAGA
194             case DONE:
195             case CANCELED:
196             case FAILED:
197                 // just ignore
198 //                break;
199             case RUNNING:
200                 // try to cancel synchronously
201                 this.doCancel();
202                 boolean checkStatus = EngineProperties.getBoolean(EngineProperties.JOB_CANCEL_CHECK_STATUS);
203                 if (checkStatus && !this.isDone_fromCache()) {
204                     // try to cancel asynchronously (every minutes)
205                     s_logger.warn("Failed to cancel synchronously, trying asynchronously...");
206                     new Thread(new Runnable() {
207                         public void run() {
208                             try {
209                                 for (int i=0; !AbstractTaskImpl.this.isDone_fromCache() && i<NB_CANCEL_TRY; i++) {
210                                     Thread.currentThread().sleep(60000);
211                                     AbstractTaskImpl.this.doCancel();
212                                 }
213                             } catch (InterruptedException e) {/*ignore*/}
214                             if (AbstractTaskImpl.this.isCancelled()) {
215                                 s_logger.info("Asynchronous cancel successfull !");
216                             }
217                         }
218                     }).start();
219                 }
220                 break;
221         }        
222     }
223 
224     // wait for task to be cancelled (or done, or failed)
225     public void cancel(float timeoutInSeconds) throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException {
226         this.cancel();
227         this.waitFor(timeoutInSeconds);
228     }
229 
230     public State getState() throws NotImplementedException, TimeoutException, NoSuccessException {
231         // do not use getState_fromCache() because it may lead to infinite recursion
232         State oldState = m_metric_TaskState.getValue();
233 
234         switch(oldState!=null ? oldState : State.NEW) {
235             // if oldState is final
236             case DONE:
237             case CANCELED:
238             case FAILED:
239                 return oldState;
240 
241             // if oldState is not final
242             default:
243                 // if not notified or not yet initialized, then update state
244                 if ( (!m_isWaitingFor && !m_metric_TaskState.isListening()) || oldState==null ) {
245                     State state = this.queryState();
246                     if (state != null) {
247                         this.setState(state);
248                     }
249                 }
250                 return m_metric_TaskState.getValue();
251         }
252     }
253 
254     public E getResult() throws NotImplementedException, IncorrectURLException, BadParameterException, AlreadyExistsException, DoesNotExistException, IncorrectStateException, PermissionDeniedException, AuthorizationFailedException, AuthenticationFailedException, TimeoutException, SagaIOException, NoSuccessException {
255         this.waitFor();
256         switch (this.getState_fromCache(State.DONE)) {
257             case NEW:
258             case CANCELED:
259                 throw new IncorrectStateException("Can not get result for task in state: "+this.getState_fromCache().name());
260             case FAILED:
261                 this.rethrow();
262                 throw new NoSuccessException("[INTERNAL ERROR] unexpected exception", this);
263             default:
264                 return m_result;
265         }
266     }
267 
268     public T getObject() throws NotImplementedException, TimeoutException, NoSuccessException {
269         return m_object;
270     }
271 
272     public void rethrow() throws NotImplementedException, IncorrectURLException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, IncorrectStateException, AlreadyExistsException, DoesNotExistException, TimeoutException, SagaIOException, NoSuccessException {
273         switch (this.getState_fromCache()) {
274             case FAILED:
275                 if (m_exception != null) {
276                     try {throw m_exception;}
277                     catch(NotImplementedException e) {throw e;}
278                     catch(IncorrectURLException e) {throw e;}
279                     catch(AuthenticationFailedException e) {throw e;}
280                     catch(AuthorizationFailedException e) {throw e;}
281                     catch(PermissionDeniedException e) {throw e;}
282                     catch(BadParameterException e) {throw e;}
283                     catch(IncorrectStateException e) {throw e;}
284                     catch(AlreadyExistsException e) {throw e;}
285                     catch(DoesNotExistException e) {throw e;}
286                     catch(TimeoutException e) {throw e;}
287                     catch(SagaIOException e) {throw e;}
288                     catch(NoSuccessException e) {throw e;}
289                     catch(SagaException e) {throw new NoSuccessException(m_exception);}
290                 } else {
291                     throw new NoSuccessException("task failed with unknown reason", this);
292                 }
293         }
294     }
295 
296     //////////////////////////////////////////// interface TaskCallback ////////////////////////////////////////////
297 
298     public synchronized void setState(State state) {
299         // do not use getState_fromCache() because it may lead to infinite recursion
300         State oldState = m_metric_TaskState.getValue();
301         if (oldState == null)
302             oldState = State.RUNNING;
303 
304         // if oldState is terminal
305         switch(oldState) {
306             case DONE:
307             case CANCELED:
308             case FAILED:
309                 return;
310             default:
311                 m_metric_TaskState.setValue(state);
312         }
313     }
314 
315     public synchronized void setResult(E result) {
316         m_result = result;
317     }
318 
319     public synchronized void setException(SagaException exception) {
320         m_exception = exception;
321     }
322 
323     //////////////////////////////////////////// interface Future<E> ////////////////////////////////////////////
324 
325     /**
326      * Attempts to cancel execution of this task. This attempt will fail if the task has already completed,
327      * already been cancelled, or could not be cancelled for some other reason. If successful, and this task
328      * has not started when cancel is called, this task should never run. If the task has already started,
329      * then the mayInterruptIfRunning parameter determines whether the thread executing this task should be
330      * interrupted in an attempt to stop the task.
331      * @param mayInterruptIfRunning true if the thread executing this task should be interrupted; otherwise,
332      * in-progress tasks are allowed to complete
333      * @return false if the task could not be cancelled, typically because it has already completed normally; true otherwise
334      */
335     public boolean cancel(boolean mayInterruptIfRunning) {
336         switch (this.getState_fromCache(State.RUNNING)) {
337             case NEW:
338                 this.setState(State.CANCELED);   //as specified in java.util.concurrent
339                 return true;
340             case DONE:
341             case CANCELED:
342             case FAILED:
343                 return false;
344             case RUNNING:
345                 if (mayInterruptIfRunning) {
346                     this.doCancel();
347                     State state = this.getState_fromCache();
348                     return State.CANCELED.equals(state);
349                 }
350         }
351         return false;
352     }
353 
354     /**
355      * Returns true if this task was cancelled before it completed normally.
356      * @return true if task was cancelled before it completed
357      */
358     public boolean isCancelled() {
359         State state = this.getState_fromCache();
360         return State.CANCELED.equals(state);
361     }
362 
363     /**
364      * Returns true if this task completed. Completion may be due to normal termination, an exception,
365      * or cancellation -- in all of these cases, this method will return true.
366      * @return true if this task completed.
367      */
368     public boolean isDone() {
369         State state;
370         try {
371             state = this.getState();
372         } catch (Exception e) {
373             s_logger.warn("Failed to get state", e);
374             return false;
375         }
376         switch(state) {
377             case DONE:
378             case CANCELED:
379             case FAILED:
380                 return true;
381             default:
382                 return false;
383         }
384     }
385 
386     /**
387      * Waits if necessary for the computation to complete, and then retrieves its result.
388      * @return the computed result
389      * @throws InterruptedException if the current thread was interrupted while waiting
390      * @throws ExecutionException if the computation threw an exception
391      */
392     public E get() throws InterruptedException, ExecutionException {
393         try {
394             this.waitFor();
395         } catch (SagaException e) {
396             throw new ExecutionException(e);
397         }
398         switch (this.getState_fromCache(State.DONE)) {
399             case DONE:
400                 return m_result;
401             case CANCELED:
402                 throw new InterruptedException("Task has been cancelled");
403             case FAILED:
404                 throw new ExecutionException(m_exception);
405             default:
406                 throw new ExecutionException("INTERNAL ERROR: unexpected exception", m_exception);
407         }
408     }
409 
410     /**
411      * Waits if necessary for at most the given time for the computation to complete, and then retrieves
412      * its result, if available.
413      * @param timeout the maximum time to wait
414      * @param unit the time unit of the timeout argument
415      * @return the computed result
416      * @throws InterruptedException if the current thread was interrupted while waiting
417      * @throws ExecutionException if the computation threw an exception
418      * @throws java.util.concurrent.TimeoutException if the wait timed out
419      */
420     public E get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
421         try {
422             this.waitFor(unit.toSeconds(timeout));
423         } catch (SagaException e) {
424             throw new ExecutionException(e);
425         }
426         switch (this.getState_fromCache(State.DONE)) {
427             case DONE:
428                 return m_result;
429             case CANCELED:
430                 throw new InterruptedException("Task has been cancelled");
431             case FAILED:
432                 throw new ExecutionException(m_exception);
433             default:
434                 throw new java.util.concurrent.TimeoutException("The wait timed out");
435         }
436     }
437 
438     //////////////////////////////////////////// internal methods ////////////////////////////////////////////
439 
440     void setWaitingFor(boolean isWaitingFor) {
441         m_isWaitingFor = isWaitingFor;
442     }
443 
444     boolean isWaitingFor() {
445         return m_isWaitingFor;
446     }
447 
448     boolean isDone_fromCache() {
449         switch(this.getState_fromCache()) {
450             case DONE:
451             case CANCELED:
452             case FAILED:
453                 return true;
454             default:
455                 return false;
456         }
457     }
458 
459     protected State getState_fromCache() {
460         return getState_fromCache(null);
461     }
462     protected State getState_fromCache(State defaultState) {
463         State state = m_metric_TaskState.getValue();
464         if (state != null) {
465             // returns cached state
466             return state;
467         } else if (defaultState != null) {
468             // returns default state
469             return defaultState;
470         } else {
471             // returns queried state
472             try {
473                 return this.getState();
474             } catch (Exception e) {
475                 throw new RuntimeException("Failed to query state", e);
476             }
477         }
478     }
479 }