/*
 * The MIT License
 * Copyright (c) 2012 Microsoft Corporation
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

package microsoft.exchange.webservices.data.core.request;

import microsoft.exchange.webservices.data.core.EwsServiceMultiResponseXmlReader;
import microsoft.exchange.webservices.data.core.EwsServiceXmlReader;
import microsoft.exchange.webservices.data.core.ExchangeService;
import microsoft.exchange.webservices.data.core.enumeration.misc.HangingRequestDisconnectReason;
import microsoft.exchange.webservices.data.core.enumeration.misc.TraceFlags;
import microsoft.exchange.webservices.data.core.exception.http.EWSHttpException;
import microsoft.exchange.webservices.data.core.exception.misc.ArgumentException;
import microsoft.exchange.webservices.data.core.exception.service.local.ServiceVersionException;
import microsoft.exchange.webservices.data.core.exception.service.local.ServiceXmlDeserializationException;
import microsoft.exchange.webservices.data.core.exception.service.remote.ServiceRequestException;
import microsoft.exchange.webservices.data.core.exception.xml.XmlException;
import microsoft.exchange.webservices.data.misc.HangingTraceStream;
import microsoft.exchange.webservices.data.security.XmlNodeType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import javax.xml.stream.XMLStreamException;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectStreamException;
import java.net.SocketTimeoutException;
import java.net.UnknownServiceException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;


/**
 * Represents an abstract, hanging service request.
 *
 * <p>Once {@link #internalExecute()} has emitted the request, the response
 * stream is parsed on a background thread. Every parsed response object is
 * passed to the {@link IHandleResponseObject} supplied at construction time,
 * and the registered {@link IHangingRequestDisconnectHandler}s are notified
 * when the request transitions from connected to disconnected.</p>
 *
 * @param <T> type of the response objects produced by this request
 */
public abstract class HangingServiceRequestBase<T> extends ServiceRequestBase<T> {

  private static final Log LOG = LogFactory.getLog(HangingServiceRequestBase.class);


  /**
   * Callback used to hand parsed response objects to the owner of the request.
   */
  public interface IHandleResponseObject {

    /**
     * Callback delegate to handle asynchronous response.
     *
     * @param response Response received from the server
     * @throws ArgumentException if the handler rejects the response object
     */
    void handleResponseObject(Object response) throws ArgumentException;
  }


  /**
   * Buffer size constant. It is not referenced inside this class.
   * NOTE(review): the unit is presumably bytes - confirm against callers.
   */
  public static final int BUFFER_SIZE = 4096;

  /**
   * Test switch to log all bytes that come across the wire.
   * Helpful when parsing fails before certain bytes hit the trace logs.
   */
  private static volatile boolean logAllWireBytes = false;

  /**
   * Callback delegate to handle response objects.
   */
  private final IHandleResponseObject responseHandler;

  /**
   * Response from the server. Assigned by {@link #internalExecute()}; it is
   * {@code null} until then.
   */
  private HttpWebRequest response;

  /**
   * Expected minimum frequency in response, in milliseconds.
   */
  protected int heartbeatFrequencyMilliseconds;

  /**
   * Connection flag. It is read by the background parser loop and flipped by
   * whichever thread connects or disconnects, so it is atomic: this gives
   * cross-thread visibility and guarantees that exactly one caller wins the
   * connected-to-disconnected transition (disconnect handlers are therefore
   * notified once per connection).
   */
  private final AtomicBoolean connected = new AtomicBoolean(false);

  /**
   * Disconnect handlers, invoked when the hanging request is disconnected.
   * A copy-on-write list is used so that handlers may be added, removed or
   * cleared while a notification is being dispatched (including by a handler
   * that unregisters itself) without a ConcurrentModificationException.
   */
  private final List<IHangingRequestDisconnectHandler> onDisconnectList =
      new CopyOnWriteArrayList<IHangingRequestDisconnectHandler>();


  /**
   * Listener notified when a hanging request is disconnected.
   */
  public interface IHangingRequestDisconnectHandler {

    /**
     * Delegate method to handle a hanging request disconnection.
     *
     * @param sender the object invoking the delegate
     * @param args   event data
     */
    void hangingRequestDisconnectHandler(Object sender,
        HangingRequestDisconnectEventArgs args);

  }


  /**
   * Gets the test switch that controls logging of all wire bytes.
   *
   * @return the current value of the switch
   */
  public static boolean isLogAllWireBytes() {
    return logAllWireBytes;
  }

