View Javadoc

1   package fr.in2p3.jsaga.engine.job.monitor;
2   
3   import fr.in2p3.jsaga.adaptor.job.monitor.*;
4   import fr.in2p3.jsaga.engine.job.monitor.listen.FilteredJobStatusListener;
5   import fr.in2p3.jsaga.engine.job.monitor.listen.IndividualJobStatusListener;
6   import fr.in2p3.jsaga.engine.job.monitor.poll.*;
7   import fr.in2p3.jsaga.engine.job.monitor.request.JobStatusRequestor;
8   import fr.in2p3.jsaga.impl.job.service.ReconnectionException;
9   import org.apache.log4j.Logger;
10  import org.ogf.saga.error.*;
11  import org.ogf.saga.url.URL;
12  
13  import java.util.Map;
14  
15  /* ***************************************************
16  * *** Centre de Calcul de l'IN2P3 - Lyon (France) ***
17  * ***             http://cc.in2p3.fr/             ***
18  * ***************************************************
19  * File:   JobMonitorService
20  * Author: Sylvain Reynaud (sreynaud@in2p3.fr)
21  * Date:   22 nov. 2007
22  * ***************************************************
23  * Description:                                      */
24  /**
25   *
26   */
27  public class JobMonitorService {
28      private URL m_url;
29      private JobMonitorAdaptor m_adaptor;
30      private Map m_attributes;
31      private Logger m_stateLogger;
32  
33      private JobStatusRequestor m_requestor;
34      private JobRegistry m_registry;
35  
36      /** constructor */
37      public JobMonitorService(URL url, JobMonitorAdaptor adaptor, Map attributes) throws NotImplementedException, TimeoutException, NoSuccessException {
38          m_url = url;
39          m_adaptor = adaptor;
40          m_attributes = attributes;
41          m_stateLogger = Logger.getLogger(adaptor.getClass());
42  
43          // set requestor
44          m_requestor = new JobStatusRequestor(adaptor);
45  
46          // set registry (listeners first, then pollers)
47          if (adaptor instanceof ListenFilteredJob) {
48              m_registry = new FilteredJobStatusListener((ListenFilteredJob)adaptor, m_requestor);
49          } else if (adaptor instanceof ListenIndividualJob) {
50              m_registry = new IndividualJobStatusListener((ListenIndividualJob)adaptor, m_requestor);
51          } else if (adaptor instanceof QueryFilteredJob) {
52              m_registry = new FilteredJobStatusPoller((QueryFilteredJob)adaptor);
53          } else if (adaptor instanceof QueryListJob) {
54              m_registry = new ListJobStatusPoller((QueryListJob)adaptor);
55          } else if (adaptor instanceof QueryIndividualJob) {
56              m_registry = new IndividualJobStatusPoller((QueryIndividualJob)adaptor);
57          } else {
58              throw new NotImplementedException("Adaptor does not implement any monitoring interface: "+adaptor.getClass().getName());
59          }
60      }
61  
62      public URL getURL() {
63          return m_url;
64      }
65  
66      public JobMonitorAdaptor getAdaptor() {
67          return m_adaptor;
68      }
69  
70      public Map getAttributes() {
71          return m_attributes;
72      }
73  
74      public Logger getStateLogger() {
75          return m_stateLogger;
76      }
77  
78      public JobStatus getState(String nativeJobId) throws NotImplementedException, TimeoutException, NoSuccessException {
79          return m_requestor.getJobStatus(nativeJobId);
80      }
81  
82      public void startListening(String nativeJobId, JobMonitorCallback callback) throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException {
83          m_registry.subscribeJob(nativeJobId, callback);
84      }
85  
86      public void stopListening(String nativeJobId) throws NotImplementedException, TimeoutException, NoSuccessException {
87          m_registry.unsubscribeJob(nativeJobId);
88      }
89  
90      public void disconnect() throws NoSuccessException {
91      	m_adaptor.disconnect();
92      }
93      
94      private boolean m_isReseting = false;
95      private SagaException m_exception = null;
96      public synchronized void startReset() {m_isReseting = true;}
97      public synchronized void stopReset() {m_isReseting = false;}
98      public synchronized void failReset(SagaException exception) {m_exception = exception;}
99      public synchronized void checkState() throws ReconnectionException, NoSuccessException {
100         if (m_isReseting) {
101             throw new ReconnectionException(new TimeoutException("Currently reconnecting to job service, please retry later..."));
102         } else if (m_exception != null) {
103             throw new NoSuccessException(m_exception);
104         }
105     }
106 }