1 package fr.in2p3.jsaga.impl.file.copy;
2
3 import fr.in2p3.jsaga.impl.monitoring.*;
4 import fr.in2p3.jsaga.impl.task.AbstractTaskImpl;
5 import org.ogf.saga.SagaObject;
6 import org.ogf.saga.error.*;
7 import org.ogf.saga.session.Session;
8 import org.ogf.saga.task.*;
9 import org.ogf.saga.url.URL;
10
11
12
13
14
15
16
17
18
19
20
21
22
23 public abstract class AbstractCopyFromTask<T extends SagaObject,E> extends AbstractTaskImpl<T,E> implements Task<T,E> {
24
25 private URL m_source;
26 private int m_flags;
27
28 private long m_totalWrittenBytes;
29 private MetricImpl<Long> m_metric_Progress;
30
31
32 public AbstractCopyFromTask(TaskMode mode, Session session, URL source, int flags) throws NotImplementedException {
33 super(session, null, true);
34
35 m_source = source;
36 m_flags = flags;
37
38 m_totalWrittenBytes = 0L;
39 m_metric_Progress = new MetricFactoryImpl<Long>(this).createAndRegister(
40 AbstractCopyTask.FILE_COPY_PROGRESS,
41 "this metric gives the state of ongoing file transfer as number of bytes transfered.",
42 MetricMode.ReadOnly,
43 "bytes",
44 MetricType.Int,
45 0L);
46 try {
47 switch(mode) {
48 case TASK:
49 break;
50 case ASYNC:
51 this.run();
52 break;
53 case SYNC:
54 this.run();
55 this.waitFor();
56 break;
57 default:
58 throw new NotImplementedException("INTERNAL ERROR: unexpected exception");
59 }
60 } catch (NotImplementedException e) {
61 throw e;
62 } catch (SagaException e) {
63 throw new NotImplementedException(e);
64 }
65 }
66
67 void increment(long writtenBytes) {
68 m_totalWrittenBytes += writtenBytes;
69 m_metric_Progress.setValue(m_totalWrittenBytes);
70 }
71
72 public abstract void doCopyFrom(URL source, int flags) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, IncorrectStateException, DoesNotExistException, AlreadyExistsException, TimeoutException, NoSuccessException, IncorrectURLException;
73
74
75
76 private Thread m_thread;
77 public void doSubmit() throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException {
78 m_thread = new Thread(new Runnable(){
79 public void run() {
80 AbstractCopyFromTask.super.setState(State.RUNNING);
81 try {
82 AbstractCopyFromTask.this.doCopyFrom(m_source, m_flags);
83 AbstractCopyFromTask.super.setState(State.DONE);
84 } catch (Exception e) {
85 AbstractCopyFromTask.super.setException(new NoSuccessException(e));
86 AbstractCopyFromTask.super.setState(State.FAILED);
87 }
88 }
89 });
90 m_thread.start();
91 }
92
93 protected void doCancel() {
94 try {
95 if (m_thread != null) {
96 m_thread.interrupt();
97 }
98 this.setState(State.CANCELED);
99 } catch(SecurityException e) {
100
101 }
102 }
103
104 protected State queryState() {
105 return null;
106 }
107
108 public boolean startListening() {
109 return true;
110 }
111
112 public void stopListening() {
113
114 }
115 }