View Javadoc

1   package org.ogf.saga.job.run;
2   
3   import org.junit.After;
4   import org.junit.Before;
5   import org.junit.Test;
6   import org.ogf.saga.buffer.Buffer;
7   import org.ogf.saga.buffer.BufferFactory;
8   import org.ogf.saga.error.IncorrectStateException;
9   import org.ogf.saga.file.FileFactory;
10  import org.ogf.saga.job.Job;
11  import org.ogf.saga.job.JobDescription;
12  import org.ogf.saga.job.JobFactory;
13  import org.ogf.saga.job.JobService;
14  import org.ogf.saga.job.base.JobBaseTest;
15  import org.ogf.saga.namespace.Flags;
16  import org.ogf.saga.task.State;
17  import org.ogf.saga.url.URLFactory;
18  
19  import java.io.File;
20  import java.io.FileInputStream;
21  import java.io.FileOutputStream;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.net.URI;
25  import java.util.UUID;
26  
27  /* ***************************************************
28  * *** Centre de Calcul de l'IN2P3 - Lyon (France) ***
29  * ***             http://cc.in2p3.fr/             ***
30  * ***************************************************
31  * File:   JobRunSandboxTest
32  * Author: Sylvain Reynaud (sreynaud@in2p3.fr)
33  * Author: lionel.schwarz@in2p3.fr
34  * Date:   7 NOV 2013
35  * ***************************************************
36  * Description:                                      */
37  /**
38   *
39   */
40  public abstract class SandboxTest extends JobBaseTest {
41      private static final String SCRIPT_IMPLICIT = "/usr/bin/tr 'ou' 'ui'";
42      protected String m_scriptExplicit = "#!/bin/bash\n/bin/cat ${1##file:} | "+SCRIPT_IMPLICIT+" > ${2##file:}";
43      private static final String INPUT_CONTENT = "coucou";
44      private static final String OUTPUT_CONTENT = "cuicui";
45      private static final File TMP = new File(System.getProperty("java.io.tmpdir"));
46  
47      private UUID m_uuid;
48  
49      protected SandboxTest(String jobprotocol) throws Exception {
50          super(jobprotocol);
51      }
52  
53      @Before
54      public void setUp() {
55          m_uuid = UUID.randomUUID();
56      }
57  
58      @After
59      public void tearDown() {
60          m_uuid = null;
61      }
62  
63      @Test
64      public void test_remote_input_explicit() throws Exception {
65          this.runJobExplicit(getRemote("input"), getWorker("input"), getLocal("output"), getWorker("output"));
66      }
67  
68      @Test
69      public void test_remote_output_explicit() throws Exception {
70          this.runJobExplicit(getLocal("input"), getWorker("input"), getRemote("output"), getWorker("output"));
71      }
72  
73      @Test
74      public void test_input_output_explicit() throws Exception {
75          this.runJobExplicit(getLocal("input"), getWorker("input"), getLocal("output"), getWorker("output"));
76      }
77  
78      @Test
79      public void test_input_output_implicit() throws Exception {
80          this.runJobImplicit(getLocal("input"), getWorker("input"), getLocal("output"), getWorker("output"));
81      }
82  
83      @Test
84      public void test_output_only_implicit() throws Exception {
85          Object localOutput = getLocal("output");
86          Object workerOutput = getWorker("output");
87  
88          // create job
89          JobDescription desc = JobFactory.createJobDescription();
90          desc.setAttribute(JobDescription.EXECUTABLE, "/bin/echo");
91          desc.setVectorAttribute(JobDescription.ARGUMENTS, new String[]{"cuicui"});
92          desc.setVectorAttribute(JobDescription.FILETRANSFER, new String[]{
93                  localOutput+" < "+workerOutput
94          });
95          desc.setAttribute(JobDescription.OUTPUT, workerOutput.toString());
96  
97          // run job
98          String outputContent = this.runAndGetOutput(desc, localOutput);
99          assertEquals(OUTPUT_CONTENT, outputContent.trim());
100 
101         // cleanup
102         this.cleanup(localOutput);
103     }
104 
105     @Test
106     public void test_output_workingDirectory() throws Exception {
107         Object localOutput = getLocal("output");
108         Object workerOutput = getWorker("output");
109 
110         // create job
111         JobDescription desc = JobFactory.createJobDescription();
112         desc.setAttribute(JobDescription.EXECUTABLE, "/bin/pwd");
113     	desc.setAttribute(JobDescription.WORKINGDIRECTORY, "/tmp");
114         desc.setVectorAttribute(JobDescription.FILETRANSFER, new String[]{
115                 localOutput+" < "+workerOutput
116         });
117         desc.setAttribute(JobDescription.OUTPUT, workerOutput.toString());
118 
119         // run job
120         String outputContent = this.runAndGetOutput(desc, localOutput);
121         assertEquals("/tmp", outputContent.trim());
122 
123         // cleanup
124         this.cleanup(localOutput);
125     }
126 
127     //////////////////////////////////////////// private methods ////////////////////////////////////////////
128 
129     protected File getLocal(String suffix) {
130         return new File(TMP, "local-"+m_uuid+"."+suffix);
131     }
132     protected URI getRemote(String suffix) {
133         return new File(TMP, "remote-"+m_uuid+"."+suffix).toURI();
134     }
135     protected String getWorker(String suffix) {
136         return "worker-"+m_uuid+"."+suffix;
137     }
138 
139     protected void runJobExplicit(Object localInput, Object workerInput, Object localOutput, Object workerOutput) throws Exception {
140         this.runJob(true, getLocal("sh"), getWorker("sh"), localInput, workerInput, localOutput, workerOutput);
141     }
142     protected void runJobImplicit(Object localInput, Object workerInput, Object localOutput, Object workerOutput) throws Exception {
143         this.runJob(false, getLocal("sh"), getWorker("sh"), localInput, workerInput, localOutput, workerOutput);
144     }
145     private void runJob(boolean explicitRedirect, File localScript, String workerScript, Object localInput, Object workerInput, Object localOutput, Object workerOutput) throws Exception {
146         // prepare
147         this.put(localInput, INPUT_CONTENT.getBytes());
148 
149         // create job
150         JobDescription desc = JobFactory.createJobDescription();
151         desc.setAttribute(JobDescription.EXECUTABLE, workerScript);
152         desc.setVectorAttribute(JobDescription.FILETRANSFER, new String[]{
153                 localScript+" > "+workerScript,
154                 localInput+" > "+workerInput,
155                 localOutput+" < "+workerOutput
156         });
157         if (explicitRedirect) {
158             this.put(localScript, m_scriptExplicit.getBytes());
159             desc.setVectorAttribute(JobDescription.ARGUMENTS, new String[]{workerInput.toString(), workerOutput.toString()});
160         } else {
161             this.put(localScript, SCRIPT_IMPLICIT.getBytes());
162             desc.setAttribute(JobDescription.INPUT, workerInput.toString());
163             desc.setAttribute(JobDescription.OUTPUT, workerOutput.toString());
164         }
165 
166         // run job
167         String outputContent = this.runAndGetOutput(desc, localOutput);
168         assertEquals(OUTPUT_CONTENT, outputContent);
169 
170         // cleanup
171         this.cleanup(localScript);
172         this.cleanup(localInput);
173         this.cleanup(localOutput);
174     }
175 
176     private String runAndGetOutput(JobDescription desc, Object localOutput) throws Exception {
177         // submit
178         JobService service = JobFactory.createJobService(m_session, m_jobservice);
179         Job job = service.createJob(desc);
180         job.run();
181         System.out.println(job.getAttribute(Job.JOBID));
182 
183         // wait
184         job.waitFor();
185 
186         
187         // for debugging
188         // only works for interactive jobs
189         if (State.FAILED.equals(job.getState())) {
190         	try {
191 	            // print stderr
192 	            byte[] buffer = new byte[1024];
193 	            InputStream stderr = job.getStderr();
194 	            for (int len; (len=stderr.read(buffer))>-1; ) {
195 	                System.err.write(buffer, 0, len);
196 	            }
197 	            stderr.close();
198 	
199 	            // rethrow exception
200 	            job.rethrow();
201         	} catch (IncorrectStateException is) {
202         		// ignore
203         	}
204         }
205 
206         // check job status
207         assertEquals(State.DONE, job.getState());
208 
209         // return job output content
210         return this.get(localOutput);
211     }
212 
213     private void put(Object location, byte[] content) throws Exception {
214         if (location instanceof File) {
215             File file = (File) location;
216             OutputStream out = new FileOutputStream(file);
217             out.write(content);
218             out.close();
219         } else if (location instanceof URI) {
220             URI uri = (URI) location;
221             org.ogf.saga.file.File file = FileFactory.createFile(m_session, URLFactory.createURL(uri.toString()), Flags.CREATE.getValue());
222             file.write(BufferFactory.createBuffer(content));
223             file.close();
224         } else {
225             throw new Exception("Unexpected class: "+location.getClass());
226         }
227     }
228 
229     private String get(Object location) throws Exception {
230         final int BUFFER_SIZE = 1024;
231         if (location instanceof File) {
232             File file = (File) location;
233             byte[] buffer = new byte[BUFFER_SIZE];
234             InputStream in = new FileInputStream(file);
235             int len = in.read(buffer);
236             in.close();
237 
238             // check not empty
239             boolean isNotEmpty = (len > -1);
240             assertTrue("File is empty: "+file, isNotEmpty);
241             return new String(buffer, 0, len);
242         } else if (location instanceof URI) {
243             URI uri = (URI) location;
244             Buffer buffer = BufferFactory.createBuffer(BUFFER_SIZE);
245             org.ogf.saga.file.File file = FileFactory.createFile(m_session, URLFactory.createURL(uri.toString()), Flags.READ.getValue());
246             int len = file.read(buffer);
247             file.close();
248 
249             // check not empty
250             boolean isNotEmpty = (len > -1);
251             assertTrue("File is empty: "+uri, isNotEmpty);
252             return new String(buffer.getData(), 0, len);
253         } else {
254             throw new Exception("Unexpected class: "+location.getClass());
255         }
256     }
257 
258     private void cleanup(Object location) throws Exception {
259         if (location instanceof File) {
260             File file = (File) location;
261             if (! file.delete()) {
262                 throw new Exception("Failed to remove file: "+file);
263             }
264         } else if (location instanceof URI) {
265             URI uri = (URI) location;
266             org.ogf.saga.file.File file = FileFactory.createFile(m_session, URLFactory.createURL(uri.toString()), Flags.NONE.getValue());
267             file.remove();
268             file.close();
269         } else {
270             throw new Exception("Unexpected class: "+location.getClass());
271         }
272     }
273 }