1 package fr.in2p3.jsaga.impl.task;
2
3 import fr.in2p3.jsaga.EngineProperties;
4 import fr.in2p3.jsaga.impl.monitoring.*;
5 import org.apache.log4j.Logger;
6 import org.ogf.saga.SagaObject;
7 import org.ogf.saga.error.*;
8 import org.ogf.saga.session.Session;
9 import org.ogf.saga.task.State;
10 import org.ogf.saga.task.Task;
11
12 import java.util.concurrent.ExecutionException;
13 import java.util.concurrent.TimeUnit;
14
15
16
17
18
19
20
21
22
23
24
25
26
27 public abstract class AbstractTaskImpl<T,E> extends AbstractMonitorableImpl implements Task<T,E>, TaskCallback<E> {
28
/** Metric holding the task state; created and registered by the constructor. */
private TaskStateMetricImpl<State> m_metric_TaskState;

/** Object given to the constructor and returned by getObject(). */
protected T m_object;
/** Value stored by setResult() and returned by getResult() and get(). */
private E m_result;
/** Exception stored by setException(); re-thrown by rethrow() when the state is FAILED. */
protected SagaException m_exception;
/**
 * Set by waitFor(float) to the value returned by startListening(), and reset once the wait is over.
 * While true, getState() does not call queryState() unless no state is cached yet.
 */
private boolean m_isWaitingFor;

/** Logger of this class. */
private static Logger s_logger = Logger.getLogger(AbstractTaskImpl.class);
37
38
39 public AbstractTaskImpl(Session session, T object, boolean create) throws NotImplementedException {
40 super(session);
41
42
43 m_metric_TaskState = new TaskStateMetricFactoryImpl<State>(this).createAndRegister(
44 Task.TASK_STATE,
45 "fires on task state change, and has the literal value of the task enum.",
46 MetricMode.ReadOnly,
47 "1",
48 MetricType.Enum,
49 create ? State.NEW : null);
50
51
52 m_object = object;
53 m_result = null;
54 m_exception = null;
55 m_isWaitingFor = false;
56 }
57
58
59 public SagaObject clone() throws CloneNotSupportedException {
60 AbstractTaskImpl clone = (AbstractTaskImpl) super.clone();
61 clone.m_metric_TaskState = m_metric_TaskState;
62 clone.m_object = m_object;
63 clone.m_result = m_result;
64 clone.m_exception = m_exception;
65 clone.m_isWaitingFor = m_isWaitingFor;
66 return clone;
67 }
68
69
70
71
72
73
/**
 * Hook invoked by run() when the task has not been cancelled.
 * NOTE(review): presumably starts the underlying operation - confirm against subclasses.
 */
protected abstract void doSubmit() throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException;
75
76
77
78
/**
 * Hook invoked by cancel(), by cancel(boolean) and by the retry thread started by cancel().
 * It declares no checked exception.
 */
protected abstract void doCancel();
80
81
82
83
84
/**
 * Hook invoked by getState() when the state has to be queried rather than read from the cache.
 *
 * @return the queried state, or null; when null is returned getState() leaves the cached value unchanged
 */
protected abstract State queryState() throws NotImplementedException, TimeoutException, NoSuccessException;
86
87
88
89
90
/**
 * Hook invoked by waitFor(float) before it starts waiting.
 *
 * @return a flag that waitFor(float) stores as the "waiting" flag; while that flag is true,
 *         getState() relies on the cached state instead of calling queryState()
 */
public abstract boolean startListening() throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException;
92
93
94
95
/**
 * Hook invoked by waitFor(float) once the wait is over, and by cancel() for a task in state NEW.
 */
public abstract void stopListening() throws NotImplementedException, TimeoutException, NoSuccessException;
97
98
99
100 public void run() throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException {
101 if (!this.isCancelled()) {
102 this.doSubmit();
103 }
104 }
105
106 public void waitFor() throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException {
107 this.waitFor(WAIT_FOREVER);
108 }
109
110 public boolean waitFor(float timeoutInSeconds) throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException {
111
112 if (this.isDone_fromCache()) {
113 return true;
114 }
115
116
117 boolean isListening = this.startListening();
118 this.setWaitingFor(isListening);
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145 try {
146 boolean forever;
147 long endTime;
148 if (timeoutInSeconds == WAIT_FOREVER) {
149 forever = true;
150 endTime = -1;
151 } else if (timeoutInSeconds == NO_WAIT) {
152 forever = false;
153 endTime = -1;
154 } else {
155 forever = false;
156 endTime = System.currentTimeMillis() + (long) (timeoutInSeconds*1000f);
157 }
158
159 while(!this.isDone() && (forever || System.currentTimeMillis()<endTime)) {
160 Thread.currentThread().sleep(100);
161 }
162 } catch (InterruptedException e) {
163
164
165 this.stopListening();
166 this.setWaitingFor(false);
167
168
169
170
171
172
173
174
175
176
177
178
179 return this.isDone_fromCache();
180 }
181
182
183 private static final int NB_CANCEL_TRY = 3;
184 public synchronized void cancel() throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException {
185 switch (this.getState_fromCache(State.RUNNING)) {
186 case NEW:
187 try {
188
189 this.stopListening();
190 } catch (SagaException e){
191 s_logger.warn("Failed to stop listening", e);
192 }
193 throw new IncorrectStateException("Can not cancel task in 'New' state", this);
194 case DONE:
195 case CANCELED:
196 case FAILED:
197
198
199 case RUNNING:
200
201 this.doCancel();
202 boolean checkStatus = EngineProperties.getBoolean(EngineProperties.JOB_CANCEL_CHECK_STATUS);
203 if (checkStatus && !this.isDone_fromCache()) {
204
205 s_logger.warn("Failed to cancel synchronously, trying asynchronously...");
206 new Thread(new Runnable() {
207 public void run() {
208 try {
209 for (int i=0; !AbstractTaskImpl.this.isDone_fromCache() && i<NB_CANCEL_TRY; i++) {
210 Thread.currentThread().sleep(60000);
211 AbstractTaskImpl.this.doCancel();
212 }
213 } catch (InterruptedException e) {
214 if (AbstractTaskImpl.this.isCancelled()) {
215 s_logger.info("Asynchronous cancel successfull !");
216 }
217 }
218 }).start();
219 }
220 break;
221 }
222 }
223
224
225 public void cancel(float timeoutInSeconds) throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException {
226 this.cancel();
227 this.waitFor(timeoutInSeconds);
228 }
229
230 public State getState() throws NotImplementedException, TimeoutException, NoSuccessException {
231
232 State oldState = m_metric_TaskState.getValue();
233
234 switch(oldState!=null ? oldState : State.NEW) {
235
236 case DONE:
237 case CANCELED:
238 case FAILED:
239 return oldState;
240
241
242 default:
243
244 if ( (!m_isWaitingFor && !m_metric_TaskState.isListening()) || oldState==null ) {
245 State state = this.queryState();
246 if (state != null) {
247 this.setState(state);
248 }
249 }
250 return m_metric_TaskState.getValue();
251 }
252 }
253
254 public E getResult() throws NotImplementedException, IncorrectURLException, BadParameterException, AlreadyExistsException, DoesNotExistException, IncorrectStateException, PermissionDeniedException, AuthorizationFailedException, AuthenticationFailedException, TimeoutException, SagaIOException, NoSuccessException {
255 this.waitFor();
256 switch (this.getState_fromCache(State.DONE)) {
257 case NEW:
258 case CANCELED:
259 throw new IncorrectStateException("Can not get result for task in state: "+this.getState_fromCache().name());
260 case FAILED:
261 this.rethrow();
262 throw new NoSuccessException("[INTERNAL ERROR] unexpected exception", this);
263 default:
264 return m_result;
265 }
266 }
267
/**
 * Returns the object given to the constructor.
 *
 * @return the value of m_object
 */
public T getObject() throws NotImplementedException, TimeoutException, NoSuccessException {
    return m_object;
}
271
272 public void rethrow() throws NotImplementedException, IncorrectURLException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, IncorrectStateException, AlreadyExistsException, DoesNotExistException, TimeoutException, SagaIOException, NoSuccessException {
273 switch (this.getState_fromCache()) {
274 case FAILED:
275 if (m_exception != null) {
276 try {throw m_exception;}
277 catch(NotImplementedException e) {throw e;}
278 catch(IncorrectURLException e) {throw e;}
279 catch(AuthenticationFailedException e) {throw e;}
280 catch(AuthorizationFailedException e) {throw e;}
281 catch(PermissionDeniedException e) {throw e;}
282 catch(BadParameterException e) {throw e;}
283 catch(IncorrectStateException e) {throw e;}
284 catch(AlreadyExistsException e) {throw e;}
285 catch(DoesNotExistException e) {throw e;}
286 catch(TimeoutException e) {throw e;}
287 catch(SagaIOException e) {throw e;}
288 catch(NoSuccessException e) {throw e;}
289 catch(SagaException e) {throw new NoSuccessException(m_exception);}
290 } else {
291 throw new NoSuccessException("task failed with unknown reason", this);
292 }
293 }
294 }
295
296
297
298 public synchronized void setState(State state) {
299
300 State oldState = m_metric_TaskState.getValue();
301 if (oldState == null)
302 oldState = State.RUNNING;
303
304
305 switch(oldState) {
306 case DONE:
307 case CANCELED:
308 case FAILED:
309 return;
310 default:
311 m_metric_TaskState.setValue(state);
312 }
313 }
314
/**
 * Stores the result later returned by getResult() and get().
 *
 * @param result the result of the task
 */
public synchronized void setResult(E result) {
    m_result = result;
}
318
/**
 * Stores the exception later re-thrown by rethrow() when the task is in state FAILED.
 *
 * @param exception the exception describing the failure
 */
public synchronized void setException(SagaException exception) {
    m_exception = exception;
}
322
323
324
325
326
327
328
329
330
331
332
333
334
335 public boolean cancel(boolean mayInterruptIfRunning) {
336 switch (this.getState_fromCache(State.RUNNING)) {
337 case NEW:
338 this.setState(State.CANCELED);
339 return true;
340 case DONE:
341 case CANCELED:
342 case FAILED:
343 return false;
344 case RUNNING:
345 if (mayInterruptIfRunning) {
346 this.doCancel();
347 State state = this.getState_fromCache();
348 return State.CANCELED.equals(state);
349 }
350 }
351 return false;
352 }
353
354
355
356
357
358 public boolean isCancelled() {
359 State state = this.getState_fromCache();
360 return State.CANCELED.equals(state);
361 }
362
363
364
365
366
367
368 public boolean isDone() {
369 State state;
370 try {
371 state = this.getState();
372 } catch (Exception e) {
373 s_logger.warn("Failed to get state", e);
374 return false;
375 }
376 switch(state) {
377 case DONE:
378 case CANCELED:
379 case FAILED:
380 return true;
381 default:
382 return false;
383 }
384 }
385
386
387
388
389
390
391
392 public E get() throws InterruptedException, ExecutionException {
393 try {
394 this.waitFor();
395 } catch (SagaException e) {
396 throw new ExecutionException(e);
397 }
398 switch (this.getState_fromCache(State.DONE)) {
399 case DONE:
400 return m_result;
401 case CANCELED:
402 throw new InterruptedException("Task has been cancelled");
403 case FAILED:
404 throw new ExecutionException(m_exception);
405 default:
406 throw new ExecutionException("INTERNAL ERROR: unexpected exception", m_exception);
407 }
408 }
409
410
411
412
413
414
415
416
417
418
419
420 public E get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
421 try {
422 this.waitFor(unit.toSeconds(timeout));
423 } catch (SagaException e) {
424 throw new ExecutionException(e);
425 }
426 switch (this.getState_fromCache(State.DONE)) {
427 case DONE:
428 return m_result;
429 case CANCELED:
430 throw new InterruptedException("Task has been cancelled");
431 case FAILED:
432 throw new ExecutionException(m_exception);
433 default:
434 throw new java.util.concurrent.TimeoutException("The wait timed out");
435 }
436 }
437
438
439
/**
 * Sets the flag read by getState() to decide whether the state must be queried.
 *
 * @param isWaitingFor the new value of the flag
 */
void setWaitingFor(boolean isWaitingFor) {
    m_isWaitingFor = isWaitingFor;
}
443
/**
 * @return the flag set by setWaitingFor(boolean)
 */
boolean isWaitingFor() {
    return m_isWaitingFor;
}
447
448 boolean isDone_fromCache() {
449 switch(this.getState_fromCache()) {
450 case DONE:
451 case CANCELED:
452 case FAILED:
453 return true;
454 default:
455 return false;
456 }
457 }
458
459 protected State getState_fromCache() {
460 return getState_fromCache(null);
461 }
462 protected State getState_fromCache(State defaultState) {
463 State state = m_metric_TaskState.getValue();
464 if (state != null) {
465
466 return state;
467 } else if (defaultState != null) {
468
469 return defaultState;
470 } else {
471
472 try {
473 return this.getState();
474 } catch (Exception e) {
475 throw new RuntimeException("Failed to query state", e);
476 }
477 }
478 }
479 }