1 package fr.in2p3.jsaga.impl.job.streaming.mgr;
2
3 import java.io.File;
4 import java.io.FileOutputStream;
5 import java.io.OutputStream;
6 import java.util.UUID;
7
8 import org.ogf.saga.error.AuthenticationFailedException;
9 import org.ogf.saga.error.AuthorizationFailedException;
10 import org.ogf.saga.error.BadParameterException;
11 import org.ogf.saga.error.DoesNotExistException;
12 import org.ogf.saga.error.IncorrectStateException;
13 import org.ogf.saga.error.NoSuccessException;
14 import org.ogf.saga.error.NotImplementedException;
15 import org.ogf.saga.error.PermissionDeniedException;
16 import org.ogf.saga.error.TimeoutException;
17 import org.ogf.saga.file.Directory;
18 import org.ogf.saga.job.JobDescription;
19
20 import fr.in2p3.jsaga.adaptor.job.control.staging.StagingJobAdaptorTwoPhase;
21 import fr.in2p3.jsaga.impl.job.instance.AbstractSyncJobImpl;
22 import fr.in2p3.jsaga.impl.job.staging.mgr.DataStagingManager;
23 import fr.in2p3.jsaga.impl.job.staging.mgr.DataStagingManagerThroughSandboxTwoPhase;
24 import fr.in2p3.jsaga.impl.job.streaming.LocalFileFactory;
25
26
27
28
29
30
31
32
33
34
35
36 public class StreamingManagerThroughSandboxTwoPhase extends DataStagingManagerThroughSandboxTwoPhase implements StreamingManagerThroughSandbox {
37
38 private String m_uuid;
39
40 public StreamingManagerThroughSandboxTwoPhase(StagingJobAdaptorTwoPhase adaptor,
41 String uniqId) throws NotImplementedException,
42 BadParameterException, NoSuccessException {
43 super(adaptor, uniqId);
44 m_uuid = uniqId;
45 }
46
47 @Override
48 public JobDescription modifyJobDescription(JobDescription jobDesc)
49 throws NotImplementedException, AuthenticationFailedException,
50 AuthorizationFailedException, PermissionDeniedException,
51 BadParameterException, TimeoutException, NoSuccessException {
52
53 try {
54 JobDescription newJobDesc = (JobDescription) jobDesc.clone();
55
56 newJobDesc.setAttribute(JobDescription.INTERACTIVE, JobDescription.FALSE);
57
58
59 newJobDesc.setVectorAttribute(JobDescription.FILETRANSFER, new String[]{
60 LocalFileFactory.getLocalInputFile(m_uuid) + " > " + getWorker("input"),
61 LocalFileFactory.getLocalOutputFile(m_uuid) + " < " + getWorker("output"),
62 LocalFileFactory.getLocalErrorFile(m_uuid) + " < " + getWorker("error")
63 });
64
65
66 newJobDesc.setAttribute(JobDescription.INPUT, getWorker("input").toString());
67 newJobDesc.setAttribute(JobDescription.OUTPUT, getWorker("output").toString());
68 newJobDesc.setAttribute(JobDescription.ERROR, getWorker("error").toString());
69
70 return newJobDesc;
71 } catch (CloneNotSupportedException e) {
72 throw new NoSuccessException(e);
73 } catch (Exception e) {
74 throw new NoSuccessException(e);
75 }
76 }
77
78 @Override
79 public Directory cleanup(AbstractSyncJobImpl job, String nativeJobId) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, DoesNotExistException, TimeoutException, IncorrectStateException, NoSuccessException {
80 try {
81 File input = LocalFileFactory.getLocalInputFile(m_uuid);
82 input.delete();
83 } catch (Exception e) {
84 }
85 try {
86 File output = LocalFileFactory.getLocalOutputFile(m_uuid);
87 output.delete();
88 } catch (Exception e) {
89 }
90 try {
91 File error = LocalFileFactory.getLocalErrorFile(m_uuid);
92 error.delete();
93 } catch (Exception e) {
94 }
95 return super.cleanup(job, nativeJobId);
96 }
97
98 protected String getWorker(String suffix) {
99 return "worker-"+m_uuid+"."+suffix;
100 }
101
102
103 }