Angular1 Observables

angular1

// Angular 1 - Observables:

Observables are a mix of Observer and Iterable design patterns.  Specifically, 
observable can be conceptualized as an immutable collection of data ordered in 
time, and iterated over similarly to collections such as arrays or lists.

let source = Rx.Observable.create(observer => {
  setTimeout(() => {
    observer.onNext(42);
    observer.onCompleted();
  }, 2000);
  console.log('Starting Observable Sequence');
});

let subscription = source.subscribe(
  value => console.log('Value:' + value),
  error => console.log(error),
  () => console.log('Completed Observable Sequence')
);

In the above code, we create an observable sequence named source.  This 
sequence emits a single value asynchronously using setTimeout() and then 
complete.  Then we subscribed to the observable sequence source, and provide 
an Observer represented by 3 callbacks.  Those callbacks are:

1. onNext: represents a function to be invoked when a new value is emitted onto 
   an observable sequence.
2. onError: represent a function to be invoked if an error occurs within an 
   observable sequence
3. onComplete: represents a function to be invoked when the observable sequence 
   completes.

let source = Rx.Observable.create(observer => {
  setTimeout(() => {
    try {
      observer.onNext(42);
      observer.onCompleted();
    } catch (error) {
      observer.onError(error);
    }
  }, 2000);
  console.log('Starting Observable Sequence');
});

In the above code, we added the try/catch block, and use the onError method to 
report the error, so that our onError callback is invoked.  If we do not do 
this, it would result in an uncaught error.  We could have a global error 
handling function that log the error, the file name, and line number where the 
error happpened, along with the call stack, but perhaps there are some 
situations where we should handle the error locally instead of using the 
global error handler.

let source = Rx.Observable.create(observer => {
  setTimeout(() => {
    try {
      console.log('In Timeout!');
      observer.onNext(42);
      observer.onCompleted();
    } catch (error) {
      observer.onError(error);
    }
  }, 2000);
  console.log('Starting Observable Sequence!');

  return onDispose = () => console.log('Stopping to listen to this observable sequence');
});

let subscription = source.subscribe(
  value => console.log('Value:' + value),
  error => console.log(error),
  () => console.log('Completed Observable Sequence!')
);

setTimeout(() => subscription.dispose(), 1000);

If we run the above code, we will notice that this observable did not emit any 
values because we invoked the dispose method after 1 second, and the observable 
is supposed to emit after 2 seconds.

When we invoke the subscribe method, it returns an instance of Disposable.  This 
allows us to call the dispose method on our subscription.  This is useful for 
any kind of clean-up that might be required.

The default behavior of an observable is to dispose the subscription as soon as 
onCompleted or onError messages are published.

Notice that our console.log statement within the setTimeout got invoked.  Even 
though our subscription was disposed of, the code within the setTimeout was 
still executed.  All that we achieved is that no values were emitted onto the 
observer to see.  We did not clean up our resource appropriately.  The correct 
implementation in this case would be to cancel the timeout:

let source = Rx.Observable.create(observer => {
  let timeoutId = setTimeout(() => {
    try {
      console.log('In Timeout!');
      observer.onNext(42);
      observer.onCompleted();
    } catch (error) {
      observer.onError(error);
    }
  }, 2000);

  console.log('Starting Observable Sequence!');

  return onDispose = () => {
    console.log('Releasing Resource of this Observable Sequence');
    clearTimeout(timeoutId);
  };
});

let subscription = source.subscribe(
  value => console.log('value: ' + value),
  error => console.log(error),
  () => console.log('Completed Observable Sequence!')
);

setTimeout(() => subscription.dispose(), 1000);

Both promises and observables provide us with abstractions that help us deal 
with the asynchronous nature of our applications.  However, there are important 
differences between observables and promises:

1. Observables are cancellable

2. Observables can be retried using one of the retry operators provided by the 
   API, such as retry and retryWhen.  On the other hand, in case of promises, 
   the caller must have access to the original function that returned the 
   promise in order to have a retry capability.

We can create observables from callbacks, promises, events, collections or by 
using many of the operator available on the API.

To create observables using interval and take:

Rx.Observable.interval(1000).take(5).subscribe(
  element => console.info(element),
  error => console.info(error),
  () => console.info('I am done')
);

The above observable will produce a value every 1 second, and only the first 5 
values will be emitted due to the use of take, otherwise the sequence will emit 
values indefinitely.

There are many more operators available within the API, such as range, timer.

To create observables using fromArray:

Rx.Observable.fromArray([1,2,3]).subscribe(
  element => console.info(element),
  error => console.info(error),
  () => console.info('I am done')
);

In the above code, we just pass the raw array [1,2,3] to fromArray.  It can be 
another array such as [0,1,2,3,4,5,6] or a variable that is an array.

To create an observable using fromPromise:

let promise = new Promise((resolve, reject) => resolve(42));

Rx.Observable.fromPromise(promise).subscribe(
  (value) => console.log(value)
);

