1 package fr.in2p3.jsaga.adaptor.job;
2
3 import fr.in2p3.jsaga.adaptor.base.defaults.Default;
4 import fr.in2p3.jsaga.adaptor.base.usage.*;
5 import fr.in2p3.jsaga.adaptor.job.monitor.JobStatusNotifier;
6 import fr.in2p3.jsaga.adaptor.job.monitor.ListenFilteredJob;
7 import org.globus.io.gass.server.GassServer;
8 import org.globus.io.gass.server.JobOutputStream;
9 import org.ogf.saga.error.*;
10
11 import java.net.InetAddress;
12 import java.net.UnknownHostException;
13 import java.util.Map;
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 public class LCGCEJobMonitorAdaptor extends GatekeeperJobAdaptorAbstract implements ListenFilteredJob {
29 private static final String GASS_PATH = "out";
30
31 private GassServer m_gass;
32 private LCGCEJobMonitorWatchdog m_watchdog;
33
34 public String getType() {
35 return "lcgce";
36 }
37
38
39 public Usage getUsage() {
40 return new UAnd.Builder()
41 .and(new UOptional(IP_ADDRESS))
42 .and(new UOptional(TCP_PORT_RANGE))
43 .build();
44 }
45
46
47 public Default[] getDefaults(Map attributes) throws IncorrectStateException {
48 String defaultIp;
49 try {
50 defaultIp = InetAddress.getLocalHost().getHostAddress();
51 } catch (UnknownHostException e) {
52 defaultIp = null;
53 }
54 return new Default[]{
55 new Default(IP_ADDRESS, defaultIp),
56 new Default(TCP_PORT_RANGE, "40000,45000")
57 };
58 }
59
60 public void connect(String userInfo, String host, int port, String basePath, Map attributes) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, BadParameterException, TimeoutException, NoSuccessException {
61 super.connect(userInfo, host, port, basePath, attributes);
62
63
64 m_watchdog = new LCGCEJobMonitorWatchdog(m_credential, host, port);
65 }
66
67 public void disconnect() throws NoSuccessException {
68 super.disconnect();
69
70
71 m_watchdog.stopAll();
72 }
73
74 public void subscribeFilteredJob(JobStatusNotifier notifier) throws TimeoutException, NoSuccessException {
75 try {
76 m_gass = new GassServer(m_credential, 0);
77 } catch (Exception e) {
78 throw new NoSuccessException("Failed to create Gass Server", e);
79 }
80 m_gass.registerDefaultDeactivator();
81 m_gass.registerJobOutputStream(GASS_PATH, new JobOutputStream(new LCGCEJobMonitorListener(notifier)));
82 }
83
84 public void unsubscribeFilteredJob() throws TimeoutException, NoSuccessException {
85 m_gass.unregisterJobOutputStream(GASS_PATH);
86 m_gass.unregisterDefaultDeactivator();
87 m_gass = null;
88 }
89 }