1 package fr.in2p3.jsaga.impl.job.staging.mgr;
2
3 import fr.in2p3.jsaga.helpers.StringArray;
4 import fr.in2p3.jsaga.impl.job.instance.AbstractSyncJobImpl;
5 import fr.in2p3.jsaga.impl.job.staging.*;
6 import org.ogf.saga.error.*;
7 import org.ogf.saga.file.Directory;
8 import org.ogf.saga.job.JobDescription;
9
10 import java.io.*;
11 import java.util.ArrayList;
12 import java.util.List;
13
14
15
16
17
18
19
20
21
22
23
24
25
26 public class DataStagingManagerThroughStream implements DataStagingManager {
27 private List<InputDataStagingToRemote> m_inputToRemote;
28 private List<InputDataStagingToWorker> m_inputToWorker;
29 private List<OutputDataStagingFromRemote> m_outputFromRemote;
30 private List<OutputDataStagingFromWorker> m_outputFromWorker;
31
32
33 private String m_executable;
34 private String[] m_arguments;
35 private StringBuffer m_redirections;
36
37 public DataStagingManagerThroughStream(String[] fileTransfer) throws NotImplementedException, BadParameterException, NoSuccessException {
38
39 m_inputToRemote = new ArrayList<InputDataStagingToRemote>();
40 m_inputToWorker = new ArrayList<InputDataStagingToWorker>();
41 m_outputFromRemote = new ArrayList<OutputDataStagingFromRemote>();
42 m_outputFromWorker = new ArrayList<OutputDataStagingFromWorker>();
43 m_executable = null;
44 m_arguments = null;
45 m_redirections = null;
46
47
48 for (String ft : fileTransfer) {
49 AbstractDataStaging dataStaging = DataStagingFactory.create(ft);
50 if (dataStaging instanceof InputDataStagingToRemote) {
51 m_inputToRemote.add((InputDataStagingToRemote) dataStaging);
52 } else if (dataStaging instanceof InputDataStagingToWorker) {
53 m_inputToWorker.add((InputDataStagingToWorker) dataStaging);
54 } else if (dataStaging instanceof OutputDataStagingFromRemote) {
55 m_outputFromRemote.add((OutputDataStagingFromRemote) dataStaging);
56 } else if (dataStaging instanceof OutputDataStagingFromWorker) {
57 m_outputFromWorker.add((OutputDataStagingFromWorker) dataStaging);
58 } else {
59 throw new BadParameterException("[INTERNAL ERROR] Unexpected class: "+dataStaging.getClass());
60 }
61 }
62 }
63
64 public JobDescription modifyJobDescription(final JobDescription jobDesc) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, TimeoutException, NoSuccessException {
65 try {
66
67 JobDescription newJobDesc = (JobDescription) jobDesc.clone();
68
69
70 try {
71 newJobDesc.removeAttribute(JobDescription.FILETRANSFER);
72 } catch (DoesNotExistException e) {
73
74 }
75
76
77 m_redirections = new StringBuffer();
78 if (this.needsStdin()) {
79
80 try {
81 if ("true".equalsIgnoreCase(jobDesc.getAttribute(JobDescription.INTERACTIVE))) {
82 throw new BadParameterException("Option "+JobDescription.FILETRANSFER+" can not be used with option "+JobDescription.INTERACTIVE);
83 }
84 } catch (DoesNotExistException e) {
85
86 }
87
88 newJobDesc.setAttribute(JobDescription.INTERACTIVE, "true");
89
90 m_executable = jobDesc.getAttribute(JobDescription.EXECUTABLE);
91 newJobDesc.setAttribute(JobDescription.EXECUTABLE, "/bin/bash");
92
93 try {
94 m_arguments = jobDesc.getVectorAttribute(JobDescription.ARGUMENTS);
95 newJobDesc.removeAttribute(JobDescription.ARGUMENTS);
96 } catch (DoesNotExistException e) {
97 m_arguments = null;
98 }
99
100 try {
101 String input = jobDesc.getAttribute(JobDescription.INPUT);
102 newJobDesc.removeAttribute(JobDescription.INPUT);
103 m_redirections.append(" <");
104 m_redirections.append(input);
105 } catch (DoesNotExistException e) {
106
107 }
108 try {
109 String output = jobDesc.getAttribute(JobDescription.OUTPUT);
110 newJobDesc.removeAttribute(JobDescription.OUTPUT);
111 m_redirections.append(" >");
112 m_redirections.append(output);
113 } catch (DoesNotExistException e) {
114
115 }
116 try {
117 String error = jobDesc.getAttribute(JobDescription.ERROR);
118 newJobDesc.removeAttribute(JobDescription.ERROR);
119 m_redirections.append(" 2>");
120 m_redirections.append(error);
121 } catch (DoesNotExistException e) {
122
123 }
124
125
126
127
128 }
129
130
131 return newJobDesc;
132 } catch (IncorrectStateException e) {
133 throw new NoSuccessException(e);
134 } catch (DoesNotExistException e) {
135 throw new NoSuccessException(e);
136 } catch (CloneNotSupportedException e) {
137 throw new NoSuccessException(e);
138 }
139 }
140
141 public void preStaging(AbstractSyncJobImpl job) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, DoesNotExistException, TimeoutException, IncorrectStateException, NoSuccessException {
142
143 if (this.needsStdin()) {
144
145 PrintStream stdin = new UnixPrintStream(job.getStdinSync());
146
147
148 InputStream template = this.getClass().getClassLoader().getResourceAsStream("bash/template.sh");
149 BufferedReader reader = new BufferedReader(new InputStreamReader(template));
150 try {
151 for (String line; (line=reader.readLine())!=null; ) {
152 stdin.println(line);
153 }
154 reader.close();
155 } catch (IOException e) {
156 throw new NoSuccessException(e);
157 }
158
159
160 for (int i=0; i<m_inputToWorker.size(); i++) {
161 InputDataStagingToWorker staging = m_inputToWorker.get(i);
162 staging.preStaging(job.getSession(), stdin, i, m_executable);
163 }
164
165
166 stdin.println("PATH=.:$PATH");
167 if (m_arguments != null) {
168 stdin.println("set -- "+ StringArray.arrayToString(m_arguments, " "));
169 }
170 stdin.println(m_executable+" $*"+m_redirections.toString());
171 stdin.println();
172
173
174 for (OutputDataStagingFromWorker staging : m_outputFromWorker) {
175 staging.preStaging(stdin);
176 }
177
178
179 if (isCleanup(job.getJobDescriptionSync())) {
180 for (InputDataStagingToWorker staging : m_inputToWorker) {
181 staging.cleanup(stdin);
182 }
183 for (OutputDataStagingFromWorker staging : m_outputFromWorker) {
184 staging.cleanup(stdin);
185 }
186 }
187
188
189 stdin.close();
190 }
191
192
193 for (InputDataStagingToRemote staging : m_inputToRemote) {
194 staging.preStaging(job.getSession());
195 }
196 }
197
198 public void postStaging(AbstractSyncJobImpl job, String nativeJobId) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, DoesNotExistException, TimeoutException, IncorrectStateException, NoSuccessException {
199
200 if (this.needsStdout()) {
201
202 BufferedReader stdout = new BufferedReader(new InputStreamReader(job.getStdoutSync()));
203
204
205 for (OutputDataStagingFromWorker staging : m_outputFromWorker) {
206 staging.postStaging(job.getSession(), stdout);
207 }
208
209
210 try{stdout.close();} catch(IOException e){
211 }
212
213
214 for (OutputDataStagingFromRemote staging : m_outputFromRemote) {
215 staging.postStaging(job.getSession());
216 }
217 }
218
219 public Directory cleanup(AbstractSyncJobImpl job, String nativeJobId) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, DoesNotExistException, TimeoutException, IncorrectStateException, NoSuccessException {
220
221 for (InputDataStagingToRemote staging : m_inputToRemote) {
222 staging.cleanup(job.getSession());
223 }
224
225
226 for (OutputDataStagingFromRemote staging : m_outputFromRemote) {
227 staging.cleanup(job.getSession());
228 }
229 return null;
230 }
231
232 private boolean needsStdin() {
233 return !m_inputToWorker.isEmpty() || !m_outputFromWorker.isEmpty();
234 }
235
236 private boolean needsStdout() {
237 return !m_outputFromWorker.isEmpty();
238 }
239
240 private static boolean isCleanup(JobDescription jobDesc) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, DoesNotExistException, TimeoutException, IncorrectStateException, NoSuccessException {
241 final boolean CLEANUP_DEFAULT = true;
242 try {
243 String cleanup = jobDesc.getAttribute(JobDescription.CLEANUP);
244 if ("False".equalsIgnoreCase(cleanup)) {
245 return false;
246 } else if ("True".equalsIgnoreCase(cleanup)) {
247 return true;
248 } else if ("Default".equalsIgnoreCase(cleanup)) {
249 return CLEANUP_DEFAULT;
250 } else {
251 throw new BadParameterException("Attribute '"+JobDescription.CLEANUP+"' has unexpected value: "+cleanup);
252 }
253 } catch (DoesNotExistException e) {
254 return CLEANUP_DEFAULT;
255 }
256 }
257
258 private static boolean isURL(String file) {
259 final boolean hasProtocolScheme = file.contains(":/");
260 final boolean isLinuxAbsolutePath = file.startsWith("/");
261 final boolean isWindowsAbsolutePath = file.indexOf(':')<=1;
262 return hasProtocolScheme && ! (isLinuxAbsolutePath || isWindowsAbsolutePath);
263 }
264 }