View Javadoc

1   package fr.in2p3.jsaga.impl.job.staging.mgr;
2   
3   import fr.in2p3.jsaga.helpers.StringArray;
4   import fr.in2p3.jsaga.impl.job.instance.AbstractSyncJobImpl;
5   import fr.in2p3.jsaga.impl.job.staging.*;
6   import org.ogf.saga.error.*;
7   import org.ogf.saga.file.Directory;
8   import org.ogf.saga.job.JobDescription;
9   
10  import java.io.*;
11  import java.util.ArrayList;
12  import java.util.List;
13  
14  /* ***************************************************
15   * *** Centre de Calcul de l'IN2P3 - Lyon (France) ***
16   * ***             http://cc.in2p3.fr/             ***
17   * ***************************************************
18   * File:   DataStagingManagerThroughStream
19   * Author: Sylvain Reynaud (sreynaud@in2p3.fr)
20   * Date:   9 mars 2010
21   * ***************************************************
22   * Description:                                      */
23  /**
24   *
25   */
26  public class DataStagingManagerThroughStream implements DataStagingManager {
27      private List<InputDataStagingToRemote> m_inputToRemote;
28      private List<InputDataStagingToWorker> m_inputToWorker;
29      private List<OutputDataStagingFromRemote> m_outputFromRemote;
30      private List<OutputDataStagingFromWorker> m_outputFromWorker;
31  
32      // job description
33      private String m_executable;
34      private String[] m_arguments;
35      private StringBuffer m_redirections;
36  
37      public DataStagingManagerThroughStream(String[] fileTransfer) throws NotImplementedException, BadParameterException, NoSuccessException {
38          // create
39          m_inputToRemote = new ArrayList<InputDataStagingToRemote>();
40          m_inputToWorker = new ArrayList<InputDataStagingToWorker>();
41          m_outputFromRemote = new ArrayList<OutputDataStagingFromRemote>();
42          m_outputFromWorker = new ArrayList<OutputDataStagingFromWorker>();
43          m_executable = null;
44          m_arguments = null;
45          m_redirections = null;
46  
47          // init
48          for (String ft : fileTransfer) {
49              AbstractDataStaging dataStaging = DataStagingFactory.create(ft);
50              if (dataStaging instanceof InputDataStagingToRemote) {
51                  m_inputToRemote.add((InputDataStagingToRemote) dataStaging);
52              } else if (dataStaging instanceof InputDataStagingToWorker) {
53                  m_inputToWorker.add((InputDataStagingToWorker) dataStaging);
54              } else if (dataStaging instanceof OutputDataStagingFromRemote) {
55                  m_outputFromRemote.add((OutputDataStagingFromRemote) dataStaging);
56              } else if (dataStaging instanceof OutputDataStagingFromWorker) {
57                  m_outputFromWorker.add((OutputDataStagingFromWorker) dataStaging);
58              } else {
59                  throw new BadParameterException("[INTERNAL ERROR] Unexpected class: "+dataStaging.getClass());
60              }
61          }
62      }
63  
64      public JobDescription modifyJobDescription(final JobDescription jobDesc) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, TimeoutException, NoSuccessException {
65          try {
66              // clone jobDesc and modify clone
67              JobDescription newJobDesc = (JobDescription) jobDesc.clone();
68  
69              // remove FileTransfer because it is not supported by plugin
70              try {
71                  newJobDesc.removeAttribute(JobDescription.FILETRANSFER);
72              } catch (DoesNotExistException e) {
73                  // ignore
74              }
75  
76              // remove options managed by generated script from job description
77              m_redirections = new StringBuffer();
78              if (this.needsStdin()) {
79                  // check job is not interactive
80                  try {
81                      if ("true".equalsIgnoreCase(jobDesc.getAttribute(JobDescription.INTERACTIVE))) {
82                          throw new BadParameterException("Option "+JobDescription.FILETRANSFER+" can not be used with option "+JobDescription.INTERACTIVE);
83                      }
84                  } catch (DoesNotExistException e) {
85                      // ignore
86                  }
87  
88                  newJobDesc.setAttribute(JobDescription.INTERACTIVE, "true");
89  
90                  m_executable = jobDesc.getAttribute(JobDescription.EXECUTABLE);
91                  newJobDesc.setAttribute(JobDescription.EXECUTABLE, "/bin/bash");
92  
93                  try {
94                      m_arguments = jobDesc.getVectorAttribute(JobDescription.ARGUMENTS);
95                      newJobDesc.removeAttribute(JobDescription.ARGUMENTS);
96                  } catch (DoesNotExistException e) {
97                      m_arguments = null;
98                  }
99  
100                 try {
101                     String input = jobDesc.getAttribute(JobDescription.INPUT);
102                     newJobDesc.removeAttribute(JobDescription.INPUT);
103                     m_redirections.append(" <");
104                     m_redirections.append(input);
105                 } catch (DoesNotExistException e) {
106                     // ignore
107                 }
108                 try {
109                     String output = jobDesc.getAttribute(JobDescription.OUTPUT);
110                     newJobDesc.removeAttribute(JobDescription.OUTPUT);
111                     m_redirections.append(" >");
112                     m_redirections.append(output);
113                 } catch (DoesNotExistException e) {
114                     // ignore
115                 }
116                 try {
117                     String error = jobDesc.getAttribute(JobDescription.ERROR);
118                     newJobDesc.removeAttribute(JobDescription.ERROR);
119                     m_redirections.append(" 2>");
120                     m_redirections.append(error);
121                 } catch (DoesNotExistException e) {
122                     // ignore
123                 }
124 
125                 // uncomment this to enable debugging job wrapper script
126 //                newJobDesc.setAttribute(JobDescription.EXECUTABLE, "/usr/bin/cat");
127 //                newJobDesc.setVectorAttribute(JobDescription.ARGUMENTS, new String[]{"> /tmp/job-wrapper.sh"});
128             }
129 
130             // returns
131             return newJobDesc;
132         } catch (IncorrectStateException e) {
133             throw new NoSuccessException(e);
134         } catch (DoesNotExistException e) {
135             throw new NoSuccessException(e);
136         } catch (CloneNotSupportedException e) {
137             throw new NoSuccessException(e);
138         }
139     }
140 
141     public void preStaging(AbstractSyncJobImpl job) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, DoesNotExistException, TimeoutException, IncorrectStateException, NoSuccessException {
142         // may create script
143         if (this.needsStdin()) {
144             // open
145             PrintStream stdin = new UnixPrintStream(job.getStdinSync());
146 
147             // copy template to script
148             InputStream template = this.getClass().getClassLoader().getResourceAsStream("bash/template.sh");
149             BufferedReader reader = new BufferedReader(new InputStreamReader(template));
150             try {
151                 for (String line; (line=reader.readLine())!=null; ) {
152                     stdin.println(line);
153                 }
154                 reader.close();
155             } catch (IOException e) {
156                 throw new NoSuccessException(e);
157             }
158 
159             // for each inputToWorker
160             for (int i=0; i<m_inputToWorker.size(); i++) {
161                 InputDataStagingToWorker staging = m_inputToWorker.get(i);
162                 staging.preStaging(job.getSession(), stdin, i, m_executable);
163             }
164 
165             // invoke command
166             stdin.println("PATH=.:$PATH");
167             if (m_arguments != null) {
168                 stdin.println("set -- "+ StringArray.arrayToString(m_arguments, " "));
169             }
170             stdin.println(m_executable+" $*"+m_redirections.toString());
171             stdin.println();
172 
173             // for each outputFromWorker
174             for (OutputDataStagingFromWorker staging : m_outputFromWorker) {
175                 staging.preStaging(stdin);
176             }
177 
178             // cleanup on worker
179             if (isCleanup(job.getJobDescriptionSync())) {
180                 for (InputDataStagingToWorker staging : m_inputToWorker) {
181                     staging.cleanup(stdin);
182                 }
183                 for (OutputDataStagingFromWorker staging : m_outputFromWorker) {
184                     staging.cleanup(stdin);
185                 }
186             }
187 
188             // close
189             stdin.close();
190         }
191 
192         // for each inputToRemote
193         for (InputDataStagingToRemote staging : m_inputToRemote) {
194             staging.preStaging(job.getSession());
195         }
196     }
197 
198     public void postStaging(AbstractSyncJobImpl job, String nativeJobId) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, DoesNotExistException, TimeoutException, IncorrectStateException, NoSuccessException {
199         // may retrieve output files from stdout
200         if (this.needsStdout()) {
201             // open
202             BufferedReader stdout = new BufferedReader(new InputStreamReader(job.getStdoutSync()));
203 
204             // for each outputFromWorker
205             for (OutputDataStagingFromWorker staging : m_outputFromWorker) {
206                 staging.postStaging(job.getSession(), stdout);
207             }
208 
209             // close
210             try{stdout.close();} catch(IOException e){/*ignore*/}
211         }
212 
213         // for each outputFromRemote
214         for (OutputDataStagingFromRemote staging : m_outputFromRemote) {
215             staging.postStaging(job.getSession());
216         }
217     }
218 
219     public Directory cleanup(AbstractSyncJobImpl job, String nativeJobId) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, DoesNotExistException, TimeoutException, IncorrectStateException, NoSuccessException {
220         // for each inputToRemote
221         for (InputDataStagingToRemote staging : m_inputToRemote) {
222             staging.cleanup(job.getSession());
223         }
224 
225         // for each outputFromRemote
226         for (OutputDataStagingFromRemote staging : m_outputFromRemote) {
227             staging.cleanup(job.getSession());
228         }
229         return null;
230     }
231 
232     private boolean needsStdin() {
233         return !m_inputToWorker.isEmpty() || !m_outputFromWorker.isEmpty();
234     }
235 
236     private boolean needsStdout() {
237         return !m_outputFromWorker.isEmpty();
238     }
239 
240     private static boolean isCleanup(JobDescription jobDesc) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, DoesNotExistException, TimeoutException, IncorrectStateException, NoSuccessException {
241         final boolean CLEANUP_DEFAULT = true;
242         try {
243             String cleanup = jobDesc.getAttribute(JobDescription.CLEANUP);
244             if ("False".equalsIgnoreCase(cleanup)) {
245                 return false;
246             } else if ("True".equalsIgnoreCase(cleanup)) {
247                 return true;
248             } else if ("Default".equalsIgnoreCase(cleanup)) {
249                 return CLEANUP_DEFAULT;
250             } else {
251                 throw new BadParameterException("Attribute '"+JobDescription.CLEANUP+"' has unexpected value: "+cleanup);
252             }
253         } catch (DoesNotExistException e) {
254             return CLEANUP_DEFAULT;
255         }
256     }
257 
258     private static boolean isURL(String file) {
259         final boolean hasProtocolScheme = file.contains(":/");
260         final boolean isLinuxAbsolutePath = file.startsWith("/");
261         final boolean isWindowsAbsolutePath = file.indexOf(':')<=1;  // -1 or 1 (none or "_:")
262         return hasProtocolScheme && ! (isLinuxAbsolutePath || isWindowsAbsolutePath);
263     }
264 }