View Javadoc

1   package fr.in2p3.jsaga.impl.resource.task;
2   
3   import org.ogf.saga.context.Context;
4   import org.ogf.saga.error.*;
5   import org.ogf.saga.monitoring.Callback;
6   import org.ogf.saga.monitoring.Metric;
7   import org.ogf.saga.monitoring.Monitorable;
8   import org.ogf.saga.resource.instance.Resource;
9   import org.ogf.saga.resource.task.ResourceTask;
10  import org.ogf.saga.resource.task.State;
11  import org.ogf.saga.session.Session;
12  
13  import fr.in2p3.jsaga.adaptor.resource.ResourceAdaptor;
14  import fr.in2p3.jsaga.helpers.SAGAId;
15  import fr.in2p3.jsaga.impl.resource.instance.ResourceAttributes;
16  
17  /* ***************************************************
18   * *** Centre de Calcul de l'IN2P3 - Lyon (France) ***
19   * ***             http://cc.in2p3.fr/             ***
20   * ***************************************************/
21  public class AbstractResourceTaskImpl<R extends Resource>
22          extends AbstractMonitorableWithAsyncAttributes<R>
23          implements ResourceTask, ResourceMonitorCallback
24  {
25      protected ResourceAdaptor m_adaptor;
26      protected ResourceAttributes m_attributes;
27      private StateListener m_listener;
28      private ResourceMetrics m_metrics;
29      private State m_state = State.NEW;
30      private long m_stateLastUpdate = 0;
31  
32      // TODO: make this a parameter
33      private long m_stateLifetimeMillis = 30000;
34      
35      /** common to all constructors */
36      public AbstractResourceTaskImpl(Session session, StateListener listener, ResourceAdaptor adaptor) {
37          super(session);
38          m_adaptor = adaptor;
39          m_listener = listener;
40          m_metrics = new ResourceMetrics(this);
41      }
42  
43      @Override
44      public String getId() {
45          return m_attributes.m_ResourceID.getObject();
46      }
47  
48      public State getState() throws NotImplementedException, TimeoutException, NoSuccessException {
49          // either take status from cache or ask the adaptor
50          if (m_state != null && m_stateLastUpdate != 0 
51                  && (System.currentTimeMillis() < m_stateLastUpdate + m_stateLifetimeMillis)) {
52              return m_state;
53          } else {
54              try {
55                  return m_adaptor.getResourceStatus(SAGAId.idFromSagaId(getId())).getSagaState();
56              } catch (DoesNotExistException e) {
57                  throw new NoSuccessException(e);
58              } catch (BadParameterException e) {
59                  throw new NoSuccessException(e);
60              }
61          }
62      }
63      
64      public void waitFor() throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException {
65          this.waitFor(WAIT_FOREVER, State.FINAL);
66      }
67      public void waitFor(float timeoutInSeconds) throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException {
68          this.waitFor(timeoutInSeconds, State.FINAL);
69      }
70      public void waitFor(State state) throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException {
71          this.waitFor(WAIT_FOREVER, state);
72      }
73      public void waitFor(float timeoutInSeconds, State state) throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException {
74          try {
75              // start listening
76              int cookie = m_metrics.m_State.addCallback(new Callback() {
77                  public boolean cb(Monitorable mt, Metric metric, Context ctx) throws NotImplementedException, AuthorizationFailedException {
78                      AbstractResourceTaskImpl resource = (AbstractResourceTaskImpl) mt;
79                      try {
80                          String value = metric.getAttribute(Metric.VALUE);
81                          State current = State.valueOf(value);
82                          resource.setState(current, null);
83                      }
84                      catch (NotImplementedException e) {throw e;}
85                      catch (AuthorizationFailedException e) {throw e;}
86                      catch (Exception e) {e.printStackTrace();}
87                      // callback must stay registered
88                      return true;
89                  }
90              });
91  
92              // wait for specified state
93              long endTime;
94              if (timeoutInSeconds == WAIT_FOREVER) {
95                  endTime = -1;
96              } else {
97                  endTime = System.currentTimeMillis() + (long) (timeoutInSeconds*1000f);
98              }
99              int mask = state.getValue();
100             State current;
101             do {
102                 try {
103                     Thread.sleep(500);
104                 } catch (InterruptedException e) {
105                     throw new NoSuccessException(e);
106                 }
107                 current = this.m_state;
108                 if (System.currentTimeMillis()>=endTime) {
109                     throw new TimeoutException();
110                 }
111             } while ((current.getValue() & mask) == 0);
112 
113             // stop listening
114             m_metrics.m_State.removeCallback(cookie);
115         } catch (AuthenticationFailedException e) {
116             throw new NoSuccessException(e);
117         } catch (AuthorizationFailedException e) {
118             throw new NoSuccessException(e);
119         } catch (PermissionDeniedException e) {
120             throw new NoSuccessException(e);
121         } catch (BadParameterException e) {
122             throw new NoSuccessException(e);
123         }
124     }
125 
126     ////////////////////////////////////////// internal methods //////////////////////////////////////////
127 
128     public void startListening() throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException {
129         // forward to manager
130         try {
131             m_listener.startListening(SAGAId.idFromSagaId(getId()), this);
132         } catch (BadParameterException e) {
133             throw new NoSuccessException(e);
134         }
135     }
136     public void stopListening() throws NotImplementedException, TimeoutException, NoSuccessException {
137         // forward to manager
138         try {
139             m_listener.stopListening(SAGAId.idFromSagaId(getId()));
140         } catch (BadParameterException e) {
141             throw new NoSuccessException(e);
142         }
143     }
144     
145     @Override
146     public void setState(State state, String stateDetail) {
147         // save the notified state
148         m_state = state;
149         m_metrics.m_StateDetail.setValue(stateDetail);
150         m_stateLastUpdate = System.currentTimeMillis();
151     }
152 
153 }