In the above code, we specify only the first callback.  We ignored the onError 
callback and the onComplete callback.  Also note that, in the above case, we 
create an observable using fromPromise, and then we directly call the subscribe 
method from it.  The callbacks that we passed to subscribe are not parameters 
that are used to create the observable, and the subscribe method is not what is 
creating the observable.

To create an observable using fromEvent:

Rx.Observable.fromEvent(document, 'click').subscribe(
  clickEvent => console.info(
    clickEvent.clientX + ', ' + clickEvent.clientY
  )
);

Everything can be made into a stream using observables.

We can perform other operations such as filter or map, and many more as defined 
in the RxJS API.  This is what bridges observable with the iterable pattern, 
and lets us conceptualize them as a collections.

Rx.Observable.fromEvent(document, 'click')
  .filter(clickEvent: MouseEvent => clickEvent.altKey)
  .subscribe(clickEvent: MouseEvent => console.info(
    clickEvent.clientX + ', ' + clickEvent.clientY
  );

Note the chaining function style.  Functions like filter, returns an observable 
much like how promises can be chained (like multiple .then invocations).

To make asynchronous requests using Observables:

let responseStream = Rx.Observable.create(observer => {
  $http.get('http://someurl')
    .then(response => observer.onNext(response))
    .then(null, error => observer.oError(error));
});

responseStream.subscribe(
  (response) => $log.info('Data:', response);
  (error) => $log.info('Error: ', error);
);

The traditional way to make an asynchronous request with Angular:

this.$http.get('http://someurl')
  .then((response) => {
    this.$log.info(response.data);
    this.tasks = response.data;
  })
  .then(null, (error) => this.$log.error(status, error));

With the approach of using observable, we created a custom response data stream 
and notifying the observers of the stream when the data arrived.

We can also create an observable from a promise:

let responseStream = Rx.Observable.fromPromise(
  $http.get('http://someurl')
);

responseStream.subscribe(
  (response) => $log.info('Data:', response),
  (error) => $log.error('Error:', error)
);

To use an observable stream with flatMap:

let eventStream = Rx.Observable.formEvent(document.body, 'click');
let responseStream = eventStream
  .flatMap(() => Rx.Observable.fromPromise(
    $http.get('http://someurl')
  ));
responseStream.subscribe(
  (response) => $log.info('Data:', response),
  (error) => $log.info('Error:', error)
)

In the above code, first, we create an observable of button click events on 
some button.  Then we use the flatMap function to transform our event stream 
into our response stream.  The flatMap function flattens a stream of observables 
(observable of observables) to a stream of emitted values (a simple observable).  
Alternatively, if we were to use the map function instead, we would create a 
meta stream (a stream of stream).

let metaStream = eventStream
  .map(() => Rx.Observable.fromPromise(
    $http.get(http://someurl')
  ));
// We would have to subscribe to each stream received below
// to achieve the same behavior
metaStream.subscribe(
  (stream) => $log.info('Data:', stream),
  (error) => $log.info('Error:', error)
);

A pre-recorded video is an example of a cold observable.  You press play and the 
movie starts playing from the beginning.  Someone else can start playing the 
same movie 25 minutes later.  On the other hand, a live concert performance is 
an example of a hot observable.  If you arrive at the concert 25 minute late, 
the band does not perform again from the beginning, and you have to watch the 
performance from where it is.

The publish method within the RxJS API, takes a cold observable and return an 
instance of a ConnectableObservable, and we will have to explicitly call connect 
on our hot observable to start broadcasting values to its subscribers.

let source = Rx.Observable.interval(1000).take(7).publish();

setTimeout(() => {
  source.connect();
}, 1000);

setTimeout(() => {
  source.subscribe(
    value => console.log('Subscription A:' + value)
  );
}, 0);

setTimeout(() => {
  source.subscribe(
    value => console.log('Subscription B:' + value)
  );
}, 5000);

In the above case, the live performance starts at 1 second.  Subscriber A 
arrive to the concert hall 1 second early to get a good seat, and Subscriber B 
arrived 4 seconds late and missed a bunch of songs.

Another useful method to work with hot observables instead of connect is 
refCount.  This is an "auto connect" method, that will start broadcasting as 
soon as there is more than one subscriber.  It will stop if the number of 
subscriber goes to 0.

We need to clean up our subscription when it is no longer needed.  Because we 
need to dispose of our subscription, we need to keep a reference to them when we 
subscribe:

let tasksSubscription = tasksStore.tasks
  .subscribe(tasks => this.tasks = tasks);

Now we can use the $on method of the scope object to subscribe to the $destroy 
event of our component:

constructor(
  private $log,
  private $scope,
  private tasksStore
) {
  ...
  this.$scope.$on('$destroy', () => {
    tasksSubscription.dispose();
  });
}
Unless otherwise stated, the content of this page is licensed under Creative Commons Attribution-ShareAlike 3.0 License