1 package org.ogf.saga.job.base;
2
3 import org.ogf.saga.JSAGABaseTest;
4 import org.ogf.saga.context.Context;
5 import org.ogf.saga.error.*;
6 import org.ogf.saga.job.*;
7 import org.ogf.saga.job.abstracts.Attribute;
8 import org.ogf.saga.job.abstracts.AttributeVector;
9 import org.ogf.saga.monitoring.*;
10 import org.ogf.saga.session.Session;
11 import org.ogf.saga.session.SessionFactory;
12 import org.ogf.saga.task.State;
13 import org.ogf.saga.task.Task;
14 import org.ogf.saga.url.URL;
15 import org.ogf.saga.url.URLFactory;
16
17 import java.text.DateFormat;
18 import java.text.SimpleDateFormat;
19 import java.util.Date;
20
21
22
23
24
25
26
27
28
29
30
31 public abstract class JobBaseTest extends JSAGABaseTest {
32 protected static final String MODEL = "JSAGA";
33
34
35 protected String CANDIDATE_HOST = "candidate.host";
36 protected String SIMPLE_JOB_BINARY = "simpleJobBinary";
37 protected String MAX_QUEUING_TIME = "maxQueuingTime";
38 protected String LONG_JOB_BINARY = "longJobBinary";
39 protected String LONG_JOB_DURATION = "longJobDuration";
40 protected String FINALY_TIMEOUT = "finalyTimeout";
41 protected String SIMULTANEOUS_JOB_NUMBER = "simultaneousJobNumber";
42
43
44 private static final String DEFAULT_LONG_JOB_DURATION = "30";
45 private static final String DEFAULT_FINALY_TIMEOUT = "60" ;
46 private static final String DEFAULT_SIMPLE_JOB_BINARY = "/bin/date" ;
47 private static final String DEFAULT_LONG_JOB_BINARY = "/bin/sleep" ;
48 private static final String DEFAULT_MAX_QUEUING_TIME = "60";
49 private static final String DEFAULT_SIMULTANEOUS_JOB_NUMBER = "5";
50
51
52 protected URL m_jobservice;
53 protected String m_candidateHost;
54 protected Session m_session;
55
56 protected JobBaseTest(String jobprotocol) throws Exception {
57 super();
58
59
60 m_jobservice = URLFactory.createURL(getRequiredProperty(jobprotocol, CONFIG_JOBSERVICE_URL));
61 m_candidateHost = super.getOptionalProperty(jobprotocol, CANDIDATE_HOST);
62 m_session = SessionFactory.createSession(true);
63
64
65 SIMPLE_JOB_BINARY = super.getOptionalProperty(jobprotocol, SIMPLE_JOB_BINARY, DEFAULT_SIMPLE_JOB_BINARY);
66 LONG_JOB_BINARY = super.getOptionalProperty(jobprotocol, LONG_JOB_BINARY, DEFAULT_LONG_JOB_BINARY);
67 LONG_JOB_DURATION = super.getOptionalProperty(jobprotocol, LONG_JOB_DURATION, DEFAULT_LONG_JOB_DURATION);
68 FINALY_TIMEOUT = super.getOptionalProperty(jobprotocol, FINALY_TIMEOUT, DEFAULT_FINALY_TIMEOUT);
69 MAX_QUEUING_TIME = super.getOptionalProperty(jobprotocol, MAX_QUEUING_TIME, DEFAULT_MAX_QUEUING_TIME);
70 SIMULTANEOUS_JOB_NUMBER = super.getOptionalProperty(jobprotocol, SIMULTANEOUS_JOB_NUMBER, DEFAULT_SIMULTANEOUS_JOB_NUMBER);
71 }
72
73
74
75
76
77
78
79 protected Job createJob(JobDescription desc) throws Exception {
80 JobService service = JobFactory.createJobService(m_session, m_jobservice);
81 Job job = service.createJob(desc);
82 return job;
83 }
84
85
86
87
88
89
90
91 protected Job runJob(JobDescription desc) throws Exception {
92 Job job = createJob(desc);
93 job.run();
94 return job;
95 }
96
97
98
99
100
101
102
103
104 protected JobDescription createJob(String executable, Attribute[] attributes, AttributeVector[] attributesVector) throws Exception {
105
106 JobDescription desc = JobFactory.createJobDescription();
107 desc.setAttribute(JobDescription.EXECUTABLE, executable);
108 desc.setAttribute(JobDescription.OUTPUT, "stdout.txt");
109 desc.setAttribute(JobDescription.ERROR, "stderr.txt");
110 if (m_candidateHost != null) {
111 desc.setVectorAttribute(JobDescription.CANDIDATEHOSTS, new String[]{m_candidateHost});
112 }
113 if(attributes != null) {
114 for (int i = 0; i < attributes.length; i++) {
115 desc.setAttribute(attributes[i].getKey(), attributes[i].getValue());
116 }
117 }
118 if(attributesVector != null) {
119 for (int i = 0; i < attributesVector.length; i++) {
120 desc.setVectorAttribute(attributesVector[i].getKey(), attributesVector[i].getValue());
121 }
122 }
123 return desc;
124 }
125
126
127
128
129
130
131 protected JobDescription createSimpleJob() throws Exception {
132 return createJob(SIMPLE_JOB_BINARY, null, null);
133 }
134
135
136
137
138
139
140
141
142 protected JobDescription createWriteJob(String textToPrint) throws Exception {
143 AttributeVector[] attributesV = new AttributeVector[1];
144 attributesV[0] = new AttributeVector(JobDescription.ARGUMENTS,new String[]{textToPrint});
145 return createJob("/bin/echo", null, attributesV);
146 }
147
148
149
150
151
152
153 protected JobDescription createErrorJob() throws Exception {
154 return createJob("/bin/command-error", null, null);
155 }
156
157
158
159
160
161
162 protected JobDescription createLongJob() throws Exception {
163 AttributeVector[] attributesV = new AttributeVector[1];
164 attributesV[0] = new AttributeVector(JobDescription.ARGUMENTS, new String[]{LONG_JOB_DURATION});
165 return createJob(LONG_JOB_BINARY, null, attributesV);
166 }
167
168
169
170
171
172
173
174 protected void checkStatus(State jobState, State wantedStatus) throws Exception {
175 if(jobState != wantedStatus) {
176 fail("Invalid status "+jobState+": must be "+wantedStatus+".");
177 }
178 }
179
180 protected void printCurrentDate() {
181 DateFormat df = new SimpleDateFormat("yyyy/MM/dd-HH:mm:ss");
182 System.out.println(df.format(new Date()));
183 }
184
185 protected void printStatus(Job job) throws NotImplementedException, TimeoutException, NoSuccessException {
186 System.out.println("Status: "+job.getState());
187 }
188
189
190 private String m_subState;
191 protected boolean waitForSubState(Job job, String subState) throws Exception {
192 float timeoutInSeconds = Float.valueOf(MAX_QUEUING_TIME);
193 int cookie = job.addCallback(Job.JOB_STATEDETAIL, new Callback(){
194 public boolean cb(Monitorable mt, Metric metric, Context ctx) throws NotImplementedException, AuthorizationFailedException {
195 try {
196 m_subState = metric.getAttribute(Metric.VALUE);
197 } catch (Exception e) {
198 throw new NotImplementedException(e);
199 }
200 return true;
201 }
202 });
203 m_subState = job.getMetric(Job.JOB_STATEDETAIL).getAttribute(Metric.VALUE);
204 try {
205 boolean forever;
206 long endTime;
207 if (timeoutInSeconds == Task.WAIT_FOREVER) {
208 forever = true;
209 endTime = -1;
210 } else if (timeoutInSeconds == Task.NO_WAIT) {
211 forever = false;
212 endTime = -1;
213 } else {
214 forever = false;
215 endTime = System.currentTimeMillis() + (long) timeoutInSeconds*1000;
216 }
217 while(!this.isEndedOrSubState(subState) && (forever || System.currentTimeMillis()<endTime)) {
218 Thread.currentThread().sleep(100);
219 }
220 } catch (InterruptedException e) {
221 job.removeCallback(Job.JOB_STATEDETAIL, cookie);
222 return this.isEndedOrSubState(subState);
223 }
224
225 private boolean isEndedOrSubState( String subState) {
226 return m_subState.equals(subState) ||
227 m_subState.equals(MODEL+":CANCELED") ||
228 m_subState.equals(MODEL+":DONE") ||
229 m_subState.equals(MODEL+":FAILED_ERROR") ||
230 m_subState.equals(MODEL+":FAILED_ABORTED");
231 }
232 }