1 package fr.in2p3.jsaga.engine.job.monitor;
2
3 import fr.in2p3.jsaga.adaptor.job.monitor.*;
4 import fr.in2p3.jsaga.engine.job.monitor.listen.FilteredJobStatusListener;
5 import fr.in2p3.jsaga.engine.job.monitor.listen.IndividualJobStatusListener;
6 import fr.in2p3.jsaga.engine.job.monitor.poll.*;
7 import fr.in2p3.jsaga.engine.job.monitor.request.JobStatusRequestor;
8 import fr.in2p3.jsaga.impl.job.service.ReconnectionException;
9 import org.apache.log4j.Logger;
10 import org.ogf.saga.error.*;
11 import org.ogf.saga.url.URL;
12
13 import java.util.Map;
14
15
16
17
18
19
20
21
22
23
24
25
26
27 public class JobMonitorService {
28 private URL m_url;
29 private JobMonitorAdaptor m_adaptor;
30 private Map m_attributes;
31 private Logger m_stateLogger;
32
33 private JobStatusRequestor m_requestor;
34 private JobRegistry m_registry;
35
36
37 public JobMonitorService(URL url, JobMonitorAdaptor adaptor, Map attributes) throws NotImplementedException, TimeoutException, NoSuccessException {
38 m_url = url;
39 m_adaptor = adaptor;
40 m_attributes = attributes;
41 m_stateLogger = Logger.getLogger(adaptor.getClass());
42
43
44 m_requestor = new JobStatusRequestor(adaptor);
45
46
47 if (adaptor instanceof ListenFilteredJob) {
48 m_registry = new FilteredJobStatusListener((ListenFilteredJob)adaptor, m_requestor);
49 } else if (adaptor instanceof ListenIndividualJob) {
50 m_registry = new IndividualJobStatusListener((ListenIndividualJob)adaptor, m_requestor);
51 } else if (adaptor instanceof QueryFilteredJob) {
52 m_registry = new FilteredJobStatusPoller((QueryFilteredJob)adaptor);
53 } else if (adaptor instanceof QueryListJob) {
54 m_registry = new ListJobStatusPoller((QueryListJob)adaptor);
55 } else if (adaptor instanceof QueryIndividualJob) {
56 m_registry = new IndividualJobStatusPoller((QueryIndividualJob)adaptor);
57 } else {
58 throw new NotImplementedException("Adaptor does not implement any monitoring interface: "+adaptor.getClass().getName());
59 }
60 }
61
62 public URL getURL() {
63 return m_url;
64 }
65
66 public JobMonitorAdaptor getAdaptor() {
67 return m_adaptor;
68 }
69
70 public Map getAttributes() {
71 return m_attributes;
72 }
73
74 public Logger getStateLogger() {
75 return m_stateLogger;
76 }
77
78 public JobStatus getState(String nativeJobId) throws NotImplementedException, TimeoutException, NoSuccessException {
79 return m_requestor.getJobStatus(nativeJobId);
80 }
81
82 public void startListening(String nativeJobId, JobMonitorCallback callback) throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException {
83 m_registry.subscribeJob(nativeJobId, callback);
84 }
85
86 public void stopListening(String nativeJobId) throws NotImplementedException, TimeoutException, NoSuccessException {
87 m_registry.unsubscribeJob(nativeJobId);
88 }
89
90 public void disconnect() throws NoSuccessException {
91 m_adaptor.disconnect();
92 }
93
94 private boolean m_isReseting = false;
95 private SagaException m_exception = null;
96 public synchronized void startReset() {m_isReseting = true;}
97 public synchronized void stopReset() {m_isReseting = false;}
98 public synchronized void failReset(SagaException exception) {m_exception = exception;}
99 public synchronized void checkState() throws ReconnectionException, NoSuccessException {
100 if (m_isReseting) {
101 throw new ReconnectionException(new TimeoutException("Currently reconnecting to job service, please retry later..."));
102 } else if (m_exception != null) {
103 throw new NoSuccessException(m_exception);
104 }
105 }
106 }