001/* 002 * The MIT License 003 * Copyright (c) 2012 Microsoft Corporation 004 * 005 * Permission is hereby granted, free of charge, to any person obtaining a copy 006 * of this software and associated documentation files (the "Software"), to deal 007 * in the Software without restriction, including without limitation the rights 008 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 009 * copies of the Software, and to permit persons to whom the Software is 010 * furnished to do so, subject to the following conditions: 011 * 012 * The above copyright notice and this permission notice shall be included in 013 * all copies or substantial portions of the Software. 014 * 015 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 016 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 017 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 018 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 019 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 020 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 021 * THE SOFTWARE. 022 */ 023 024package microsoft.exchange.webservices.data.notification; 025 026import microsoft.exchange.webservices.data.core.EwsUtilities; 027import microsoft.exchange.webservices.data.core.ExchangeService; 028import microsoft.exchange.webservices.data.core.exception.misc.ArgumentNullException; 029import microsoft.exchange.webservices.data.core.request.GetStreamingEventsRequest; 030import microsoft.exchange.webservices.data.core.request.HangingRequestDisconnectEventArgs; 031import microsoft.exchange.webservices.data.core.request.HangingServiceRequestBase; 032import microsoft.exchange.webservices.data.core.response.GetStreamingEventsResponse; 033import microsoft.exchange.webservices.data.core.enumeration.misc.ExchangeVersion; 034import microsoft.exchange.webservices.data.core.enumeration.misc.error.ServiceError; 035import microsoft.exchange.webservices.data.core.enumeration.service.ServiceResult; 036import microsoft.exchange.webservices.data.core.exception.misc.ArgumentException; 037import microsoft.exchange.webservices.data.core.exception.misc.ArgumentOutOfRangeException; 038import microsoft.exchange.webservices.data.core.exception.service.local.ServiceLocalException; 039import microsoft.exchange.webservices.data.core.exception.service.remote.ServiceResponseException; 040import org.apache.commons.logging.Log; 041import org.apache.commons.logging.LogFactory; 042 043import java.io.Closeable; 044import java.util.ArrayList; 045import java.util.HashMap; 046import java.util.List; 047import java.util.Map; 048 049/** 050 * Represents a connection to an ongoing stream of events. 051 */ 052public final class StreamingSubscriptionConnection implements Closeable, 053 HangingServiceRequestBase.IHandleResponseObject, 054 HangingServiceRequestBase.IHangingRequestDisconnectHandler { 055 056 private static final Log LOG = LogFactory.getLog(StreamingSubscriptionConnection.class); 057 058 /** 059 * Mapping of streaming id to subscriptions currently on the connection. 060 */ 061 private Map<String, StreamingSubscription> subscriptions; 062 063 /** 064 * connection lifetime, in minutes 065 */ 066 private int connectionTimeout; 067 068 /** 069 * ExchangeService instance used to make the EWS call. 070 */ 071 private ExchangeService session; 072 073 /** 074 * Value indicating whether the class is disposed. 075 */ 076 private boolean isDisposed; 077 078 /** 079 * Currently used instance of a GetStreamingEventsRequest connected to EWS. 080 */ 081 private GetStreamingEventsRequest currentHangingRequest; 082 083 084 public interface INotificationEventDelegate { 085 /** 086 * Represents a delegate that is invoked when notification are received 087 * from the server 088 * 089 * @param sender The StreamingSubscriptionConnection instance that received 090 * the events. 091 * @param args The event data. 092 */ 093 void notificationEventDelegate(Object sender, NotificationEventArgs args); 094 } 095 096 097 /** 098 * Notification events Occurs when notification are received from the 099 * server. 100 */ 101 private List<INotificationEventDelegate> onNotificationEvent = new ArrayList<INotificationEventDelegate>(); 102 103 /** 104 * Set event to happen when property Notify. 105 * 106 * @param notificationEvent notification event 107 */ 108 public void addOnNotificationEvent( 109 INotificationEventDelegate notificationEvent) { 110 onNotificationEvent.add(notificationEvent); 111 } 112 113 /** 114 * Remove the event from happening when property Notify. 115 * 116 * @param notificationEvent notification event 117 */ 118 public void removeNotificationEvent( 119 INotificationEventDelegate notificationEvent) { 120 onNotificationEvent.remove(notificationEvent); 121 } 122 123 /** 124 * Clears notification events list. 125 */ 126 public void clearNotificationEvent() { 127 onNotificationEvent.clear(); 128 } 129 130 public interface ISubscriptionErrorDelegate { 131 132 /** 133 * Represents a delegate that is invoked when an error occurs within a 134 * streaming subscription connection. 135 * 136 * @param sender The StreamingSubscriptionConnection instance within which 137 * the error occurred. 138 * @param args The event data. 139 */ 140 void subscriptionErrorDelegate(Object sender, 141 SubscriptionErrorEventArgs args); 142 } 143 144 145 /** 146 * Subscription events Occur when a subscription encounters an error. 147 */ 148 private List<ISubscriptionErrorDelegate> onSubscriptionError = new ArrayList<ISubscriptionErrorDelegate>(); 149 150 /** 151 * Set event to happen when property subscriptionError. 152 * 153 * @param subscriptionError subscription event 154 */ 155 public void addOnSubscriptionError( 156 ISubscriptionErrorDelegate subscriptionError) { 157 onSubscriptionError.add(subscriptionError); 158 } 159 160 /** 161 * Remove the event from happening when property subscription. 162 * 163 * @param subscriptionError subscription event 164 */ 165 public void removeSubscriptionError( 166 ISubscriptionErrorDelegate subscriptionError) { 167 onSubscriptionError.remove(subscriptionError); 168 } 169 170 /** 171 * Clears subscription events list. 172 */ 173 public void clearSubscriptionError() { 174 onSubscriptionError.clear(); 175 } 176 177 /** 178 * Disconnect events Occurs when a streaming subscription connection is 179 * disconnected from the server. 180 */ 181 private List<ISubscriptionErrorDelegate> onDisconnect = new ArrayList<ISubscriptionErrorDelegate>(); 182 183 /** 184 * Set event to happen when property disconnect. 185 * 186 * @param disconnect disconnect event 187 */ 188 public void addOnDisconnect(ISubscriptionErrorDelegate disconnect) { 189 onDisconnect.add(disconnect); 190 } 191 192 /** 193 * Remove the event from happening when property disconnect. 194 * 195 * @param disconnect disconnect event 196 */ 197 public void removeDisconnect(ISubscriptionErrorDelegate disconnect) { 198 onDisconnect.remove(disconnect); 199 } 200 201 /** 202 * Clears disconnect events list. 203 */ 204 public void clearDisconnect() { 205 onDisconnect.clear(); 206 } 207 208 /** 209 * Initializes a new instance of the StreamingSubscriptionConnection class. 210 * 211 * @param service The ExchangeService instance this connection uses to connect 212 * to the server. 213 * @param lifetime The maximum time, in minutes, the connection will remain open. 214 * Lifetime must be between 1 and 30. 215 * @throws Exception 216 */ 217 public StreamingSubscriptionConnection(ExchangeService service, int lifetime) 218 throws Exception { 219 EwsUtilities.validateParam(service, "service"); 220 221 EwsUtilities.validateClassVersion(service, 222 ExchangeVersion.Exchange2010_SP1, this.getClass().getName()); 223 224 if (lifetime < 1 || lifetime > 30) { 225 throw new ArgumentOutOfRangeException("lifetime"); 226 } 227 228 this.session = service; 229 this.subscriptions = new HashMap<String, StreamingSubscription>(); 230 this.connectionTimeout = lifetime; 231 } 232 233 /** 234 * Initializes a new instance of the StreamingSubscriptionConnection class. 235 * 236 * @param service The ExchangeService instance this connection uses to connect 237 * to the server. 238 * @param subscriptions Iterable subcriptions 239 * @param lifetime The maximum time, in minutes, the connection will remain open. 240 * Lifetime must be between 1 and 30. 241 * @throws Exception 242 */ 243 public StreamingSubscriptionConnection(ExchangeService service, 244 Iterable<StreamingSubscription> subscriptions, int lifetime) 245 throws Exception { 246 this(service, lifetime); 247 EwsUtilities.validateParamCollection(subscriptions.iterator(), "subscriptions"); 248 for (StreamingSubscription subscription : subscriptions) { 249 this.subscriptions.put(subscription.getId(), subscription); 250 } 251 } 252 253 /** 254 * Adds a subscription to this connection. 255 * 256 * @param subscription The subscription to add. 257 * @throws Exception Thrown when AddSubscription is called while connected. 258 */ 259 public void addSubscription(StreamingSubscription subscription) 260 throws Exception { 261 this.throwIfDisposed(); 262 EwsUtilities.validateParam(subscription, "subscription"); 263 this.validateConnectionState(false, "Subscriptions can't be added to an open connection."); 264 265 synchronized (this) { 266 if (this.subscriptions.containsKey(subscription.getId())) { 267 return; 268 } 269 this.subscriptions.put(subscription.getId(), subscription); 270 } 271 } 272 273 /** 274 * Removes the specified streaming subscription from the connection. 275 * 276 * @param subscription The subscription to remove. 277 * @throws Exception Thrown when RemoveSubscription is called while connected. 278 */ 279 public void removeSubscription(StreamingSubscription subscription) 280 throws Exception { 281 this.throwIfDisposed(); 282 283 EwsUtilities.validateParam(subscription, "subscription"); 284 285 this.validateConnectionState(false, "Subscriptions can't be removed from an open connection."); 286 287 synchronized (this) { 288 this.subscriptions.remove(subscription.getId()); 289 } 290 } 291 292 /** 293 * Opens this connection so it starts receiving events from the server.This 294 * results in a long-standing call to EWS. 295 * 296 * @throws Exception 297 * @throws ServiceLocalException Thrown when Open is called while connected. 298 */ 299 public void open() throws ServiceLocalException, Exception { 300 synchronized (this) { 301 this.throwIfDisposed(); 302 303 this.validateConnectionState(false, "The connection has already opened."); 304 305 if (this.subscriptions.size() == 0) { 306 throw new ServiceLocalException( 307 "You must add at least one subscription to this connection before it can be opened."); 308 } 309 310 this.currentHangingRequest = new GetStreamingEventsRequest( 311 this.session, this, this.subscriptions.keySet(), 312 this.connectionTimeout); 313 314 this.currentHangingRequest.addOnDisconnectEvent(this); 315 316 this.currentHangingRequest.internalExecute(); 317 } 318 } 319 320 /** 321 * Called when the request is disconnected. 322 * 323 * @param sender The sender. 324 * @param args The Microsoft.Exchange.WebServices.Data. 325 * HangingRequestDisconnectEventArgs instance containing the 326 * event data. 327 */ 328 private void onRequestDisconnect(Object sender, 329 HangingRequestDisconnectEventArgs args) { 330 this.internalOnDisconnect(args.getException()); 331 } 332 333 /** 334 * Closes this connection so it stops receiving events from the server.This 335 * terminates a long-standing call to EWS. 336 */ 337 public void close() { 338 synchronized (this) { 339 try { 340 this.throwIfDisposed(); 341 342 this.validateConnectionState(true, "The connection is already closed."); 343 344 // Further down in the stack, this will result in a 345 // call to our OnRequestDisconnect event handler, 346 // doing the necessary cleanup. 347 this.currentHangingRequest.disconnect(); 348 } catch (Exception e) { 349 LOG.error(e); 350 } 351 } 352 } 353 354 /** 355 * Internal helper method called when the request disconnects. 356 * 357 * @param ex The exception that caused the disconnection. May be null. 358 */ 359 private void internalOnDisconnect(Exception ex) { 360 if (!onDisconnect.isEmpty()) { 361 for (ISubscriptionErrorDelegate disconnect : onDisconnect) { 362 disconnect.subscriptionErrorDelegate(this, 363 new SubscriptionErrorEventArgs(null, ex)); 364 } 365 } 366 this.currentHangingRequest = null; 367 } 368 369 /** 370 * Gets a value indicating whether this connection is opened 371 * 372 * @throws Exception 373 */ 374 public boolean getIsOpen() throws Exception { 375 376 this.throwIfDisposed(); 377 if (this.currentHangingRequest == null) { 378 return false; 379 } else { 380 return this.currentHangingRequest.isConnected(); 381 } 382 383 } 384 385 /** 386 * Validates the state of the connection. 387 * 388 * @param isConnectedExpected Value indicating whether we expect to be currently connected. 389 * @param errorMessage The error message. 390 * @throws Exception 391 */ 392 private void validateConnectionState(boolean isConnectedExpected, 393 String errorMessage) throws Exception { 394 if ((isConnectedExpected && !this.getIsOpen()) 395 || (!isConnectedExpected && this.getIsOpen())) { 396 throw new ServiceLocalException(errorMessage); 397 } 398 } 399 400 /** 401 * Handles the service response object. 402 * 403 * @param response The response. 404 * @throws ArgumentException 405 */ 406 private void handleServiceResponseObject(Object response) 407 throws ArgumentException { 408 GetStreamingEventsResponse gseResponse = (GetStreamingEventsResponse) response; 409 410 if (gseResponse == null) { 411 throw new ArgumentNullException("GetStreamingEventsResponse must not be null", 412 "GetStreamingEventsResponse"); 413 } else { 414 if (gseResponse.getResult() == ServiceResult.Success 415 || gseResponse.getResult() == ServiceResult.Warning) { 416 if (gseResponse.getResults().getNotifications().size() > 0) { 417 // We got notification; dole them out. 418 this.issueNotificationEvents(gseResponse); 419 } else { 420 // // This was just a heartbeat, nothing to do here. 421 } 422 } else if (gseResponse.getResult() == ServiceResult.Error) { 423 if (gseResponse.getErrorSubscriptionIds() == null 424 || gseResponse.getErrorSubscriptionIds().size() == 0) { 425 // General error 426 this.issueGeneralFailure(gseResponse); 427 } else { 428 // subscription-specific errors 429 this.issueSubscriptionFailures(gseResponse); 430 } 431 } 432 } 433 } 434 435 /** 436 * Issues the subscription failures. 437 * 438 * @param gseResponse The GetStreamingEvents response. 439 */ 440 private void issueSubscriptionFailures( 441 GetStreamingEventsResponse gseResponse) { 442 ServiceResponseException exception = new ServiceResponseException( 443 gseResponse); 444 445 for (String id : gseResponse.getErrorSubscriptionIds()) { 446 StreamingSubscription subscription = null; 447 448 synchronized (this) { 449 // Client can do any good or bad things in the below event 450 // handler 451 if (this.subscriptions != null 452 && this.subscriptions.containsKey(id)) { 453 subscription = this.subscriptions.get(id); 454 } 455 456 } 457 if (subscription != null) { 458 SubscriptionErrorEventArgs eventArgs = new SubscriptionErrorEventArgs( 459 subscription, exception); 460 461 if (!onSubscriptionError.isEmpty()) { 462 for (ISubscriptionErrorDelegate subError : onSubscriptionError) { 463 subError.subscriptionErrorDelegate(this, eventArgs); 464 } 465 } 466 } 467 if (gseResponse.getErrorCode() != ServiceError.ErrorMissedNotificationEvents) { 468 // Client can do any good or bad things in the above event 469 // handler 470 synchronized (this) { 471 if (this.subscriptions != null 472 && this.subscriptions.containsKey(id)) { 473 // We are no longer servicing the subscription. 474 this.subscriptions.remove(id); 475 } 476 } 477 } 478 } 479 } 480 481 /** 482 * Issues the general failure. 483 * 484 * @param gseResponse The GetStreamingEvents response. 485 */ 486 private void issueGeneralFailure(GetStreamingEventsResponse gseResponse) { 487 SubscriptionErrorEventArgs eventArgs = new SubscriptionErrorEventArgs( 488 null, new ServiceResponseException(gseResponse)); 489 490 if (!onSubscriptionError.isEmpty()) { 491 for (ISubscriptionErrorDelegate subError : onSubscriptionError) { 492 subError.subscriptionErrorDelegate(this, eventArgs); 493 } 494 } 495 } 496 497 /** 498 * Issues the notification events. 499 * 500 * @param gseResponse The GetStreamingEvents response. 501 */ 502 private void issueNotificationEvents(GetStreamingEventsResponse gseResponse) { 503 504 for (GetStreamingEventsResults.NotificationGroup events : gseResponse 505 .getResults().getNotifications()) { 506 StreamingSubscription subscription = null; 507 508 synchronized (this) { 509 // Client can do any good or bad things in the below event 510 // handler 511 if (this.subscriptions != null 512 && this.subscriptions 513 .containsKey(events.subscriptionId)) { 514 subscription = this.subscriptions 515 .get(events.subscriptionId); 516 } 517 } 518 if (subscription != null) { 519 NotificationEventArgs eventArgs = new NotificationEventArgs( 520 subscription, events.events); 521 522 if (!onNotificationEvent.isEmpty()) { 523 for (INotificationEventDelegate notifyEvent : onNotificationEvent) { 524 notifyEvent.notificationEventDelegate(this, eventArgs); 525 } 526 } 527 } 528 } 529 } 530 531 /** 532 * Frees resources associated with this StreamingSubscriptionConnection. 533 */ 534 public void dispose() { 535 synchronized (this) { 536 if (!this.isDisposed) { 537 if (this.currentHangingRequest != null) { 538 this.currentHangingRequest = null; 539 } 540 541 this.subscriptions = null; 542 this.session = null; 543 544 this.isDisposed = true; 545 } 546 } 547 } 548 549 /** 550 * Throws if disposed. 551 * 552 * @throws Exception 553 */ 554 private void throwIfDisposed() throws Exception { 555 if (this.isDisposed) { 556 throw new Exception(this.getClass().getName()); 557 } 558 } 559 560 @Override 561 public void handleResponseObject(Object response) throws ArgumentException { 562 this.handleServiceResponseObject(response); 563 } 564 565 @Override 566 public void hangingRequestDisconnectHandler(Object sender, 567 HangingRequestDisconnectEventArgs args) { 568 this.onRequestDisconnect(sender, args); 569 } 570 571}