Monday 30 July 2018

Preventing premature completion of an async pipeable operator in RxJS

I'm creating pipeable operators using RxJS 6, and am unclear about how to complete() the observer when the operation is asynchronous.

For a synchronous operation, the logic is simple. In the example below, all values from the source Observable will be passed to observer.next(), and after that observer.complete() is called.

const syncOp = () => (source) =>
  new rxjs.Observable(observer => {
    return source.subscribe({
      next: (x) => observer.next(x),
      error: (e) => observer.error(err),
      complete: () => observer.complete()
    })
  });
  
rxjs.from([1, 2, 3]).pipe(syncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>

For an asynchronous operation, however, I'm a bit at a loss. In the example below, the asynchronous operation is represented by a call to setTimeout(). Obviously, observer.complete() will be called before any of the values are passed to observer.next().

const asyncOp = () => (source) =>
  new rxjs.Observable(observer => {
    return source.subscribe({
      next: (x) => setTimeout(() => observer.next(x), 100),
      error: (e) => observer.error(err),
      complete: () => observer.complete()
    })
  });
  
rxjs.from([1, 2, 3]).pipe(asyncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>

So the question is: what is the idiomatic RxJS approach to make it so that the call to observer.complete() is only made after all values are asynchronously passed to observer.next()? Should I be manually keeping track of pending calls or is there a more "reactive" solution?

(Note that the example above is a simplification of my actual code, and that the call to setTimeout() is meant to represent "any asynchronous operation". I'm looking for a general approach to dealing with async operations in pipeable operators, not advice on how to deal with delays or timeouts in RxJS.)



from Preventing premature completion of an async pipeable operator in RxJS

No comments:

Post a Comment