1 package fr.in2p3.jsaga.impl.file.copy;
2
3 import fr.in2p3.jsaga.adaptor.data.optimise.DataCopyMonitor;
4 import fr.in2p3.jsaga.impl.monitoring.*;
5 import fr.in2p3.jsaga.impl.task.AbstractTaskImpl;
6 import org.ogf.saga.SagaObject;
7 import org.ogf.saga.error.*;
8 import org.ogf.saga.session.Session;
9 import org.ogf.saga.task.*;
10 import org.ogf.saga.url.URL;
11
12
13
14
15
16
17
18
19
20
21
22
23
24 public abstract class AbstractCopyTask<T extends SagaObject,E> extends AbstractTaskImpl<T,E> implements Task<T,E>, DataCopyMonitor {
25 public static final String FILE_COPY_PROGRESS = "file.copy.progress";
26
27 private URL m_target;
28 private int m_flags;
29
30 private long m_totalWrittenBytes;
31 private MetricImpl<Long> m_metric_Progress;
32
33
34 public AbstractCopyTask(TaskMode mode, Session session, URL target, int flags) throws NotImplementedException {
35 super(session, null, true);
36
37 m_target = target;
38 m_flags = flags;
39
40 m_totalWrittenBytes = 0L;
41 m_metric_Progress = new MetricFactoryImpl<Long>(this).createAndRegister(
42 FILE_COPY_PROGRESS,
43 "this metric gives the state of ongoing file transfer as number of bytes transfered.",
44 MetricMode.ReadOnly,
45 "bytes",
46 MetricType.Int,
47 0L);
48 try {
49 switch(mode) {
50 case TASK:
51 break;
52 case ASYNC:
53 this.run();
54 break;
55 case SYNC:
56 this.run();
57 this.waitFor();
58 break;
59 default:
60 throw new NotImplementedException("INTERNAL ERROR: unexpected exception");
61 }
62 } catch (NotImplementedException e) {
63 throw e;
64 } catch (SagaException e) {
65 throw new NotImplementedException(e);
66 }
67 }
68
69 public void increment(long writtenBytes) {
70 this.setTotal(m_totalWrittenBytes + writtenBytes);
71 }
72
73 public void setTotal(long writtenBytes) {
74 m_totalWrittenBytes = writtenBytes;
75 m_metric_Progress.setValue(m_totalWrittenBytes);
76 }
77
78 public abstract void doCopy(URL target, int flags) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, IncorrectStateException, DoesNotExistException, AlreadyExistsException, TimeoutException, NoSuccessException, IncorrectURLException;
79
80
81
82 private Thread m_thread;
83 public void doSubmit() throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException {
84 m_thread = new Thread(new Runnable(){
85 public void run() {
86 AbstractCopyTask.super.setState(State.RUNNING);
87 try {
88 AbstractCopyTask.this.doCopy(m_target, m_flags);
89 AbstractCopyTask.super.setState(State.DONE);
90 } catch (Exception e) {
91 AbstractCopyTask.super.setException(new NoSuccessException(e));
92 AbstractCopyTask.super.setState(State.FAILED);
93 }
94 }
95 });
96 m_thread.start();
97 }
98
99 protected void doCancel() {
100 try {
101 if (m_thread != null) {
102 m_thread.interrupt();
103 }
104 this.setState(State.CANCELED);
105 } catch(SecurityException e) {
106
107 }
108 }
109
110 protected State queryState() {
111 return null;
112 }
113
114 public boolean startListening() {
115 return true;
116 }
117
118 public void stopListening() {
119
120 }
121 }