View Javadoc

1   package org.ogf.saga.job.base;
2   
3   import org.ogf.saga.JSAGABaseTest;
4   import org.ogf.saga.context.Context;
5   import org.ogf.saga.error.*;
6   import org.ogf.saga.job.*;
7   import org.ogf.saga.job.abstracts.Attribute;
8   import org.ogf.saga.job.abstracts.AttributeVector;
9   import org.ogf.saga.monitoring.*;
10  import org.ogf.saga.session.Session;
11  import org.ogf.saga.session.SessionFactory;
12  import org.ogf.saga.task.State;
13  import org.ogf.saga.task.Task;
14  import org.ogf.saga.url.URL;
15  import org.ogf.saga.url.URLFactory;
16  
17  import java.text.DateFormat;
18  import java.text.SimpleDateFormat;
19  import java.util.Date;
20  
21  /* ***************************************************
22  * *** Centre de Calcul de l'IN2P3 - Lyon (France) ***
23  * ***             http://cc.in2p3.fr/             ***
24  * ***************************************************
25  * File:   AbstractJobTest
26  * Author: Sylvain Reynaud (sreynaud@in2p3.fr)
27  * Date:   12 nov. 2007
28  * ***************************************************
29  * Description:                                      */
30  
31  public abstract class JobBaseTest extends JSAGABaseTest {
32      protected static final String MODEL = "JSAGA";
33  	
34  	// values
35      protected String CANDIDATE_HOST             = "candidate.host";
36  	protected String SIMPLE_JOB_BINARY 			= "simpleJobBinary";
37  	protected String MAX_QUEUING_TIME 			= "maxQueuingTime";
38  	protected String LONG_JOB_BINARY  			= "longJobBinary";
39  	protected String LONG_JOB_DURATION 			= "longJobDuration";
40  	protected String FINALY_TIMEOUT  			= "finalyTimeout";
41  	protected String SIMULTANEOUS_JOB_NUMBER	= "simultaneousJobNumber";
42  		
43  	// set default values
44  	private static final String DEFAULT_LONG_JOB_DURATION 			= "30";
45  	private static final String DEFAULT_FINALY_TIMEOUT 				= "60" ;
46  	private static final String DEFAULT_SIMPLE_JOB_BINARY 			= "/bin/date" ;
47  	private static final String DEFAULT_LONG_JOB_BINARY 			= "/bin/sleep" ;
48  	private static final String DEFAULT_MAX_QUEUING_TIME 			= "60";
49  	private static final String DEFAULT_SIMULTANEOUS_JOB_NUMBER 	= "5";
50  
51      // configuration
52      protected URL m_jobservice;
53      protected String m_candidateHost;
54      protected Session m_session;
55  
56      protected JobBaseTest(String jobprotocol) throws Exception {
57          super();
58  
59          // configure
60          m_jobservice = URLFactory.createURL(getRequiredProperty(jobprotocol, CONFIG_JOBSERVICE_URL));
61          m_candidateHost = super.getOptionalProperty(jobprotocol, CANDIDATE_HOST);
62          m_session = SessionFactory.createSession(true);
63          
64          // init values
65          SIMPLE_JOB_BINARY = super.getOptionalProperty(jobprotocol, SIMPLE_JOB_BINARY, DEFAULT_SIMPLE_JOB_BINARY);
66         	LONG_JOB_BINARY = super.getOptionalProperty(jobprotocol, LONG_JOB_BINARY, DEFAULT_LONG_JOB_BINARY);
67         	LONG_JOB_DURATION = super.getOptionalProperty(jobprotocol, LONG_JOB_DURATION, DEFAULT_LONG_JOB_DURATION);	
68          FINALY_TIMEOUT = super.getOptionalProperty(jobprotocol, FINALY_TIMEOUT, DEFAULT_FINALY_TIMEOUT);
69          MAX_QUEUING_TIME = super.getOptionalProperty(jobprotocol, MAX_QUEUING_TIME, DEFAULT_MAX_QUEUING_TIME);
70          SIMULTANEOUS_JOB_NUMBER = super.getOptionalProperty(jobprotocol, SIMULTANEOUS_JOB_NUMBER, DEFAULT_SIMULTANEOUS_JOB_NUMBER);       	
71      }
72      
73      /**
74       * Creates a new job
75       * @param desc The job description
76       * @return The new job
77       * @throws Exception
78       */
79      protected Job createJob(JobDescription desc) throws Exception  {
80          JobService service = JobFactory.createJobService(m_session, m_jobservice);
81          Job job = service.createJob(desc);
82          return job;
83      }
84      
85      /**
86       * Runs a job
87       * @param desc The job description
88       * @return The running job
89       * @throws Exception
90       */
91      protected Job runJob(JobDescription desc) throws Exception  {
92          Job job = createJob(desc);
93          job.run();
94          return job;
95      }
96      
97      /**
98       * Creates a new job description
99       * @param executable A string with the executable path
100      * @param attributes A string array with the job attributes
101      * @return The job description
102      * @throws Exception
103      */
104     protected JobDescription createJob(String executable, Attribute[] attributes, AttributeVector[] attributesVector) throws Exception {
105     	// prepare
106         JobDescription desc = JobFactory.createJobDescription();
107         desc.setAttribute(JobDescription.EXECUTABLE, executable);
108         desc.setAttribute(JobDescription.OUTPUT, "stdout.txt");
109         desc.setAttribute(JobDescription.ERROR, "stderr.txt");
110         if (m_candidateHost != null) {
111             desc.setVectorAttribute(JobDescription.CANDIDATEHOSTS, new String[]{m_candidateHost});
112         }
113         if(attributes != null) {
114         	for (int i = 0; i < attributes.length; i++) {
115         		desc.setAttribute(attributes[i].getKey(), attributes[i].getValue());
116 			}
117         }
118         if(attributesVector != null) {
119         	for (int i = 0; i < attributesVector.length; i++) {
120         		desc.setVectorAttribute(attributesVector[i].getKey(), attributesVector[i].getValue());
121 			}
122         }
123         return desc;
124     }
125 
126     /**
127      *  Very simple job which prints the execution date
128      * @return The job description
129      * @throws Exception
130      */
131     protected JobDescription createSimpleJob() throws Exception {
132     	return createJob(SIMPLE_JOB_BINARY, null, null);
133     }
134     
135     
136     /**
137      * Job which write 'Test' on stdout
138      * @param textToPrint The string to print in stdout
139      * @return The job description
140      * @throws Exception
141      */
142     protected JobDescription createWriteJob(String textToPrint) throws Exception {
143     	AttributeVector[] attributesV = new AttributeVector[1];
144     	attributesV[0] = new AttributeVector(JobDescription.ARGUMENTS,new String[]{textToPrint});    	
145     	return createJob("/bin/echo", null, attributesV);
146     }
147     
148     /**
149      * Job which generate error like 'Command not found' on stderr
150      * @return The job description
151      * @throws Exception
152      */
153     protected JobDescription createErrorJob() throws Exception {
154     	return createJob("/bin/command-error", null, null);
155     }
156     
157     /**
158      * Long job which sleeps 30 seconds
159      * @return The job description
160      * @throws Exception
161      */
162     protected JobDescription createLongJob() throws Exception {
163     	AttributeVector[] attributesV = new AttributeVector[1];
164     	attributesV[0] = new AttributeVector(JobDescription.ARGUMENTS, new String[]{LONG_JOB_DURATION});
165     	return createJob(LONG_JOB_BINARY, null, attributesV);
166     }
167 
168     /**
169      * Check if the required status is the job status
170      * @param jobState the job status
171      * @param wantedStatus The required status
172      * @throws Exception
173      */
174     protected void checkStatus(State jobState, State wantedStatus) throws Exception {
175     	if(jobState != wantedStatus) {
176         	fail("Invalid status "+jobState+": must be "+wantedStatus+".");
177         }
178 	}    
179     
180     protected void printCurrentDate() {
181     	DateFormat df = new SimpleDateFormat("yyyy/MM/dd-HH:mm:ss");
182         System.out.println(df.format(new Date()));
183 	}
184 
185     protected void printStatus(Job job) throws NotImplementedException, TimeoutException, NoSuccessException {
186 		System.out.println("Status: "+job.getState());
187 	}
188 
189 
190     private String m_subState;
191     protected boolean waitForSubState(Job job, String subState) throws Exception {
192     	float timeoutInSeconds = Float.valueOf(MAX_QUEUING_TIME);
193     	int cookie = job.addCallback(Job.JOB_STATEDETAIL, new Callback(){
194             public boolean cb(Monitorable mt, Metric metric, Context ctx) throws NotImplementedException, AuthorizationFailedException {
195                 try {
196                 	m_subState = metric.getAttribute(Metric.VALUE);
197                 } catch (Exception e) {
198                     throw new NotImplementedException(e);
199                 }
200                 return true;
201             }
202         });
203     	m_subState = job.getMetric(Job.JOB_STATEDETAIL).getAttribute(Metric.VALUE);        
204         try {
205             boolean forever;
206             long endTime;
207             if (timeoutInSeconds == Task.WAIT_FOREVER) {
208                 forever = true;
209                 endTime = -1;
210             } else if (timeoutInSeconds == Task.NO_WAIT) {
211                 forever = false;
212                 endTime = -1;
213             } else {
214                 forever = false;
215                 endTime = System.currentTimeMillis() + (long) timeoutInSeconds*1000;
216             }
217             while(!this.isEndedOrSubState(subState) && (forever || System.currentTimeMillis()<endTime)) {
218             	Thread.currentThread().sleep(100);
219             }
220         } catch (InterruptedException e) {/*ignore*/}
221         job.removeCallback(Job.JOB_STATEDETAIL, cookie);
222         return this.isEndedOrSubState(subState);
223     }
224     
225     private boolean isEndedOrSubState( String subState) {
226     	return m_subState.equals(subState) ||
227     		m_subState.equals(MODEL+":CANCELED") ||
228     		m_subState.equals(MODEL+":DONE") ||
229     		m_subState.equals(MODEL+":FAILED_ERROR") ||
230     		m_subState.equals(MODEL+":FAILED_ABORTED");
231     }
232 }