  /**
   * Sets the test switch that controls logging of all wire bytes.
   *
   * @param logAllWireBytes the new value of the switch
   */
  public static void setLogAllWireBytes(final boolean logAllWireBytes) {
    HangingServiceRequestBase.logAllWireBytes = logAllWireBytes;
  }

  /**
   * Registers a handler to be notified when this request disconnects.
   *
   * @param disconnect disconnect event
   */
  public void addOnDisconnectEvent(IHangingRequestDisconnectHandler disconnect) {
    onDisconnectList.add(disconnect);
  }

  /**
   * Unregisters a previously registered disconnect handler.
   *
   * @param disconnect disconnect event
   */
  protected void removeDisconnectEvent(
      IHangingRequestDisconnectHandler disconnect) {
    onDisconnectList.remove(disconnect);
  }

  /**
   * Clears disconnect events list.
   */
  protected void clearDisconnectEvents() {
    onDisconnectList.clear();
  }

  /**
   * Initializes a new instance of the HangingServiceRequestBase class.
   *
   * @param service            The service.
   * @param handler            Callback delegate to handle response objects
   * @param heartbeatFrequency Frequency at which we expect heartbeats, in milliseconds.
   * @throws ServiceVersionException propagated from the superclass constructor
   */
  protected HangingServiceRequestBase(ExchangeService service,
      IHandleResponseObject handler, int heartbeatFrequency)
      throws ServiceVersionException {
    super(service);
    this.responseHandler = handler;
    this.heartbeatFrequencyMilliseconds = heartbeatFrequency;
  }

  /**
   * Executes the request: stores the result of
   * {@code validateAndEmitRequest()} as the response and then performs the
   * connect bookkeeping, which starts the background response parser.
   *
   * @throws Exception if emitting the request or the connect bookkeeping fails
   */
  public void internalExecute() throws Exception {
    synchronized (this) {
      this.response = this.validateAndEmitRequest();
      this.internalOnConnect();
    }
  }

  /**
   * Parses responses from the response stream for as long as this request is
   * connected, handing each one to the response handler. Runs on the
   * background thread started by {@link #internalOnConnect()}.
   *
   * <p>Any failure ends the loop and disconnects the request; the disconnect
   * reason depends on the failure type (timeout, user initiated, or generic
   * exception).</p>
   */
  private void parseResponses() {
    ByteArrayOutputStream responseCopy = null;

    try {
      final boolean traceEwsResponse =
          this.getService().isTraceEnabledFor(TraceFlags.EwsResponse);
      InputStream responseStream = this.response.getInputStream();
      HangingTraceStream tracingStream =
          new HangingTraceStream(responseStream, this.getService());

      if (traceEwsResponse) {
        responseCopy = new ByteArrayOutputStream();
        tracingStream.setResponseCopy(responseCopy);
      }

      while (this.isConnected()) {
        // EwsServiceMultiResponseXmlReader.create causes a read.
        EwsServiceMultiResponseXmlReader ewsXmlReader =
            EwsServiceMultiResponseXmlReader.create(tracingStream, getService());
        T responseObject = this.readResponse(ewsXmlReader);
        this.responseHandler.handleResponseObject(responseObject);

        if (traceEwsResponse) {
          // Reset the stream collector so each response gets a fresh copy buffer.
          responseCopy.close();
          responseCopy = new ByteArrayOutputStream();
          tracingStream.setResponseCopy(responseCopy);
        }
      }
    } catch (SocketTimeoutException ex) {
      // The connection timed out.
      this.disconnect(HangingRequestDisconnectReason.Timeout, ex);
    } catch (UnsupportedOperationException ex) {
      LOG.error(ex);
      // This is thrown if we close the stream during a
      // read operation due to a user method call.
      // Trying to delay closing until the read finishes
      // simply results in a long-running connection.
      this.disconnect(HangingRequestDisconnectReason.UserInitiated, null);
    } catch (Exception ex) {
      // Covers every other failure, including all remaining I/O errors
      // (closed stream etc.): disconnect and report the exception.
      this.disconnect(HangingRequestDisconnectReason.Exception, ex);
    } finally {
      if (responseCopy != null) {
        try {
          responseCopy.close();
        } catch (Exception ex) {
          LOG.error(ex);
        }
      }
    }
  }

  /**
   * Gets a value indicating whether this instance is connected.
   *
   * @return true, if this instance is connected; otherwise, false
   */
  public boolean isConnected() {
    return this.connected.get();
  }

