View Javadoc

1   package fr.in2p3.jsaga.adaptor.job;
2   
3   import fr.in2p3.jsaga.adaptor.base.defaults.Default;
4   import fr.in2p3.jsaga.adaptor.base.usage.*;
5   import fr.in2p3.jsaga.adaptor.job.monitor.JobStatusNotifier;
6   import fr.in2p3.jsaga.adaptor.job.monitor.ListenFilteredJob;
7   import org.globus.io.gass.server.GassServer;
8   import org.globus.io.gass.server.JobOutputStream;
9   import org.ogf.saga.error.*;
10  
11  import java.net.InetAddress;
12  import java.net.UnknownHostException;
13  import java.util.Map;
14  
15  /* ***************************************************
16   * *** Centre de Calcul de l'IN2P3 - Lyon (France) ***
17   * ***             http://cc.in2p3.fr/             ***
18   * ***************************************************
19   * File:   LCGCEJobMonitorAdaptor
20   * Author: Sylvain Reynaud (sreynaud@in2p3.fr)
21   * Date:   9 juin 2009
22   * ***************************************************
23   * Description:                                      */
24  /**
25   * fixme: tell globus-gma to monitor our jobs!
26   * fixme: grid-monitor is not stopped when jsaga-job-status fails
27   */
28  public class LCGCEJobMonitorAdaptor extends GatekeeperJobAdaptorAbstract implements ListenFilteredJob {
29      private static final String GASS_PATH = "out";
30  
31      private GassServer m_gass;
32      private LCGCEJobMonitorWatchdog m_watchdog;
33  
34      public String getType() {
35          return "lcgce";
36      }
37  
38      /** override super.getUsage() */
39      public Usage getUsage() {
40          return new UAnd.Builder()
41                          .and(new UOptional(IP_ADDRESS))
42                          .and(new UOptional(TCP_PORT_RANGE))
43                          .build();
44      }
45  
46      /** override super.getDefaults() */
47      public Default[] getDefaults(Map attributes) throws IncorrectStateException {
48          String defaultIp;
49          try {
50              defaultIp = InetAddress.getLocalHost().getHostAddress();
51          } catch (UnknownHostException e) {
52              defaultIp = null;
53          }
54          return new Default[]{
55                  new Default(IP_ADDRESS, defaultIp),
56                  new Default(TCP_PORT_RANGE, "40000,45000")
57          };
58      }
59  
60      public void connect(String userInfo, String host, int port, String basePath, Map attributes) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, BadParameterException, TimeoutException, NoSuccessException {
61          super.connect(userInfo, host, port, basePath, attributes);
62  
63          // start grid-monitor
64          m_watchdog = new LCGCEJobMonitorWatchdog(m_credential, host, port);
65      }
66  
67      public void disconnect() throws NoSuccessException {
68          super.disconnect();
69  
70          // stop grid-monitor
71          m_watchdog.stopAll();
72      }
73  
74      public void subscribeFilteredJob(JobStatusNotifier notifier) throws TimeoutException, NoSuccessException {
75          try {
76              m_gass = new GassServer(m_credential, 0);
77          } catch (Exception e) {
78              throw new NoSuccessException("Failed to create Gass Server", e);
79          }
80          m_gass.registerDefaultDeactivator();
81          m_gass.registerJobOutputStream(GASS_PATH, new JobOutputStream(new LCGCEJobMonitorListener(notifier)));
82      }
83  
84      public void unsubscribeFilteredJob() throws TimeoutException, NoSuccessException {
85          m_gass.unregisterJobOutputStream(GASS_PATH);
86          m_gass.unregisterDefaultDeactivator();
87          m_gass = null;
88      }
89  }