001/*
002 * The MIT License
003 * Copyright (c) 2012 Microsoft Corporation
004 *
005 * Permission is hereby granted, free of charge, to any person obtaining a copy
006 * of this software and associated documentation files (the "Software"), to deal
007 * in the Software without restriction, including without limitation the rights
008 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
009 * copies of the Software, and to permit persons to whom the Software is
010 * furnished to do so, subject to the following conditions:
011 *
012 * The above copyright notice and this permission notice shall be included in
013 * all copies or substantial portions of the Software.
014 *
015 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
016 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
017 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
018 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
019 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
020 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
021 * THE SOFTWARE.
022 */
023
024package microsoft.exchange.webservices.data.core.request;
025
026import microsoft.exchange.webservices.data.core.EwsServiceMultiResponseXmlReader;
027import microsoft.exchange.webservices.data.core.EwsServiceXmlReader;
028import microsoft.exchange.webservices.data.core.ExchangeService;
029import microsoft.exchange.webservices.data.core.enumeration.misc.HangingRequestDisconnectReason;
030import microsoft.exchange.webservices.data.core.enumeration.misc.TraceFlags;
031import microsoft.exchange.webservices.data.core.exception.http.EWSHttpException;
032import microsoft.exchange.webservices.data.core.exception.misc.ArgumentException;
033import microsoft.exchange.webservices.data.core.exception.service.local.ServiceVersionException;
034import microsoft.exchange.webservices.data.core.exception.service.local.ServiceXmlDeserializationException;
035import microsoft.exchange.webservices.data.core.exception.service.remote.ServiceRequestException;
036import microsoft.exchange.webservices.data.core.exception.xml.XmlException;
037import microsoft.exchange.webservices.data.misc.HangingTraceStream;
038import microsoft.exchange.webservices.data.security.XmlNodeType;
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041
042import javax.xml.stream.XMLStreamException;
043
044import java.io.ByteArrayOutputStream;
045import java.io.IOException;
046import java.io.InputStream;
047import java.io.ObjectStreamException;
048import java.net.SocketTimeoutException;
049import java.net.UnknownServiceException;
050import java.util.ArrayList;
051import java.util.List;
052import java.util.concurrent.ArrayBlockingQueue;
053import java.util.concurrent.ThreadPoolExecutor;
054import java.util.concurrent.TimeUnit;
055
056
057/**
058 * Represents an abstract, hanging service request.
059 */
060public abstract class HangingServiceRequestBase<T> extends ServiceRequestBase<T> {
061
062  private static final Log LOG = LogFactory.getLog(HangingServiceRequestBase.class);
063
064
065  public interface IHandleResponseObject {
066
067    /**
068     * Callback delegate to handle asynchronous response.
069     *
070     * @param response Response received from the server
071     * @throws ArgumentException
072     */
073    void handleResponseObject(Object response) throws ArgumentException;
074  }
075
076
077  public static final int BUFFER_SIZE = 4096;
078
079  /**
080   * Test switch to log all bytes that come across the wire.
081   * Helpful when parsing fails before certain bytes hit the trace logs.
082   */
083  private static volatile boolean logAllWireBytes = false;
084
085  /**
086   * Callback delegate to handle response objects
087   */
088  private IHandleResponseObject responseHandler;
089
090  /**
091   * Response from the server.
092   */
093  private HttpWebRequest response;
094
095  /**
096   * Expected minimum frequency in response, in milliseconds.
097   */
098  protected int heartbeatFrequencyMilliseconds;
099
100
101  public interface IHangingRequestDisconnectHandler {
102
103    /**
104     * Delegate method to handle a hanging request disconnection.
105     *
106     * @param sender the object invoking the delegate
107     * @param args event data
108     */
109    void hangingRequestDisconnectHandler(Object sender,
110        HangingRequestDisconnectEventArgs args);
111
112  }
113
114
115  public static boolean isLogAllWireBytes() {
116    return logAllWireBytes;
117  }
118
119  public static void setLogAllWireBytes(final boolean logAllWireBytes) {
120    HangingServiceRequestBase.logAllWireBytes = logAllWireBytes;
121  }
122
123  /**
124   * Disconnect events Occur when the hanging request is disconnected.
125   */
126  private List<IHangingRequestDisconnectHandler> onDisconnectList =
127      new ArrayList<IHangingRequestDisconnectHandler>();
128
129  /**
130   * Set event to happen when property disconnect.
131   *
132   * @param disconnect disconnect event
133   */
134  public void addOnDisconnectEvent(IHangingRequestDisconnectHandler disconnect) {
135    onDisconnectList.add(disconnect);
136  }
137
138  /**
139   * Remove the event from happening when property disconnect.
140   *
141   * @param disconnect disconnect event
142   */
143  protected void removeDisconnectEvent(
144      IHangingRequestDisconnectHandler disconnect) {
145    onDisconnectList.remove(disconnect);
146  }
147
148  /**
149   * Clears disconnect events list.
150   */
151  protected void clearDisconnectEvents() {
152    onDisconnectList.clear();
153  }
154
155  /**
156   * Initializes a new instance of the HangingServiceRequestBase class.
157   *
158   * @param service            The service.
159   * @param handler            Callback delegate to handle response objects
160   * @param heartbeatFrequency Frequency at which we expect heartbeats, in milliseconds.
161   */
162  protected HangingServiceRequestBase(ExchangeService service,
163      IHandleResponseObject handler, int heartbeatFrequency)
164      throws ServiceVersionException {
165    super(service);
166    this.responseHandler = handler;
167    this.heartbeatFrequencyMilliseconds = heartbeatFrequency;
168  }
169
170  /**
171   * Exectures the request.
172   */
173  public void internalExecute() throws Exception {
174    synchronized (this) {
175      this.response = this.validateAndEmitRequest();
176      this.internalOnConnect();
177    }
178  }
179
180  /**
181   * Parses the response.
182   *
183   */
184  private void parseResponses() {
185    HangingTraceStream tracingStream = null;
186    ByteArrayOutputStream responseCopy = null;
187
188
189    try {
190      boolean traceEWSResponse = this.getService().isTraceEnabledFor(TraceFlags.EwsResponse);
191      InputStream responseStream = this.response.getInputStream();
192      tracingStream = new HangingTraceStream(responseStream,
193          this.getService());
194      //EWSServiceMultiResponseXmlReader. Create causes a read.
195
196      if (traceEWSResponse) {
197        responseCopy = new ByteArrayOutputStream();
198        tracingStream.setResponseCopy(responseCopy);
199      }
200
201      while (this.isConnected()) {
202        T responseObject;
203        if (traceEWSResponse) {
204          EwsServiceMultiResponseXmlReader ewsXmlReader =
205              EwsServiceMultiResponseXmlReader.create(tracingStream, getService());
206          responseObject = this.readResponse(ewsXmlReader);
207          this.responseHandler.handleResponseObject(responseObject);
208
209          // reset the stream collector.
210          responseCopy.close();
211          responseCopy = new ByteArrayOutputStream();
212          tracingStream.setResponseCopy(responseCopy);
213
214        } else {
215          EwsServiceMultiResponseXmlReader ewsXmlReader =
216              EwsServiceMultiResponseXmlReader.create(tracingStream, getService());
217          responseObject = this.readResponse(ewsXmlReader);
218          this.responseHandler.handleResponseObject(responseObject);
219        }
220      }
221    } catch (SocketTimeoutException ex) {
222      // The connection timed out.
223      this.disconnect(HangingRequestDisconnectReason.Timeout, ex);
224    } catch (UnknownServiceException ex) {
225      // Stream is closed, so disconnect.
226      this.disconnect(HangingRequestDisconnectReason.Exception, ex);
227    } catch (ObjectStreamException ex) {
228      // Stream is closed, so disconnect.
229      this.disconnect(HangingRequestDisconnectReason.Exception, ex);
230    } catch (IOException ex) {
231      // Stream is closed, so disconnect.
232      this.disconnect(HangingRequestDisconnectReason.Exception, ex);
233    } catch (UnsupportedOperationException ex) {
234      LOG.error(ex);
235      // This is thrown if we close the stream during a
236      //read operation due to a user method call.
237      // Trying to delay closing until the read finishes
238      //simply results in a long-running connection.
239      this.disconnect(HangingRequestDisconnectReason.UserInitiated, null);
240    } catch (Exception ex) {
241      // Stream is closed, so disconnect.
242      this.disconnect(HangingRequestDisconnectReason.Exception, ex);
243    } finally {
244      if (responseCopy != null) {
245        try {
246          responseCopy.close();
247          responseCopy = null;
248        } catch (Exception ex) {
249          LOG.error(ex);
250        }
251      }
252    }
253  }
254
255  private boolean isConnected;
256
257  /**
258   * Gets a value indicating whether this instance is connected.
259   *
260   * @return true, if this instance is connected; otherwise, false
261   */
262  public boolean isConnected() {
263    return this.isConnected;
264  }
265
266  private void setIsConnected(boolean value) {
267    this.isConnected = value;
268  }
269
270  /**
271   * Disconnects the request.
272   */
273  public void disconnect() {
274    synchronized (this) {
275      try {
276        this.response.close();
277      } catch (IOException e) {
278        // Ignore exception on disconnection
279      }
280      this.disconnect(HangingRequestDisconnectReason.UserInitiated, null);
281    }
282  }
283
284  /**
285   * Disconnects the request with the specified reason and exception.
286   *
287   * @param reason    The reason.
288   * @param exception The exception.
289   */
290  public void disconnect(HangingRequestDisconnectReason reason, Exception exception) {
291    if (this.isConnected()) {
292      try {
293        this.response.close();
294      } catch (IOException e) {
295        // Ignore exception on disconnection
296      }
297      this.internalOnDisconnect(reason, exception);
298    }
299  }
300
301  /**
302   * Perform any bookkeeping needed when we connect
303   * @throws XMLStreamException the XML stream exception
304   */
305  private void internalOnConnect() throws XMLStreamException,
306      IOException, EWSHttpException {
307    if (!this.isConnected()) {
308      this.isConnected = true;
309
310      if (this.getService().isTraceEnabledFor(TraceFlags.EwsResponseHttpHeaders)) {
311        // Trace Http headers
312        this.getService().processHttpResponseHeaders(
313            TraceFlags.EwsResponseHttpHeaders,
314            this.response);
315      }
316      int poolSize = 1;
317
318      int maxPoolSize = 1;
319
320      long keepAliveTime = 10;
321
322      final ArrayBlockingQueue<Runnable> queue =
323          new ArrayBlockingQueue<Runnable>(
324              1);
325      ThreadPoolExecutor threadPool = new ThreadPoolExecutor(poolSize,
326          maxPoolSize,
327          keepAliveTime, TimeUnit.SECONDS, queue);
328      threadPool.execute(new Runnable() {
329        public void run() {
330          parseResponses();
331        }
332      });
333      threadPool.shutdown();
334    }
335  }
336
337  /**
338   * Perform any bookkeeping needed when we disconnect (cleanly or forcefully)
339   *
340   * @param reason    The reason.
341   * @param exception The exception.
342   */
343  private void internalOnDisconnect(HangingRequestDisconnectReason reason,
344      Exception exception) {
345    if (this.isConnected()) {
346      this.isConnected = false;
347      for (IHangingRequestDisconnectHandler disconnect : onDisconnectList) {
348        disconnect.hangingRequestDisconnectHandler(this,
349            new HangingRequestDisconnectEventArgs(reason, exception));
350      }
351    }
352  }
353
354  /**
355   * Reads any preamble data not part of the core response.
356   *
357   * @param ewsXmlReader The EwsServiceXmlReader.
358   * @throws Exception
359   */
360  @Override
361  protected void readPreamble(EwsServiceXmlReader ewsXmlReader)
362      throws Exception {
363    // Do nothing.
364    try {
365      ewsXmlReader.read(new XmlNodeType(XmlNodeType.START_DOCUMENT));
366    } catch (XmlException ex) {
367      throw new ServiceRequestException("The response received from the service didn't contain valid XML.", ex);
368    } catch (ServiceXmlDeserializationException ex) {
369      throw new ServiceRequestException("The response received from the service didn't contain valid XML.", ex);
370    }
371  }
372}