View Javadoc

1   package fr.in2p3.jsaga.impl.file.copy;
2   
3   import fr.in2p3.jsaga.Base;
4   import fr.in2p3.jsaga.EngineProperties;
5   import fr.in2p3.jsaga.adaptor.data.DataAdaptor;
6   import fr.in2p3.jsaga.adaptor.data.ParentDoesNotExist;
7   import fr.in2p3.jsaga.adaptor.data.optimise.DataCopy;
8   import fr.in2p3.jsaga.adaptor.data.optimise.DataCopyDelegated;
9   import fr.in2p3.jsaga.adaptor.data.write.FileWriter;
10  import fr.in2p3.jsaga.adaptor.data.write.FileWriterPutter;
11  import fr.in2p3.jsaga.impl.file.AbstractSyncFileImpl;
12  import fr.in2p3.jsaga.impl.logicalfile.AbstractSyncLogicalFileImpl;
13  import fr.in2p3.jsaga.impl.namespace.FlagsHelper;
14  import fr.in2p3.jsaga.impl.namespace.JSAGAFlags;
15  import org.ogf.saga.error.*;
16  import org.ogf.saga.file.*;
17  import org.ogf.saga.logicalfile.LogicalFileFactory;
18  import org.ogf.saga.namespace.Flags;
19  import org.ogf.saga.session.Session;
20  import org.ogf.saga.url.URL;
21  
22  import java.io.IOException;
23  import java.util.List;
24  
25  /* ***************************************************
26  * *** Centre de Calcul de l'IN2P3 - Lyon (France) ***
27  * ***             http://cc.in2p3.fr/             ***
28  * ***************************************************
29  * File:   FileCopyFrom
30  * Author: Sylvain Reynaud (sreynaud@in2p3.fr)
31  * Date:   9 juil. 2008
32  * ***************************************************
33  * Description:                                      */
34  /**
35   *
36   */
37  public class FileCopyFrom {
38      private static final String JSAGA_FACTORY = Base.getSagaFactory();
39  
40      //private static final int DEFAULT_BUFFER_SIZE = 16384;
41      private Session m_session;
42      private AbstractSyncFileImpl m_targetFile;
43      private DataAdaptor m_adaptor;
44  
45      /** constructor */
46      public FileCopyFrom(Session session, AbstractSyncFileImpl targetFile, DataAdaptor adaptor) throws NotImplementedException {
47          m_session = session;
48          m_targetFile = targetFile;
49          m_adaptor = adaptor;
50      }
51  
52      public void copyFrom(URL effectiveSource, int flags, AbstractCopyFromTask progressMonitor) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, IncorrectStateException, DoesNotExistException, TimeoutException, NoSuccessException, IncorrectURLException {
53          boolean overwrite = Flags.OVERWRITE.isSet(flags);
54          URL target = m_targetFile.getURL();
55          if (m_adaptor instanceof DataCopyDelegated && target.getScheme().equals(effectiveSource.getScheme())) {
56              try {
57                  ((DataCopyDelegated)m_adaptor).requestTransfer(
58                          effectiveSource,
59                          target,
60                          overwrite, target.getQuery());
61              } catch (DoesNotExistException doesNotExist) {
62                  throw new DoesNotExistException("Source file does not exist: "+effectiveSource, doesNotExist.getCause());
63              } catch (AlreadyExistsException alreadyExists) {
64                  throw new IncorrectStateException("Target entry already exists: "+target, alreadyExists);
65              }
66          } else if (m_adaptor instanceof DataCopy && target.getScheme().equals(effectiveSource.getScheme())) {
67              String sourceHost = effectiveSource.getHost();
68              int sourcePort = effectiveSource.getPort()>-1 ? effectiveSource.getPort() : m_adaptor.getDefaultPort();
69              String sourcePath = effectiveSource.getPath();
70              try {
71                  ((DataCopy)m_adaptor).copyFrom(
72                          sourceHost, sourcePort, sourcePath,
73                          target.getPath(),
74                          overwrite, target.getQuery());
75              } catch (DoesNotExistException doesNotExist) {
76                  throw new DoesNotExistException("Source file does not exist: "+effectiveSource, doesNotExist.getCause());
77              } catch (AlreadyExistsException alreadyExists) {
78                  throw new IncorrectStateException("Target entry already exists: "+target, alreadyExists);
79              }
80          } else if (m_adaptor instanceof FileWriterPutter && !target.getScheme().equals(effectiveSource.getScheme())) {
81              AbstractSyncFileImpl sourceFile = this.createSourceFile(effectiveSource);
82              try {
83                  ((FileWriterPutter)m_adaptor).putFromStream(
84                          target.getPath(),
85                          false,
86                          target.getQuery(),
87                          sourceFile.getFileInputStream());
88              } catch (ParentDoesNotExist parentDoesNotExist) {
89                  throw new DoesNotExistException("Target parent directory does not exist: "+target, parentDoesNotExist);
90              } catch (AlreadyExistsException alreadyExists) {
91                  throw new IncorrectStateException("Target entry already exists: "+target, alreadyExists);
92              } finally {
93                  sourceFile.close();
94              }
95          } else if (m_adaptor instanceof FileWriter) {
96              // todo: check that source is not a logical entry
97              this.getFromPhysicalFile(effectiveSource, flags, progressMonitor);
98          } else {
99              throw new NotImplementedException("Not supported for this protocol: "+target.getScheme());
100         }
101     }
102 
103     private void getFromPhysicalFile(URL source, int flags, AbstractCopyFromTask progressMonitor) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, IncorrectStateException, DoesNotExistException, TimeoutException, NoSuccessException, IncorrectURLException {
104         IOException closingException = null;
105 
106         // open source file if it exists
107         AbstractSyncFileImpl sourceFile = this.createSourceFile(source);
108         FileInputStream in = sourceFile.getFileInputStream();
109 
110         // start to read if it exists
111         int bufferSize = EngineProperties.getInteger(EngineProperties.DATA_COPY_BUFFER_SIZE);
112         byte[] data = new byte[bufferSize];
113         int readlen;
114         try {
115             readlen = in.read(data, 0, data.length);
116         } catch (IOException e) {
117             try {
118                 sourceFile.close();
119             } catch (Exception e1) {/*ignore*/}
120             throw new DoesNotExistException("Source file does not exist: "+source, e);
121         }
122 
123         // open target file and copy
124         try {
125             // sourceFlags may contains OVERWRITE flag for target
126             boolean overwrite = Flags.OVERWRITE.isSet(flags);
127             FileOutputStream out = m_targetFile.newFileOutputStream(overwrite);
128             try {
129                 while (readlen > 0) {
130                     int writelen;
131                     for (int total=0; total<readlen; total+=writelen) {
132                         writelen = readlen - total;
133                         if (total > 0) {
134                             byte[] dataBis = new byte[writelen];
135                             System.arraycopy(data, total, dataBis, 0, writelen);
136                             out.write(dataBis, 0, writelen);
137                         } else {
138                             out.write(data, 0, readlen);
139                         }
140                         // update progress monitor
141                         if (progressMonitor != null) {
142                             progressMonitor.increment(writelen);
143                         }
144                     }
145                     readlen = in.read(data,0, data.length);
146                 }
147             } catch (IOException e) {
148                 throw new TimeoutException(e);
149             } finally {
150                 try {
151                     out.close();
152                 } catch (IOException e) {
153                     closingException = e;
154                 }
155             }
156         } catch (AlreadyExistsException alreadyExists) {
157             throw new IncorrectStateException("Target file already exists: "+m_targetFile.getURL(), alreadyExists);
158         } finally {
159             try {
160                 sourceFile.close();
161             } catch (Exception e) {
162                 closingException = new IOException(e.getClass().getName()+": "+e.getMessage());
163             }
164         }
165         if (closingException != null) {
166             throw new IncorrectStateException(closingException);
167         }
168     }
169 
170     private void getFromLogicalFile(URL source, int flags) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, IncorrectStateException, DoesNotExistException, TimeoutException, NoSuccessException, IncorrectURLException {
171         // get location of source physical file
172         AbstractSyncLogicalFileImpl sourceLogicalFile = this.createSourceLogicalFile(source, flags);
173         List<URL> sourceLocations = sourceLogicalFile.listLocationsSync();
174         if (sourceLocations!=null && sourceLocations.size()>0) {
175             // get source physical file
176             URL sourcePhysicalUrl = sourceLocations.get(0);
177             // copy
178             m_targetFile.copyFromSync(sourcePhysicalUrl, flags);
179             // close source logical file
180             sourceLogicalFile.close();
181         } else {
182             throw new NoSuccessException("No location found for logical file: "+source);
183         }
184     }
185 
186     private AbstractSyncFileImpl createSourceFile(URL source) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, IncorrectStateException, DoesNotExistException, TimeoutException, NoSuccessException, IncorrectURLException {
187         try {
188             return (AbstractSyncFileImpl) FileFactory.createFile(JSAGA_FACTORY, m_session, source, Flags.READ.getValue());
189         } catch (AlreadyExistsException e) {
190             throw new NoSuccessException("Unexpected exception", e);
191         } catch (DoesNotExistException doesNotExist) {
192             throw new DoesNotExistException("Source file does not exist: "+source, doesNotExist.getCause());
193         }
194     }
195 
196     private AbstractSyncLogicalFileImpl createSourceLogicalFile(URL source, int flags) throws NotImplementedException, AuthenticationFailedException, AuthorizationFailedException, PermissionDeniedException, BadParameterException, IncorrectStateException, DoesNotExistException, TimeoutException, NoSuccessException, IncorrectURLException {
197         int correctedFlags = flags;
198         correctedFlags = new FlagsHelper(correctedFlags).remove(JSAGAFlags.PRESERVETIMES, Flags.OVERWRITE);
199         try {
200             return (AbstractSyncLogicalFileImpl) LogicalFileFactory.createLogicalFile(JSAGA_FACTORY, m_session, source, correctedFlags);
201         } catch (AlreadyExistsException e) {
202             throw new NoSuccessException("Unexpected exception", e);
203         }
204     }
205 }