View Javadoc

1   package fr.in2p3.jsaga.adaptor.job;
2   
3   import fr.in2p3.jsaga.adaptor.base.defaults.Default;
4   import fr.in2p3.jsaga.adaptor.base.usage.*;
5   import fr.in2p3.jsaga.adaptor.job.control.JobControlAdaptor;
6   import fr.in2p3.jsaga.adaptor.job.control.advanced.CleanableJobAdaptor;
7   import fr.in2p3.jsaga.adaptor.job.control.interactive.StreamableJobInteractiveSet;
8   import fr.in2p3.jsaga.adaptor.job.monitor.JobMonitorAdaptor;
9   import org.apache.log4j.Logger;
10  import org.globus.io.gass.server.GassServer;
11  import org.globus.io.gass.server.JobOutputStream;
12  import org.globus.rsl.*;
13  import org.ogf.saga.error.*;
14  
15  import java.io.*;
16  import java.net.InetAddress;
17  import java.net.UnknownHostException;
18  import java.util.Map;
19  
20  /* ***************************************************
21  * *** Centre de Calcul de l'IN2P3 - Lyon (France) ***
22  * ***             http://cc.in2p3.fr/             ***
23  * ***************************************************
24  * File:   GatekeeperJobControlAdaptor
25  * Author: Sylvain Reynaud (sreynaud@in2p3.fr)
26  * Date:   9 nov. 2007
27  * ***************************************************
28  * Description:                                      */
29  /**
30   * TODO: remove stdin file and stop gass server when cleanup
31   */
32  public class GatekeeperJobControlAdaptor extends GkCommonJobControlAdaptor implements JobControlAdaptor, CleanableJobAdaptor, StreamableJobInteractiveSet {
33      private static final String SHELLPATH = "ShellPath";
34      private Logger logger = Logger.getLogger(GatekeeperJobControlAdaptor.class);
35  
36      /** override super.getType() */
37      public String getType() {
38          return "gatekeeper";
39      }
40  
41      /** override super.getUsage() */
42      public Usage getUsage() {
43          return new UAnd.Builder()
44                  .and(new UOptional(SHELLPATH))
45                  .and(new UOptional(IP_ADDRESS))
46                  .and(new UOptional(TCP_PORT_RANGE))
47                  .build();
48      }
49  
50      /** override super.getDefaults() */
51      public Default[] getDefaults(Map attributes) throws IncorrectStateException {
52          try {
53              String defaultIp = InetAddress.getLocalHost().getHostAddress();
54              String defaultTcpPortRange="40000,45000";
55              return new Default[]{new Default(IP_ADDRESS, defaultIp),new Default(TCP_PORT_RANGE, defaultTcpPortRange)};
56          } catch (UnknownHostException e) {
57              return null;
58          }
59      }
60  
61      /** override super.getDefaultJobMonitor() */
62      public JobMonitorAdaptor getDefaultJobMonitor() {
63          return new GatekeeperJobMonitorAdaptor();
64      }
65  
66      public String submitInteractive(String jobDesc, boolean checkMatch, InputStream stdin, OutputStream stdout, OutputStream stderr) throws PermissionDeniedException, TimeoutException, NoSuccessException {
67          RslNode rslTree;
68          String gassURL;
69          try {
70              rslTree = RSLParser.parse(jobDesc);
71              gassURL = startGassServer(stdout, stderr);
72          } catch (Exception e) {
73              throw new NoSuccessException(e);
74          }
75          // update RSL
76          Bindings subst = new Bindings("rsl_substitution");
77          subst.add(new Binding("GLOBUSRUN_GASS_URL", gassURL));
78          rslTree.add(subst);
79          if (stdin != null) {
80              File stdinFile;
81              try{stdinFile=File.createTempFile("stdin-",".txt",new File("./"));} catch(IOException e){throw new NoSuccessException(e);}
82              //todo: remove stdinFile on cleanup() instead of on exit
83              stdinFile.deleteOnExit();
84              save(stdin, stdinFile);
85              NameOpValue stdinUrl = new NameOpValue("stdin", NameOpValue.EQ,
86                      new VarRef("GLOBUSRUN_GASS_URL", null, new Value("/"+stdinFile.getName())));
87              rslTree.add(stdinUrl);
88          }
89          NameOpValue stdoutUrl = new NameOpValue("stdout", NameOpValue.EQ,
90                  new VarRef("GLOBUSRUN_GASS_URL", null, new Value("/dev/stdout-rgs")));
91          rslTree.add(stdoutUrl);
92          NameOpValue stderrUrl = new NameOpValue("stderr", NameOpValue.EQ,
93                  new VarRef("GLOBUSRUN_GASS_URL", null, new Value("/dev/stderr-rgs")));
94          rslTree.add(stderrUrl);
95          return super.submit(rslTree, checkMatch, true);
96      }
97      
98      private String startGassServer(OutputStream stdout, OutputStream stderr) throws Exception {
99          GassServer gassServer;
100         try {
101             gassServer = GassServerFactory.getGassServer(m_credential);
102             gassServer.registerDefaultDeactivator();
103         } catch (Exception e) {
104             throw new Exception("Problems while creating a Gass Server", e);
105         }
106         String gassURL = gassServer.getURL();
107         gassServer.registerJobOutputStream("out-rgs", new JobOutputStream(new GatekeeperJobOutputListener(stdout)));
108         gassServer.registerJobOutputStream("err-rgs", new JobOutputStream(new GatekeeperJobOutputListener(stderr)));
109         logger.debug("Started the GASS server");
110         return gassURL;
111     }
112 
113     private static void save(InputStream in, File file) throws NoSuccessException {
114         try {
115             OutputStream out = new FileOutputStream(file);
116             byte[] buffer = new byte[1024];
117             for (int len; (len=in.read(buffer))>0; ) {
118                 out.write(buffer, 0, len);
119             }
120             out.close();
121         } catch(IOException e) {
122             throw new NoSuccessException(e);
123         }
124     }
125 }