1 package fr.in2p3.jsaga.impl.task;
2
3 import fr.in2p3.jsaga.impl.monitoring.AbstractMonitorableImpl;
4 import org.ogf.saga.SagaObject;
5 import org.ogf.saga.error.*;
6 import org.ogf.saga.session.Session;
7 import org.ogf.saga.task.*;
8
9 import java.util.*;
10
11
12
13
14
15
16
17
18
19
20
21
22
23 public class TaskContainerImpl extends AbstractMonitorableImpl implements TaskContainer {
24
25
26
27 private final Map<String,AbstractTaskImpl> m_tasks;
28
29
30 public TaskContainerImpl(Session session) throws NoSuccessException {
31 super(session);
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46 m_tasks = Collections.synchronizedMap(new HashMap<String,AbstractTaskImpl>());
47 }
48
49
50 public SagaObject clone() throws CloneNotSupportedException {
51 TaskContainerImpl clone = (TaskContainerImpl) super.clone();
52
53 clone.m_tasks.putAll(m_tasks);
54 return clone;
55 }
56
57 public void add(Task<?,?> task) throws NotImplementedException, TimeoutException, NoSuccessException {
58 m_tasks.put(task.getId(), (AbstractTaskImpl) task);
59 }
60
61 public void remove(Task<?,?> task) throws NotImplementedException, DoesNotExistException, TimeoutException, NoSuccessException {
62 Task removed = m_tasks.remove(task.getId());
63 if (removed== null) {
64 throw new DoesNotExistException("Task not in task container: "+task.getId(), this);
65 }
66 }
67
68 public void run() throws NotImplementedException, IncorrectStateException, DoesNotExistException, TimeoutException, NoSuccessException {
69 synchronized(m_tasks) {
70 if (m_tasks.isEmpty())
71 throw new DoesNotExistException("Container is empty");
72 for (Task task : m_tasks.values()) {
73 task.run();
74 }
75 }
76 }
77
78 public Task<?, ?> waitFor() throws NotImplementedException, IncorrectStateException, DoesNotExistException, TimeoutException, NoSuccessException {
79 return this.waitFor(WAIT_FOREVER, WaitMode.ALL);
80 }
81
82 public Task<?,?> waitFor(WaitMode mode) throws NotImplementedException, IncorrectStateException, DoesNotExistException, TimeoutException, NoSuccessException {
83 return this.waitFor(WAIT_FOREVER, mode);
84 }
85
86 public Task<?, ?> waitFor(float timeoutInSeconds) throws NotImplementedException, IncorrectStateException, DoesNotExistException, TimeoutException, NoSuccessException {
87 return this.waitFor(timeoutInSeconds, WaitMode.ALL);
88 }
89
90 public Task<?,?> waitFor(float timeoutInSeconds, WaitMode mode) throws NotImplementedException, IncorrectStateException, DoesNotExistException, TimeoutException, NoSuccessException {
91 if (m_tasks.isEmpty()) {
92 throw new DoesNotExistException("Task container is empty", this);
93 }
94
95 this.startListening();
96
97
98
99
100
101
102
103
104
105
106
107
108
109 String id = null;
110 try {
111 boolean forever;
112 long endTime;
113 if (timeoutInSeconds == WAIT_FOREVER) {
114 forever = true;
115 endTime = -1;
116 } else if (timeoutInSeconds == NO_WAIT) {
117 forever = false;
118 endTime = -1;
119 } else {
120 forever = false;
121 endTime = System.currentTimeMillis() + (long) timeoutInSeconds;
122 }
123 while((id=this.getFinished(mode))==null && (forever || System.currentTimeMillis()<endTime)) {
124 Thread.currentThread().sleep(100);
125 }
126 } catch(InterruptedException e) {
127
128 this.stopListening();
129
130
131
132
133
134
135
136
137
138
139
140 return m_tasks.remove(id);
141 }
142
143 public void cancel() throws NotImplementedException, IncorrectStateException, DoesNotExistException, TimeoutException, NoSuccessException {
144 synchronized(m_tasks) {
145 if (m_tasks.isEmpty())
146 throw new DoesNotExistException("Container is empty");
147 for (Task task : m_tasks.values()) {
148 task.cancel();
149 }
150 }
151 }
152
153 public void cancel(float timeoutInSeconds) throws NotImplementedException, IncorrectStateException, DoesNotExistException, TimeoutException, NoSuccessException {
154 synchronized(m_tasks) {
155 if (m_tasks.isEmpty())
156 throw new DoesNotExistException("Container is empty");
157 for (Task task : m_tasks.values()) {
158 task.cancel(timeoutInSeconds);
159 }
160 }
161 }
162
163 public int size() throws NotImplementedException, TimeoutException, NoSuccessException {
164 return m_tasks.size();
165 }
166
167 public Task<?,?> getTask(String id) throws NotImplementedException, DoesNotExistException, TimeoutException, NoSuccessException {
168 Task task = m_tasks.get(id);
169 if (task == null) {
170 throw new DoesNotExistException("Task not in task container: "+id, this);
171 }
172 return task;
173 }
174
175 public Task<?,?>[] getTasks() throws NotImplementedException, TimeoutException, NoSuccessException {
176 synchronized(m_tasks) {
177 return m_tasks.values().toArray(new Task[m_tasks.size()]);
178 }
179 }
180
181 public State[] getStates() throws NotImplementedException, TimeoutException, NoSuccessException {
182 int i=0;
183 synchronized(m_tasks) {
184 State[] states = new State[m_tasks.size()];
185 for (Task task : m_tasks.values()) {
186 states[i++] = task.getState();
187 }
188 return states;
189 }
190 }
191
192
193
194
195
196
197
198 private void startListening() throws NotImplementedException, IncorrectStateException, NoSuccessException {
199 try {
200 synchronized(m_tasks) {
201 for (AbstractTaskImpl task : m_tasks.values()) {
202 if (! task.isDone_fromCache()) {
203 boolean isListening = task.startListening();
204 task.setWaitingFor(isListening);
205 }
206 }
207 }
208 } catch(TimeoutException e) {
209 throw new NoSuccessException(e);
210 }
211 }
212 private void stopListening() throws NotImplementedException, NoSuccessException {
213 try {
214 synchronized(m_tasks) {
215 for (AbstractTaskImpl task : m_tasks.values()) {
216 if (task.isWaitingFor()) {
217 task.stopListening();
218 task.setWaitingFor(false);
219 }
220 }
221 }
222 } catch(TimeoutException e) {
223 throw new NoSuccessException(e);
224 }
225 }
226
227 private String getFinished(WaitMode mode) throws NotImplementedException {
228 switch(mode) {
229 case ALL:
230 synchronized(m_tasks) {
231 String id = null;
232 for (Map.Entry<String,AbstractTaskImpl> entry : m_tasks.entrySet()) {
233 id = entry.getKey();
234 AbstractTaskImpl task = entry.getValue();
235 if (!task.isDone()) {
236 return null;
237 }
238 }
239 return id;
240 }
241 case ANY:
242 synchronized(m_tasks) {
243 for (Map.Entry<String,AbstractTaskImpl> entry : m_tasks.entrySet()) {
244 String id = entry.getKey();
245 AbstractTaskImpl task = entry.getValue();
246 if (task.isDone()) {
247 return id;
248 }
249 }
250 return null;
251 }
252 default:
253 throw new NotImplementedException("INTERNAL ERROR: unexpected exception");
254 }
255 }
256 }