1 package fr.in2p3.jsaga.adaptor.job;
2
3 import fr.in2p3.jsaga.adaptor.base.defaults.Default;
4 import fr.in2p3.jsaga.adaptor.base.usage.*;
5 import fr.in2p3.jsaga.adaptor.job.control.JobControlAdaptor;
6 import fr.in2p3.jsaga.adaptor.job.control.advanced.CleanableJobAdaptor;
7 import fr.in2p3.jsaga.adaptor.job.control.interactive.StreamableJobInteractiveSet;
8 import fr.in2p3.jsaga.adaptor.job.monitor.JobMonitorAdaptor;
9 import org.apache.log4j.Logger;
10 import org.globus.io.gass.server.GassServer;
11 import org.globus.io.gass.server.JobOutputStream;
12 import org.globus.rsl.*;
13 import org.ogf.saga.error.*;
14
15 import java.io.*;
16 import java.net.InetAddress;
17 import java.net.UnknownHostException;
18 import java.util.Map;
19
20
21
22
23
24
25
26
27
28
29
30
31
32 public class GatekeeperJobControlAdaptor extends GkCommonJobControlAdaptor implements JobControlAdaptor, CleanableJobAdaptor, StreamableJobInteractiveSet {
33 private static final String SHELLPATH = "ShellPath";
34 private Logger logger = Logger.getLogger(GatekeeperJobControlAdaptor.class);
35
36
37 public String getType() {
38 return "gatekeeper";
39 }
40
41
42 public Usage getUsage() {
43 return new UAnd.Builder()
44 .and(new UOptional(SHELLPATH))
45 .and(new UOptional(IP_ADDRESS))
46 .and(new UOptional(TCP_PORT_RANGE))
47 .build();
48 }
49
50
51 public Default[] getDefaults(Map attributes) throws IncorrectStateException {
52 try {
53 String defaultIp = InetAddress.getLocalHost().getHostAddress();
54 String defaultTcpPortRange="40000,45000";
55 return new Default[]{new Default(IP_ADDRESS, defaultIp),new Default(TCP_PORT_RANGE, defaultTcpPortRange)};
56 } catch (UnknownHostException e) {
57 return null;
58 }
59 }
60
61
62 public JobMonitorAdaptor getDefaultJobMonitor() {
63 return new GatekeeperJobMonitorAdaptor();
64 }
65
66 public String submitInteractive(String jobDesc, boolean checkMatch, InputStream stdin, OutputStream stdout, OutputStream stderr) throws PermissionDeniedException, TimeoutException, NoSuccessException {
67 RslNode rslTree;
68 String gassURL;
69 try {
70 rslTree = RSLParser.parse(jobDesc);
71 gassURL = startGassServer(stdout, stderr);
72 } catch (Exception e) {
73 throw new NoSuccessException(e);
74 }
75
76 Bindings subst = new Bindings("rsl_substitution");
77 subst.add(new Binding("GLOBUSRUN_GASS_URL", gassURL));
78 rslTree.add(subst);
79 if (stdin != null) {
80 File stdinFile;
81 try{stdinFile=File.createTempFile("stdin-",".txt",new File("./"));} catch(IOException e){throw new NoSuccessException(e);}
82
83 stdinFile.deleteOnExit();
84 save(stdin, stdinFile);
85 NameOpValue stdinUrl = new NameOpValue("stdin", NameOpValue.EQ,
86 new VarRef("GLOBUSRUN_GASS_URL", null, new Value("/"+stdinFile.getName())));
87 rslTree.add(stdinUrl);
88 }
89 NameOpValue stdoutUrl = new NameOpValue("stdout", NameOpValue.EQ,
90 new VarRef("GLOBUSRUN_GASS_URL", null, new Value("/dev/stdout-rgs")));
91 rslTree.add(stdoutUrl);
92 NameOpValue stderrUrl = new NameOpValue("stderr", NameOpValue.EQ,
93 new VarRef("GLOBUSRUN_GASS_URL", null, new Value("/dev/stderr-rgs")));
94 rslTree.add(stderrUrl);
95 return super.submit(rslTree, checkMatch, true);
96 }
97
98 private String startGassServer(OutputStream stdout, OutputStream stderr) throws Exception {
99 GassServer gassServer;
100 try {
101 gassServer = GassServerFactory.getGassServer(m_credential);
102 gassServer.registerDefaultDeactivator();
103 } catch (Exception e) {
104 throw new Exception("Problems while creating a Gass Server", e);
105 }
106 String gassURL = gassServer.getURL();
107 gassServer.registerJobOutputStream("out-rgs", new JobOutputStream(new GatekeeperJobOutputListener(stdout)));
108 gassServer.registerJobOutputStream("err-rgs", new JobOutputStream(new GatekeeperJobOutputListener(stderr)));
109 logger.debug("Started the GASS server");
110 return gassURL;
111 }
112
113 private static void save(InputStream in, File file) throws NoSuccessException {
114 try {
115 OutputStream out = new FileOutputStream(file);
116 byte[] buffer = new byte[1024];
117 for (int len; (len=in.read(buffer))>0; ) {
118 out.write(buffer, 0, len);
119 }
120 out.close();
121 } catch(IOException e) {
122 throw new NoSuccessException(e);
123 }
124 }
125 }