001/*
002 * The MIT License
003 * Copyright (c) 2012 Microsoft Corporation
004 *
005 * Permission is hereby granted, free of charge, to any person obtaining a copy
006 * of this software and associated documentation files (the "Software"), to deal
007 * in the Software without restriction, including without limitation the rights
008 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
009 * copies of the Software, and to permit persons to whom the Software is
010 * furnished to do so, subject to the following conditions:
011 *
012 * The above copyright notice and this permission notice shall be included in
013 * all copies or substantial portions of the Software.
014 *
015 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
016 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
017 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
018 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
019 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
020 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
021 * THE SOFTWARE.
022 */
023
024package microsoft.exchange.webservices.data.notification;
025
026import microsoft.exchange.webservices.data.core.EwsUtilities;
027import microsoft.exchange.webservices.data.core.ExchangeService;
028import microsoft.exchange.webservices.data.core.exception.misc.ArgumentNullException;
029import microsoft.exchange.webservices.data.core.request.GetStreamingEventsRequest;
030import microsoft.exchange.webservices.data.core.request.HangingRequestDisconnectEventArgs;
031import microsoft.exchange.webservices.data.core.request.HangingServiceRequestBase;
032import microsoft.exchange.webservices.data.core.response.GetStreamingEventsResponse;
033import microsoft.exchange.webservices.data.core.enumeration.misc.ExchangeVersion;
034import microsoft.exchange.webservices.data.core.enumeration.misc.error.ServiceError;
035import microsoft.exchange.webservices.data.core.enumeration.service.ServiceResult;
036import microsoft.exchange.webservices.data.core.exception.misc.ArgumentException;
037import microsoft.exchange.webservices.data.core.exception.misc.ArgumentOutOfRangeException;
038import microsoft.exchange.webservices.data.core.exception.service.local.ServiceLocalException;
039import microsoft.exchange.webservices.data.core.exception.service.remote.ServiceResponseException;
040import org.apache.commons.logging.Log;
041import org.apache.commons.logging.LogFactory;
042
043import java.io.Closeable;
044import java.util.ArrayList;
045import java.util.HashMap;
046import java.util.List;
047import java.util.Map;
048
049/**
050 * Represents a connection to an ongoing stream of events.
051 */
052public final class StreamingSubscriptionConnection implements Closeable,
053                                                              HangingServiceRequestBase.IHandleResponseObject,
054    HangingServiceRequestBase.IHangingRequestDisconnectHandler {
055
056  private static final Log LOG = LogFactory.getLog(StreamingSubscriptionConnection.class);
057
058  /**
059   * Mapping of streaming id to subscriptions currently on the connection.
060   */
061  private Map<String, StreamingSubscription> subscriptions;
062
063  /**
064   * connection lifetime, in minutes
065   */
066  private int connectionTimeout;
067
068  /**
069   * ExchangeService instance used to make the EWS call.
070   */
071  private ExchangeService session;
072
073  /**
074   * Value indicating whether the class is disposed.
075   */
076  private boolean isDisposed;
077
078  /**
079   * Currently used instance of a GetStreamingEventsRequest connected to EWS.
080   */
081  private GetStreamingEventsRequest currentHangingRequest;
082
083
084  public interface INotificationEventDelegate {
085    /**
086     * Represents a delegate that is invoked when notification are received
087     * from the server
088     *
089     * @param sender The StreamingSubscriptionConnection instance that received
090     *               the events.
091     * @param args   The event data.
092     */
093    void notificationEventDelegate(Object sender, NotificationEventArgs args);
094  }
095
096
097  /**
098   * Notification events Occurs when notification are received from the
099   * server.
100   */
101  private List<INotificationEventDelegate> onNotificationEvent = new ArrayList<INotificationEventDelegate>();
102
103  /**
104   * Set event to happen when property Notify.
105   *
106   * @param notificationEvent notification event
107   */
108  public void addOnNotificationEvent(
109      INotificationEventDelegate notificationEvent) {
110    onNotificationEvent.add(notificationEvent);
111  }
112
113  /**
114   * Remove the event from happening when property Notify.
115   *
116   * @param notificationEvent notification event
117   */
118  public void removeNotificationEvent(
119      INotificationEventDelegate notificationEvent) {
120    onNotificationEvent.remove(notificationEvent);
121  }
122
123  /**
124   * Clears notification events list.
125   */
126  public void clearNotificationEvent() {
127    onNotificationEvent.clear();
128  }
129
130  public interface ISubscriptionErrorDelegate {
131
132    /**
133     * Represents a delegate that is invoked when an error occurs within a
134     * streaming subscription connection.
135     *
136     * @param sender The StreamingSubscriptionConnection instance within which
137     *               the error occurred.
138     * @param args   The event data.
139     */
140    void subscriptionErrorDelegate(Object sender,
141        SubscriptionErrorEventArgs args);
142  }
143
144
145  /**
146   * Subscription events Occur when a subscription encounters an error.
147   */
148  private List<ISubscriptionErrorDelegate> onSubscriptionError = new ArrayList<ISubscriptionErrorDelegate>();
149
150  /**
151   * Set event to happen when property subscriptionError.
152   *
153   * @param subscriptionError subscription event
154   */
155  public void addOnSubscriptionError(
156      ISubscriptionErrorDelegate subscriptionError) {
157    onSubscriptionError.add(subscriptionError);
158  }
159
160  /**
161   * Remove the event from happening when property subscription.
162   *
163   * @param subscriptionError subscription event
164   */
165  public void removeSubscriptionError(
166      ISubscriptionErrorDelegate subscriptionError) {
167    onSubscriptionError.remove(subscriptionError);
168  }
169
170  /**
171   * Clears subscription events list.
172   */
173  public void clearSubscriptionError() {
174    onSubscriptionError.clear();
175  }
176
177  /**
178   * Disconnect events Occurs when a streaming subscription connection is
179   * disconnected from the server.
180   */
181  private List<ISubscriptionErrorDelegate> onDisconnect = new ArrayList<ISubscriptionErrorDelegate>();
182
183  /**
184   * Set event to happen when property disconnect.
185   *
186   * @param disconnect disconnect event
187   */
188  public void addOnDisconnect(ISubscriptionErrorDelegate disconnect) {
189    onDisconnect.add(disconnect);
190  }
191
192  /**
193   * Remove the event from happening when property disconnect.
194   *
195   * @param disconnect disconnect event
196   */
197  public void removeDisconnect(ISubscriptionErrorDelegate disconnect) {
198    onDisconnect.remove(disconnect);
199  }
200
201  /**
202   * Clears disconnect events list.
203   */
204  public void clearDisconnect() {
205    onDisconnect.clear();
206  }
207
208  /**
209   * Initializes a new instance of the StreamingSubscriptionConnection class.
210   *
211   * @param service  The ExchangeService instance this connection uses to connect
212   *                 to the server.
213   * @param lifetime The maximum time, in minutes, the connection will remain open.
214   *                 Lifetime must be between 1 and 30.
215   * @throws Exception
216   */
217  public StreamingSubscriptionConnection(ExchangeService service, int lifetime)
218      throws Exception {
219    EwsUtilities.validateParam(service, "service");
220
221    EwsUtilities.validateClassVersion(service,
222        ExchangeVersion.Exchange2010_SP1, this.getClass().getName());
223
224    if (lifetime < 1 || lifetime > 30) {
225      throw new ArgumentOutOfRangeException("lifetime");
226    }
227
228    this.session = service;
229    this.subscriptions = new HashMap<String, StreamingSubscription>();
230    this.connectionTimeout = lifetime;
231  }
232
233  /**
234   * Initializes a new instance of the StreamingSubscriptionConnection class.
235   *
236   * @param service       The ExchangeService instance this connection uses to connect
237   *                      to the server.
238   * @param subscriptions Iterable subcriptions
239   * @param lifetime      The maximum time, in minutes, the connection will remain open.
240   *                      Lifetime must be between 1 and 30.
241   * @throws Exception
242   */
243  public StreamingSubscriptionConnection(ExchangeService service,
244      Iterable<StreamingSubscription> subscriptions, int lifetime)
245      throws Exception {
246    this(service, lifetime);
247    EwsUtilities.validateParamCollection(subscriptions.iterator(), "subscriptions");
248    for (StreamingSubscription subscription : subscriptions) {
249      this.subscriptions.put(subscription.getId(), subscription);
250    }
251  }
252
253  /**
254   * Adds a subscription to this connection.
255   *
256   * @param subscription The subscription to add.
257   * @throws Exception Thrown when AddSubscription is called while connected.
258   */
259  public void addSubscription(StreamingSubscription subscription)
260      throws Exception {
261    this.throwIfDisposed();
262    EwsUtilities.validateParam(subscription, "subscription");
263    this.validateConnectionState(false, "Subscriptions can't be added to an open connection.");
264
265    synchronized (this) {
266      if (this.subscriptions.containsKey(subscription.getId())) {
267        return;
268      }
269      this.subscriptions.put(subscription.getId(), subscription);
270    }
271  }
272
273  /**
274   * Removes the specified streaming subscription from the connection.
275   *
276   * @param subscription The subscription to remove.
277   * @throws Exception Thrown when RemoveSubscription is called while connected.
278   */
279  public void removeSubscription(StreamingSubscription subscription)
280      throws Exception {
281    this.throwIfDisposed();
282
283    EwsUtilities.validateParam(subscription, "subscription");
284
285    this.validateConnectionState(false, "Subscriptions can't be removed from an open connection.");
286
287    synchronized (this) {
288      this.subscriptions.remove(subscription.getId());
289    }
290  }
291
292  /**
293   * Opens this connection so it starts receiving events from the server.This
294   * results in a long-standing call to EWS.
295   *
296   * @throws Exception
297   * @throws ServiceLocalException Thrown when Open is called while connected.
298   */
299  public void open() throws ServiceLocalException, Exception {
300    synchronized (this) {
301      this.throwIfDisposed();
302
303      this.validateConnectionState(false, "The connection has already opened.");
304
305      if (this.subscriptions.size() == 0) {
306        throw new ServiceLocalException(
307            "You must add at least one subscription to this connection before it can be opened.");
308      }
309
310      this.currentHangingRequest = new GetStreamingEventsRequest(
311          this.session, this, this.subscriptions.keySet(),
312          this.connectionTimeout);
313
314      this.currentHangingRequest.addOnDisconnectEvent(this);
315
316      this.currentHangingRequest.internalExecute();
317    }
318  }
319
320  /**
321   * Called when the request is disconnected.
322   *
323   * @param sender The sender.
324   * @param args   The Microsoft.Exchange.WebServices.Data.
325   *               HangingRequestDisconnectEventArgs instance containing the
326   *               event data.
327   */
328  private void onRequestDisconnect(Object sender,
329      HangingRequestDisconnectEventArgs args) {
330    this.internalOnDisconnect(args.getException());
331  }
332
333  /**
334   * Closes this connection so it stops receiving events from the server.This
335   * terminates a long-standing call to EWS.
336   */
337  public void close() {
338    synchronized (this) {
339      try {
340        this.throwIfDisposed();
341
342        this.validateConnectionState(true, "The connection is already closed.");
343
344        // Further down in the stack, this will result in a
345        // call to our OnRequestDisconnect event handler,
346        // doing the necessary cleanup.
347        this.currentHangingRequest.disconnect();
348      } catch (Exception e) {
349        LOG.error(e);
350      }
351    }
352  }
353
354  /**
355   * Internal helper method called when the request disconnects.
356   *
357   * @param ex The exception that caused the disconnection. May be null.
358   */
359  private void internalOnDisconnect(Exception ex) {
360    if (!onDisconnect.isEmpty()) {
361      for (ISubscriptionErrorDelegate disconnect : onDisconnect) {
362        disconnect.subscriptionErrorDelegate(this,
363            new SubscriptionErrorEventArgs(null, ex));
364      }
365    }
366    this.currentHangingRequest = null;
367  }
368
369  /**
370   * Gets a value indicating whether this connection is opened
371   *
372   * @throws Exception
373   */
374  public boolean getIsOpen() throws Exception {
375
376    this.throwIfDisposed();
377    if (this.currentHangingRequest == null) {
378      return false;
379    } else {
380      return this.currentHangingRequest.isConnected();
381    }
382
383  }
384
385  /**
386   * Validates the state of the connection.
387   *
388   * @param isConnectedExpected Value indicating whether we expect to be currently connected.
389   * @param errorMessage        The error message.
390   * @throws Exception
391   */
392  private void validateConnectionState(boolean isConnectedExpected,
393      String errorMessage) throws Exception {
394    if ((isConnectedExpected && !this.getIsOpen())
395        || (!isConnectedExpected && this.getIsOpen())) {
396      throw new ServiceLocalException(errorMessage);
397    }
398  }
399
400  /**
401   * Handles the service response object.
402   *
403   * @param response The response.
404   * @throws ArgumentException
405   */
406  private void handleServiceResponseObject(Object response)
407      throws ArgumentException {
408    GetStreamingEventsResponse gseResponse = (GetStreamingEventsResponse) response;
409
410    if (gseResponse == null) {
411      throw new ArgumentNullException("GetStreamingEventsResponse must not be null",
412                                      "GetStreamingEventsResponse");
413    } else {
414      if (gseResponse.getResult() == ServiceResult.Success
415          || gseResponse.getResult() == ServiceResult.Warning) {
416        if (gseResponse.getResults().getNotifications().size() > 0) {
417          // We got notification; dole them out.
418          this.issueNotificationEvents(gseResponse);
419        } else {
420          // // This was just a heartbeat, nothing to do here.
421        }
422      } else if (gseResponse.getResult() == ServiceResult.Error) {
423        if (gseResponse.getErrorSubscriptionIds() == null
424            || gseResponse.getErrorSubscriptionIds().size() == 0) {
425          // General error
426          this.issueGeneralFailure(gseResponse);
427        } else {
428          // subscription-specific errors
429          this.issueSubscriptionFailures(gseResponse);
430        }
431      }
432    }
433  }
434
435  /**
436   * Issues the subscription failures.
437   *
438   * @param gseResponse The GetStreamingEvents response.
439   */
440  private void issueSubscriptionFailures(
441      GetStreamingEventsResponse gseResponse) {
442    ServiceResponseException exception = new ServiceResponseException(
443        gseResponse);
444
445    for (String id : gseResponse.getErrorSubscriptionIds()) {
446      StreamingSubscription subscription = null;
447
448      synchronized (this) {
449        // Client can do any good or bad things in the below event
450        // handler
451        if (this.subscriptions != null
452            && this.subscriptions.containsKey(id)) {
453          subscription = this.subscriptions.get(id);
454        }
455
456      }
457      if (subscription != null) {
458        SubscriptionErrorEventArgs eventArgs = new SubscriptionErrorEventArgs(
459            subscription, exception);
460
461        if (!onSubscriptionError.isEmpty()) {
462          for (ISubscriptionErrorDelegate subError : onSubscriptionError) {
463            subError.subscriptionErrorDelegate(this, eventArgs);
464          }
465        }
466      }
467      if (gseResponse.getErrorCode() != ServiceError.ErrorMissedNotificationEvents) {
468        // Client can do any good or bad things in the above event
469        // handler
470        synchronized (this) {
471          if (this.subscriptions != null
472              && this.subscriptions.containsKey(id)) {
473            // We are no longer servicing the subscription.
474            this.subscriptions.remove(id);
475          }
476        }
477      }
478    }
479  }
480
481  /**
482   * Issues the general failure.
483   *
484   * @param gseResponse The GetStreamingEvents response.
485   */
486  private void issueGeneralFailure(GetStreamingEventsResponse gseResponse) {
487    SubscriptionErrorEventArgs eventArgs = new SubscriptionErrorEventArgs(
488        null, new ServiceResponseException(gseResponse));
489
490    if (!onSubscriptionError.isEmpty()) {
491      for (ISubscriptionErrorDelegate subError : onSubscriptionError) {
492        subError.subscriptionErrorDelegate(this, eventArgs);
493      }
494    }
495  }
496
497  /**
498   * Issues the notification events.
499   *
500   * @param gseResponse The GetStreamingEvents response.
501   */
502  private void issueNotificationEvents(GetStreamingEventsResponse gseResponse) {
503
504    for (GetStreamingEventsResults.NotificationGroup events : gseResponse
505        .getResults().getNotifications()) {
506      StreamingSubscription subscription = null;
507
508      synchronized (this) {
509        // Client can do any good or bad things in the below event
510        // handler
511        if (this.subscriptions != null
512            && this.subscriptions
513            .containsKey(events.subscriptionId)) {
514          subscription = this.subscriptions
515              .get(events.subscriptionId);
516        }
517      }
518      if (subscription != null) {
519        NotificationEventArgs eventArgs = new NotificationEventArgs(
520            subscription, events.events);
521
522        if (!onNotificationEvent.isEmpty()) {
523          for (INotificationEventDelegate notifyEvent : onNotificationEvent) {
524            notifyEvent.notificationEventDelegate(this, eventArgs);
525          }
526        }
527      }
528    }
529  }
530
531  /**
532   * Frees resources associated with this StreamingSubscriptionConnection.
533   */
534  public void dispose() {
535    synchronized (this) {
536      if (!this.isDisposed) {
537        if (this.currentHangingRequest != null) {
538          this.currentHangingRequest = null;
539        }
540
541        this.subscriptions = null;
542        this.session = null;
543
544        this.isDisposed = true;
545      }
546    }
547  }
548
549  /**
550   * Throws if disposed.
551   *
552   * @throws Exception
553   */
554  private void throwIfDisposed() throws Exception {
555    if (this.isDisposed) {
556      throw new Exception(this.getClass().getName());
557    }
558  }
559
560  @Override
561  public void handleResponseObject(Object response) throws ArgumentException {
562    this.handleServiceResponseObject(response);
563  }
564
565  @Override
566  public void hangingRequestDisconnectHandler(Object sender,
567      HangingRequestDisconnectEventArgs args) {
568    this.onRequestDisconnect(sender, args);
569  }
570
571}