View Javadoc

1   package fr.in2p3.jsaga.impl.file.copy;
2   
3   import fr.in2p3.jsaga.adaptor.data.optimise.DataCopyMonitor;
4   import fr.in2p3.jsaga.impl.monitoring.*;
5   import fr.in2p3.jsaga.impl.task.AbstractTaskImpl;
6   import org.ogf.saga.SagaObject;
7   import org.ogf.saga.error.*;
8   import org.ogf.saga.session.Session;
9   import org.ogf.saga.task.*;
10  import org.ogf.saga.url.URL;
11  
12  /* ***************************************************
13  * *** Centre de Calcul de l'IN2P3 - Lyon (France) ***
14  * ***             http://cc.in2p3.fr/             ***
15  * ***************************************************
16  * File:   AbstractCopyTask
17  * Author: Sylvain Reynaud (sreynaud@in2p3.fr)
18  * Date:   21 fevr. 2009
19  * ***************************************************
20  * Description:                                      */
21  /**
22   *
23   */
24  public abstract class AbstractCopyTask<T extends SagaObject,E> extends AbstractTaskImpl<T,E> implements Task<T,E>, DataCopyMonitor {
25      public static final String FILE_COPY_PROGRESS = "file.copy.progress";
26      // internal
27      private URL m_target;
28      private int m_flags;
29      // metrics
30      private long m_totalWrittenBytes;
31      private MetricImpl<Long> m_metric_Progress;
32  
33      /** constructor */
34      public AbstractCopyTask(TaskMode mode, Session session, URL target, int flags) throws NotImplementedException {
35          super(session, null, true);
36          // internal
37          m_target = target;
38          m_flags = flags;
39          // metrics
40          m_totalWrittenBytes = 0L;
41          m_metric_Progress = new MetricFactoryImpl<Long>(this).createAndRegister(
42                  FILE_COPY_PROGRESS,
43                  "this metric gives the state of ongoing file transfer as number of bytes transfered.",
44                  MetricMode.ReadOnly,
45                  "bytes",
46                  MetricType.Int,
47                  0L);
48          try {
49              switch(mode) {
50                  case TASK:
51                      break;
52                  case ASYNC:
53                      this.run();
54                      break;
55                  case SYNC:
56                      this.run();
57                      this.waitFor();
58                      break;
59                  default:
60                      throw new NotImplementedException("INTERNAL ERROR: unexpected exception");
61              }
62          } catch (NotImplementedException e) {
63              throw e;
64          } catch (SagaException e) {
65              throw new NotImplementedException(e);
66          }
67      }
68  
69      public void increment(long writtenBytes) {
70      	this.setTotal(m_totalWrittenBytes + writtenBytes);
71      }
72  
73      public void setTotal(long writtenBytes) {
74          m_totalWrittenBytes = writtenBytes;
75          m_metric_Progress.setValue(m_totalWrittenBytes);
76      }
77  
78      public abstract void doCopy(URL target, int flags) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, IncorrectStateException, DoesNotExistException, AlreadyExistsException, TimeoutException, NoSuccessException, IncorrectURLException;
79  
80      ////////////////////////////////////// implementation of AbstractTaskImpl //////////////////////////////////////
81  
82      private Thread m_thread;
83      public void doSubmit() throws NotImplementedException, IncorrectStateException, TimeoutException, NoSuccessException {
84          m_thread = new Thread(new Runnable(){
85              public void run() {
86                  AbstractCopyTask.super.setState(State.RUNNING);
87                  try {
88                      AbstractCopyTask.this.doCopy(m_target, m_flags);
89                      AbstractCopyTask.super.setState(State.DONE);
90                  } catch (Exception e) {
91                      AbstractCopyTask.super.setException(new NoSuccessException(e));
92                      AbstractCopyTask.super.setState(State.FAILED);
93                  }
94              }
95          });
96          m_thread.start();
97      }
98  
99      protected void doCancel() {
100         try {
101             if (m_thread != null) {
102                 m_thread.interrupt();
103             }
104             this.setState(State.CANCELED);
105         } catch(SecurityException e) {
106             // do nothing (failed to cancel task)
107         }
108     }
109 
110     protected State queryState() {
111         return null;    // FileCopyTask does not support queryState
112     }
113 
114     public boolean startListening() {
115         return true;    // FileCopyTask is always listening anyway...
116     }
117 
118     public void stopListening() {
119         // do nothing
120     }
121 }