View Javadoc

1   package fr.in2p3.jsaga.impl.job.instance.stream;
2   
3   import fr.in2p3.jsaga.adaptor.job.control.interactive.*;
4   import fr.in2p3.jsaga.impl.job.instance.AbstractSyncJobImpl;
5   import org.ogf.saga.error.*;
6   
7   import java.io.*;
8   
9   /* ***************************************************
10  * *** Centre de Calcul de l'IN2P3 - Lyon (France) ***
11  * ***             http://cc.in2p3.fr/             ***
12  * ***************************************************
13  * File:   JobStdinOutputStream
14  * Author: Sylvain Reynaud (sreynaud@in2p3.fr)
15  * Date:   30 avr. 2008
16  * ***************************************************
17  * Description:                                      */
18  /**
19   *
20   */
21  public class JobStdinOutputStream extends Stdin {
22      protected AbstractSyncJobImpl m_job;
23      private JobIOGetterInteractive m_ioHandler;
24  
25      public JobStdinOutputStream(AbstractSyncJobImpl job) throws NotImplementedException, DoesNotExistException, TimeoutException, NoSuccessException {
26          m_job = job;
27          switch(m_job.getState()) {
28              case NEW:
29              case RUNNING:
30                  // OK
31                  break;
32              default:
33                  throw new DoesNotExistException("Stdin is not available because job is ended or suspended");
34          }
35      }
36  
37      public byte[] getBuffer() {
38          if (m_buffer != null) {
39              return m_buffer.toByteArray();
40          } else {
41              return new byte[0];
42          }
43      }
44      
45      public void openJobIOHandler(JobIOGetterInteractive ioHandler) throws NotImplementedException, PermissionDeniedException, TimeoutException, NoSuccessException {
46          m_ioHandler = ioHandler;
47  
48          // get stream
49          if (m_stream == null) {
50              m_stream = m_ioHandler.getStdin();
51          }
52  
53          // dump buffer to stream
54          if (m_buffer!=null && m_buffer.size()>0) {
55              try {
56                  m_stream.write(m_buffer.toByteArray());
57                  m_stream.close();
58              } catch (IOException e) {
59                  throw new NoSuccessException(e);
60              }
61          }
62      }
63  
64      /////////////////////////////////// interface OutputStream ///////////////////////////////////
65  
66      public void write(int b) throws IOException {this.getStream().write(b);}
67      public void write(byte[] b) throws IOException {this.getStream().write(b);}
68      public void write(byte[] b, int off, int len) throws IOException {this.getStream().write(b, off, len);}
69      public void flush() throws IOException {this.getStream().flush();}
70      public void close() throws IOException {this.getStream().close();}
71  
72      /////////////////////////////////////// private method ///////////////////////////////////////
73  
74      private ByteArrayOutputStream m_buffer;
75      private OutputStream m_stream;
76      private OutputStream getStream() throws IOException {
77          try {
78              switch(m_job.getState()) {
79                  case NEW:
80                      if (m_buffer == null) {
81                          m_buffer = new ByteArrayOutputStream();
82                      }
83                      return m_buffer;
84                  case RUNNING:
85                      if (m_stream == null) {
86                          if (m_ioHandler instanceof JobIOGetterInteractive) {
87                              m_stream = ((JobIOGetterInteractive)m_ioHandler).getStdin();
88                          } else if (m_ioHandler instanceof JobIOGetter || m_ioHandler instanceof JobIOSetter) {
89                              throw new NotImplementedException("Can not write to stdin because job is running and adaptor does not support job interactivity");
90                          }
91                      }
92                      return m_stream;
93                  default:
94                      throw new DoesNotExistException("Stdin is not available because job neither unsubmitted nor running");
95              }
96          } catch (Exception e) {
97              throw new IOException(e.getMessage());
98          }
99      }
100 }