View Javadoc

1   package fr.in2p3.jsaga.impl.task;
2   
3   import fr.in2p3.jsaga.impl.monitoring.AbstractMonitorableImpl;
4   import org.ogf.saga.SagaObject;
5   import org.ogf.saga.error.*;
6   import org.ogf.saga.session.Session;
7   import org.ogf.saga.task.*;
8   
9   import java.util.*;
10  
11  /* ***************************************************
12  * *** Centre de Calcul de l'IN2P3 - Lyon (France) ***
13  * ***             http://cc.in2p3.fr/             ***
14  * ***************************************************
15  * File:   TaskContainerImpl
16  * Author: Sylvain Reynaud (sreynaud@in2p3.fr)
17  * Date:   18 sept. 2007
18  * ***************************************************
19  * Description:                                      */
20  /**
21   *
22   */
23  public class TaskContainerImpl extends AbstractMonitorableImpl implements TaskContainer {
24      // metrics
25  //    private MetricImpl<Task> m_metric_TaskContainerState;
26      // internal
27      private final Map<String,AbstractTaskImpl> m_tasks;
28  
29      /** constructor */
30      public TaskContainerImpl(Session session) throws NoSuccessException {
31          super(session);
32  
33          // set metrics
34  /*
35          m_metric_TaskContainerState = this._addMetric(new MetricImpl<Task>(
36                  this,
37                  TaskContainer.TASKCONTAINER_STATE,
38                  "fires on state changes of any task in the container, and has the value of that task's handle",
39                  MetricMode.ReadOnly,
40                  "1",
41                  MetricType.Enum,
42                  null));
43  */
44  
45          // internal
46          m_tasks = Collections.synchronizedMap(new HashMap<String,AbstractTaskImpl>());
47      }
48  
49      /** clone */
50      public SagaObject clone() throws CloneNotSupportedException {
51          TaskContainerImpl clone = (TaskContainerImpl) super.clone();
52  //        clone.m_metric_TaskContainerState = m_metric_TaskContainerState;
53          clone.m_tasks.putAll(m_tasks);
54          return clone;
55      }
56  
57      public void add(Task<?,?> task) throws NotImplementedException, TimeoutException, NoSuccessException {
58          m_tasks.put(task.getId(), (AbstractTaskImpl) task);
59      }
60  
61      public void remove(Task<?,?> task) throws NotImplementedException, DoesNotExistException, TimeoutException, NoSuccessException {
62          Task removed = m_tasks.remove(task.getId());
63          if (removed== null) {
64              throw new DoesNotExistException("Task not in task container: "+task.getId(), this);
65          }
66      }
67  
68      public void run() throws NotImplementedException, IncorrectStateException, DoesNotExistException, TimeoutException, NoSuccessException {
69          synchronized(m_tasks) {
70          	if (m_tasks.isEmpty())
71          		throw new DoesNotExistException("Container is empty");
72              for (Task task : m_tasks.values()) {
73                  task.run();
74              }
75          }
76      }
77  
78      public Task<?, ?> waitFor() throws NotImplementedException, IncorrectStateException, DoesNotExistException, TimeoutException, NoSuccessException {
79          return this.waitFor(WAIT_FOREVER, WaitMode.ALL);
80      }
81  
82      public Task<?,?> waitFor(WaitMode mode) throws NotImplementedException, IncorrectStateException, DoesNotExistException, TimeoutException, NoSuccessException {
83          return this.waitFor(WAIT_FOREVER, mode);
84      }
85  
86      public Task<?, ?> waitFor(float timeoutInSeconds) throws NotImplementedException, IncorrectStateException, DoesNotExistException, TimeoutException, NoSuccessException {
87          return this.waitFor(timeoutInSeconds, WaitMode.ALL);
88      }
89  
90      public Task<?,?> waitFor(float timeoutInSeconds, WaitMode mode) throws NotImplementedException, IncorrectStateException, DoesNotExistException, TimeoutException, NoSuccessException {
91          if (m_tasks.isEmpty()) {
92              throw new DoesNotExistException("Task container is empty", this);
93          }
94  
95          this.startListening();
96          // todo: use addCallback instead of startListening
97          /*int tcCookie;
98          try {
99              tcCookie = m_metric_TaskContainerState.addCallback(new Callback(){
100                 public boolean cb(Monitorable mt, Metric metric, Context ctx) throws NotImplementedException, AuthorizationFailedException {
101                     return !m_tasks.isEmpty();
102                 }
103             });
104         }
105         catch (AuthenticationFailedException e) {throw new NoSuccessException(e);}
106         catch (AuthorizationFailedException e) {throw new NoSuccessException(e);}
107         catch (PermissionDeniedException e) {throw new NoSuccessException(e);}*/
108 
109         String id = null;
110         try {
111             boolean forever;
112             long endTime;
113             if (timeoutInSeconds == WAIT_FOREVER) {
114                 forever = true;
115                 endTime = -1;
116             } else if (timeoutInSeconds == NO_WAIT) {
117                 forever = false;
118                 endTime = -1;
119             } else {
120                 forever = false;
121                 endTime = System.currentTimeMillis() + (long) timeoutInSeconds;
122             }
123             while((id=this.getFinished(mode))==null && (forever || System.currentTimeMillis()<endTime)) {
124                 Thread.currentThread().sleep(100);
125             }
126         } catch(InterruptedException e) {/*ignore*/}
127 
128         this.stopListening();
129         /*{
130             // callback may have not been removed
131             try {
132                 m_metric_TaskContainerState.removeCallback(tcCookie);
133             }
134             catch (BadParameterException e2) {throw new NoSuccessException(e);}
135             catch (AuthenticationFailedException e2) {throw new NoSuccessException(e);}
136             catch (AuthorizationFailedException e2) {throw new NoSuccessException(e);}
137             catch (PermissionDeniedException e2) {throw new NoSuccessException(e);}
138         }*/
139 
140         return m_tasks.remove(id);
141     }
142 
143     public void cancel() throws NotImplementedException, IncorrectStateException, DoesNotExistException, TimeoutException, NoSuccessException {
144         synchronized(m_tasks) {
145         	if (m_tasks.isEmpty())
146         		throw new DoesNotExistException("Container is empty");
147             for (Task task : m_tasks.values()) {
148                 task.cancel();
149             }
150         }
151     }
152 
153     public void cancel(float timeoutInSeconds) throws NotImplementedException, IncorrectStateException, DoesNotExistException, TimeoutException, NoSuccessException {
154         synchronized(m_tasks) {
155         	if (m_tasks.isEmpty())
156         		throw new DoesNotExistException("Container is empty");
157             for (Task task : m_tasks.values()) {
158                 task.cancel(timeoutInSeconds);
159             }
160         }
161     }
162 
163     public int size() throws NotImplementedException, TimeoutException, NoSuccessException {
164         return m_tasks.size();
165     }
166 
167     public Task<?,?> getTask(String id) throws NotImplementedException, DoesNotExistException, TimeoutException, NoSuccessException {
168         Task task = m_tasks.get(id);
169         if (task == null) {
170             throw new DoesNotExistException("Task not in task container: "+id, this);
171         }
172         return task;
173     }
174 
175     public Task<?,?>[] getTasks() throws NotImplementedException, TimeoutException, NoSuccessException {
176         synchronized(m_tasks) {
177             return m_tasks.values().toArray(new Task[m_tasks.size()]);
178         }
179     }
180 
181     public State[] getStates() throws NotImplementedException, TimeoutException, NoSuccessException {
182         int i=0;
183         synchronized(m_tasks) {
184             State[] states = new State[m_tasks.size()];
185             for (Task task : m_tasks.values()) {
186                 states[i++] = task.getState();
187             }
188             return states;
189         }
190     }
191 
192 /*
193     protected void notifyStateChange(Task task) {
194         m_metric_TaskContainerState.setValue(task);
195     }
196 */
197 
198     private void startListening() throws NotImplementedException, IncorrectStateException, NoSuccessException {
199         try {
200             synchronized(m_tasks) {
201                 for (AbstractTaskImpl task : m_tasks.values()) {
202                     if (! task.isDone_fromCache()) {
203                         boolean isListening = task.startListening();
204                         task.setWaitingFor(isListening);
205                     }
206                 }
207             }
208         } catch(TimeoutException e) {
209             throw new NoSuccessException(e);
210         }
211     }
212     private void stopListening() throws NotImplementedException, NoSuccessException {
213         try {
214             synchronized(m_tasks) {
215                 for (AbstractTaskImpl task : m_tasks.values()) {
216                     if (task.isWaitingFor()) {
217                         task.stopListening();
218                         task.setWaitingFor(false);
219                     }
220                 }
221             }
222         } catch(TimeoutException e) {
223             throw new NoSuccessException(e);
224         }
225     }
226 
227     private String getFinished(WaitMode mode) throws NotImplementedException {
228         switch(mode) {
229             case ALL:
230                 synchronized(m_tasks) {
231                     String id = null;
232                     for (Map.Entry<String,AbstractTaskImpl> entry : m_tasks.entrySet()) {
233                         id = entry.getKey();
234                         AbstractTaskImpl task = entry.getValue();
235                         if (!task.isDone()) {
236                             return null;
237                         }
238                     }
239                     return id;
240                 }
241             case ANY:
242                 synchronized(m_tasks) {
243                     for (Map.Entry<String,AbstractTaskImpl> entry : m_tasks.entrySet()) {
244                         String id = entry.getKey();
245                         AbstractTaskImpl task = entry.getValue();
246                         if (task.isDone()) {
247                             return id;
248                         }
249                     }
250                     return null;
251                 }
252             default:
253                 throw new NotImplementedException("INTERNAL ERROR: unexpected exception");
254         }
255     }
256 }