  /**
   * Unconditionally overwrites the connection flag.
   *
   * @param value the new flag value
   */
  private void setIsConnected(boolean value) {
    this.connected.set(value);
  }

  /**
   * Disconnects the request on behalf of the user. The response is always
   * closed (when one exists), even if the request is no longer flagged as
   * connected; the disconnect itself is then reported with reason
   * {@link HangingRequestDisconnectReason#UserInitiated}.
   */
  public void disconnect() {
    synchronized (this) {
      this.closeResponseQuietly();
      this.disconnect(HangingRequestDisconnectReason.UserInitiated, null);
    }
  }

  /**
   * Disconnects the request with the specified reason and exception.
   * Does nothing when the request is not connected.
   *
   * @param reason    The reason.
   * @param exception The exception.
   */
  public void disconnect(HangingRequestDisconnectReason reason, Exception exception) {
    if (this.isConnected()) {
      this.closeResponseQuietly();
      this.internalOnDisconnect(reason, exception);
    }
  }

  /**
   * Closes the response if there is one. Failures are deliberately not
   * propagated because closing is best effort during a disconnect; they are
   * logged at debug level instead of being silently dropped.
   */
  private void closeResponseQuietly() {
    final HttpWebRequest current = this.response;
    if (current == null) {
      // disconnect() was called before internalExecute() assigned a response.
      return;
    }
    try {
      current.close();
    } catch (IOException ignored) {
      LOG.debug("Ignoring I/O error while closing the hanging request response", ignored);
    }
  }

  /**
   * Perform any bookkeeping needed when we connect: flags the request as
   * connected, optionally traces the HTTP response headers and starts the
   * background thread that runs {@link #parseResponses()}. Does nothing when
   * the request is already connected.
   *
   * @throws XMLStreamException the XML stream exception
   * @throws IOException        declared for the response header tracing call
   * @throws EWSHttpException   declared for the response header tracing call
   */
  private void internalOnConnect() throws XMLStreamException,
      IOException, EWSHttpException {
    if (!this.connected.compareAndSet(false, true)) {
      return;
    }

    if (this.getService().isTraceEnabledFor(TraceFlags.EwsResponseHttpHeaders)) {
      // Trace Http headers
      this.getService().processHttpResponseHeaders(
          TraceFlags.EwsResponseHttpHeaders,
          this.response);
    }

    final int poolSize = 1;
    final int maxPoolSize = 1;
    final long keepAliveTime = 10;

    final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(1);
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(poolSize,
        maxPoolSize,
        keepAliveTime, TimeUnit.SECONDS, queue);
    threadPool.execute(new Runnable() {
      @Override
      public void run() {
        parseResponses();
      }
    });
    // No further tasks are submitted; shutdown() still lets the parser task run.
    threadPool.shutdown();
  }

  /**
   * Perform any bookkeeping needed when we disconnect (cleanly or forcefully):
   * clears the connection flag and notifies every registered disconnect
   * handler. Only the caller that actually flips the flag from connected to
   * disconnected notifies the handlers, so concurrent disconnects result in a
   * single notification.
   *
   * @param reason    The reason.
   * @param exception The exception.
   */
  private void internalOnDisconnect(HangingRequestDisconnectReason reason,
      Exception exception) {
    if (!this.connected.compareAndSet(true, false)) {
      return;
    }
    for (IHangingRequestDisconnectHandler disconnect : onDisconnectList) {
      disconnect.hangingRequestDisconnectHandler(this,
          new HangingRequestDisconnectEventArgs(reason, exception));
    }
  }

  /**
   * Reads any preamble data not part of the core response. For a hanging
   * request this reads the start-of-document node from the reader.
   *
   * @param ewsXmlReader The EwsServiceXmlReader.
   * @throws Exception a {@link ServiceRequestException} when the reader
   *                   reports an XML or deserialization error; any other
   *                   exception raised by the reader is propagated unchanged
   */
  @Override
  protected void readPreamble(EwsServiceXmlReader ewsXmlReader)
      throws Exception {
    try {
      ewsXmlReader.read(new XmlNodeType(XmlNodeType.START_DOCUMENT));
    } catch (XmlException ex) {
      throw new ServiceRequestException("The response received from the service didn't contain valid XML.", ex);
    } catch (ServiceXmlDeserializationException ex) {
      throw new ServiceRequestException("The response received from the service didn't contain valid XML.", ex);
    }
  }
}