View Javadoc

1   package fr.in2p3.jsaga.adaptor.job;
2   
3   import fr.in2p3.jsaga.adaptor.base.defaults.Default;
4   import fr.in2p3.jsaga.adaptor.base.usage.*;
5   import fr.in2p3.jsaga.adaptor.job.monitor.JobStatusNotifier;
6   import fr.in2p3.jsaga.adaptor.job.monitor.ListenIndividualJob;
7   import org.apache.log4j.Logger;
8   import org.globus.gram.*;
9   import org.globus.gram.internal.GRAMConstants;
10  import org.ietf.jgss.GSSException;
11  import org.ogf.saga.error.*;
12  
13  import java.net.InetAddress;
14  import java.net.UnknownHostException;
15  import java.util.Map;
16  
17  /* ***************************************************
18  * *** Centre de Calcul de l'IN2P3 - Lyon (France) ***
19  * ***             http://cc.in2p3.fr/             ***
20  * ***************************************************
21  * File:   GatekeeperJobMonitorAdaptor
22  * Author: Sylvain Reynaud (sreynaud@in2p3.fr)
23  * Date:   16 nov. 2007
24  * ***************************************************
25  * Description:                                      */
26  /**
27   *
28   */
29  public class GatekeeperJobMonitorAdaptor extends GkCommonJobMonitorAdaptor implements ListenIndividualJob  {
30  	private Logger logger = Logger.getLogger(GatekeeperJobMonitorAdaptor.class.getName());	
31  
32      /** override super.getType() */
33      public String getType() {
34          return "gatekeeper";
35      }
36  
37      /** override super.getUsage() */
38      public Usage getUsage() {
39          return new UAnd.Builder()
40                          .and(new UOptional(IP_ADDRESS))
41                          .and(new UOptional(TCP_PORT_RANGE))
42                          .build();
43      }
44  
45      /** override super.getDefaults() */
46      public Default[] getDefaults(Map attributes) throws IncorrectStateException {
47      	try {
48  			String defaultIp = InetAddress.getLocalHost().getHostAddress();
49  			String defaultTcpPortRange="40000,45000";
50  			return new Default[]{new Default(IP_ADDRESS, defaultIp),new Default(TCP_PORT_RANGE, defaultTcpPortRange)};
51  		} catch (UnknownHostException e) {
52  			return null;
53  		}
54      }
55  
56      public void subscribeJob(String nativeJobId, JobStatusNotifier notifier) throws TimeoutException, NoSuccessException {
57          GramJob job = getGramJobById(nativeJobId);
58          GramJobListener listener = new GatekeeperJobStatusListener(notifier);
59          job.addListener(listener);
60          try {
61          	job.bind();
62          } catch (GramException e) {
63          	job.removeListener(listener);
64              if (e.getErrorCode() == GramException.CONNECTION_FAILED ||
65              		e.getErrorCode() == GramException.JOB_QUERY_DENIAL ||
66              		e.getErrorCode() == GramException.HTTP_UNFRAME_FAILED) {
67                  //WARNING: Globus does not distinguish job DONE and job manager stopped
68              	logger.warn("Globus job manager may be stopped: status DONE returned in subscribeJob() for job "+nativeJobId);
69                  GatekeeperJobStatus status = new GatekeeperJobStatus(nativeJobId, new Integer(GRAMConstants.STATUS_DONE), "DONE");
70                  notifier.notifyChange(status);
71              } else {
72                  super.rethrowException(e);
73              }
74          } catch (GSSException e) {
75              job.removeListener(listener);
76              throw new NoSuccessException(e);
77          }
78      }
79  
80      public void unsubscribeJob(String nativeJobId) throws TimeoutException, NoSuccessException {
81          GramJob job = getGramJobById(nativeJobId);
82          try {
83              job.unbind();
84          } catch (GramException e) {
85          	if (e.getErrorCode() == GramException.CONNECTION_FAILED ||
86              		e.getErrorCode() == GramException.JOB_QUERY_DENIAL ||
87              		e.getErrorCode() == GramException.HTTP_UNFRAME_FAILED) {
88                  // ignore (Globus does not distinguish job DONE and job manager stopped)
89              } else {
90                  super.rethrowException(e);
91              }
92          } catch (GSSException e) {
93              throw new NoSuccessException(e);
94          }
95          //NOTICE: no need to remove GramJobListener since job is unregistered from CallbackHandler
96      }
97  }