View Javadoc

1   package fr.in2p3.jsaga.adaptor.job;
2   
3   import fr.in2p3.jsaga.adaptor.base.defaults.Default;
4   import fr.in2p3.jsaga.adaptor.base.usage.*;
5   import fr.in2p3.jsaga.adaptor.job.control.JobControlAdaptor;
6   import fr.in2p3.jsaga.adaptor.job.control.advanced.CleanableJobAdaptor;
7   import fr.in2p3.jsaga.adaptor.job.monitor.JobMonitorAdaptor;
8   import org.apache.log4j.Logger;
9   import org.globus.io.gass.server.GassServer;
10  import org.globus.rsl.*;
11  import org.ogf.saga.error.*;
12  
13  import java.io.File;
14  import java.net.InetAddress;
15  import java.net.UnknownHostException;
16  import java.util.Map;
17  
18  /*
19   * Aurelien Croc (CEA Saclay, FRANCE)
20   * May, 1st 2010
21   *
22   * LCGCE+ Job Control adaptor
23   *
24   * This LCGCE adaptor provides the upload of the standard input stream for each
25   * job through a unique GASS server loaded once.
26   */
27  
28  public class GatekeeperCondorJobControlAdaptor extends GkCommonJobControlAdaptor implements JobControlAdaptor, CleanableJobAdaptor {
29      private Logger logger = Logger.getLogger(GatekeeperCondorJobControlAdaptor.class);
30  
31      /** override super.getType() */
32      public String getType() {
33          return "gatekeeper-condor";
34      }
35  
36      /** override super.getUsage() */
37      public Usage getUsage() {
38          return new UAnd.Builder()
39                  .and(new UOptional(IP_ADDRESS))
40                  .and(new UOptional(TCP_PORT_RANGE))
41                  .build();
42      }
43  
44      /** override super.getDefaults() */
45      public Default[] getDefaults(Map attributes) throws IncorrectStateException {
46          try {
47              String defaultIp = InetAddress.getLocalHost().getHostAddress();
48              String defaultTcpPortRange="40000,45000";
49              return new Default[]{new Default(IP_ADDRESS, defaultIp),new Default(TCP_PORT_RANGE, defaultTcpPortRange)};
50          } catch (UnknownHostException e) {
51              return null;
52          }
53      }
54  
55      /** override super.getDefaultJobMonitor() */
56      public JobMonitorAdaptor getDefaultJobMonitor() {
57          return new GatekeeperCondorJobMonitorAdaptor();
58      }
59  
60      protected String submit(RslNode rslTree, boolean checkMatch, boolean isInteractive) throws PermissionDeniedException, TimeoutException, NoSuccessException, BadResource {
61          String gassURL;
62  
63          if (rslTree.getParam("stdin") != null && rslTree.getParam("stdin").
64                  getValues().size() > 0) {
65              String inputFilename;
66              Bindings subst;
67              File inputFile;
68  
69              // Start the GASS server if it's not started yet
70              try {
71                  gassURL = startGassServer();
72              } catch (Exception e) {
73                  throw new NoSuccessException(e);
74              }
75  
76              // Check the validity of the input file and get its absolute path
77              inputFilename = rslTree.getParam("stdin").getValues().get(0).
78                  toString();
79              inputFile = new File(inputFilename);
80              if (!(inputFile.isFile()))
81                  throw new NoSuccessException("The input file " + inputFilename +
82                      " is not accessible");
83              // Bug workaround?? The first slash is eaten by the Gass server for
84              // unknown reason!
85              inputFilename = "/" + inputFile.getAbsolutePath();
86  
87              // Update the RSL to use the GASS server to access to the input file
88              subst = new Bindings("rsl_substitution");
89              subst.add(new Binding("GLOBUSRUN_GASS_URL", gassURL));
90              rslTree.add(subst);
91              NameOpValue stdinUrl = new NameOpValue("stdin", NameOpValue.EQ,
92                  new VarRef("GLOBUSRUN_GASS_URL", null,
93                  new Value(inputFilename)));
94              rslTree.add(stdinUrl);
95  
96          }
97  
98          // Submit the job
99          return super.submit(rslTree, checkMatch, isInteractive);
100     }
101 
102     private String startGassServer() throws Exception {
103         GassServer gassServer;
104 
105         try {
106             gassServer = GassServerFactory.getGassServer(m_credential);
107             gassServer.registerDefaultDeactivator();
108         } catch (Exception e) {
109             throw new Exception("Problems while creating a Gass Server", e);
110         }
111         String gassURL = gassServer.getURL();
112         logger.debug("Started the GASS server");
113         return gassURL;
114     }
115 
116 }