RxJS: getting the number of active subscriptions on an observable

In my Angular 2 application, I have a lot of observables and subscriptions. Of course, I have to unsubscribe when I leave the page, but I'm trying to figure out if I can get the number of active subscribers. Just for debugging information or when I forget to unsubscribe.

Is there any such information in rxjs?

+10
source share
3 answers

Perhaps a little late, but you can use rxjs-spy for this.

This solution is equivalent to the proposed one and, in my opinion, is better to maintain.

I usually enable it globally in main.ts as a set-and-forget strategy. You just have to:

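  1. Install rxjs-spy with your package manager.
  2. In main.ts, import the create function: import { create } from 'rxjs-spy';
  3. Initialize rxjs-spy in non-production builds, right after the Angular initialization snippet: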

    
    // rxjs-spy is enabled on non-production builds only; production builds
    // just turn on prod mode.
    if (!environment.production) {
        // Calling show() right away logs an (empty) snapshot to the console,
        // which confirms the spy is working as expected. Chaining it onto
        // create() also avoids keeping a variable that is never used again.
        create().show();
    } else {
        enableProdMode();
    }
    
    
  4. In your services (or wherever you create observables), tag the observables you want to track:

    
    import { tag } from 'rxjs-spy/operators';
    
    ...
    
    /**
     * Sample method that requests a "Product" entity. `Product` and
     * `this.http` are omitted because the focus is on tagging the observable.
     *
     * @param productId value sent as the `productId` query parameter
     * @returns the observable produced by the GET call, tagged "getProductById"
     */
    public getProductById(productId: number): Observable<Product> {
        const query = new HttpParams().append('productId', productId.toString());
        const url = this.baseUrl + "api/product";
        // The tag name mirrors the method name by convention; any name may be used.
        return this.http.get<Product>(url, { params: query }).pipe(tag("getProductById"));
    }
    
    
  5. When you want a snapshot of the current rxjs state, open the browser console and call rxSpy.show().

Additional commands are available. A detailed walkthrough is the tutorial "Debugging with rxjs Spy".

( , -, , , )

+3

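RxJS can handle this for you with refCounting. Put a subject with refCount between your observers and the source observable and let refCount manage the subscription to the source. refCount unsubscribes from the source observable when no observers are listening any more, and subscribes to it again when the observer count was 0 and a new observer subscribes.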

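The subject acts as a proxy between the observer(s) and the source observable and manages subscribing to and unsubscribing from it. When the first observer subscribes to the subject, the subject in turn subscribes to the source observable (refCount goes from 0 to 1). The subject lets several observers listen to the same source, making it multicast. When observers unsubscribe and refCount drops back to 0, the subject unsubscribes from the source observable.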

This is easier to understand in code:

// Demo script for the RxJS 5 global bundle (`Rx`): three "pages" subscribe to
// and unsubscribe from one shared observable at fixed times, and every step is
// logged so the subscribe/unsubscribe calls on the source can be followed.
const {
  Observable
} = Rx;

// Source: once subscribed, emits 0, 1, 2, ... every 700 ms and completes
// 5500 ms after that subscription started.
let sourceObservable = Observable
  .create((observer) => {
    let count = 0;
    let interval = setInterval(() => {
      observer.next(count++)
    }, 700);

    // Keep the handle so the teardown below can cancel the pending completion.
    let completionTimer = setTimeout(() => {
      clearInterval(interval);
      observer.complete();
    }, 5500);

    // Teardown: release both timers. Without clearing the completion timer it
    // would stay pending (and fire later) after the last observer has left.
    return () => {
      clearInterval(interval);
      clearTimeout(completionTimer);
      console.log('######## Source observable unsubscribed');
    }
  })
  .do((x) => console.log('#### Source emits: ' + x));

// All pages subscribe to this shared wrapper rather than to the source itself.
let subject = sourceObservable
  .share()
  //.do((x) => console.log('#### Subject emits: ' + x))
  ;

// Despite the names, these hold the subscriptions returned by subscribe(),
// which is what unsubscribe() is later called on.
let pageOneObserver;
let pageTwoObserver;
let pageThreeObserver;

// t = 1 s: page one subscribes.
setTimeout(() => {
  console.log('pageOneObserver will subscribe');
  pageOneObserver = subject.subscribe({
    next: (x) => {
      console.log('pageOneObserver gets: ' + x);
    },
    complete: () => {
      console.log('pageOneObserver: complete');
    }
  });
}, 1000);

// t = 4 s: page two subscribes while page one is still subscribed.
setTimeout(() => {
  console.log('pageTwoObserver will subscribe');
  pageTwoObserver = subject.subscribe({
    next: (x) => {
      console.log('pageTwoObserver gets: ' + x);
    },
    complete: () => {
      console.log('pageTwoObserver: complete');
    }
  });
}, 4000);

// t = 7 s: page one unsubscribes.
setTimeout(() => {
  console.log('pageOneObserver will unsubscribe');
  pageOneObserver.unsubscribe();
}, 7000);

// t = 10 s: page two unsubscribes.
setTimeout(() => {
  console.log('pageTwoObserver will unsubscribe');
  pageTwoObserver.unsubscribe();
}, 10000);

// t = 13 s: page three subscribes after both earlier pages have left.
setTimeout(() => {
  console.log('pageThreeObserver will subscribe');
  pageThreeObserver = subject.subscribe({
    next: (x) => {
      console.log('pageThreeObserver gets: ' + x);
    },
    complete: () => {
      console.log('pageThreeObserver: complete');
    }
  });
}, 13000);

// t = 16 s: page three unsubscribes.
setTimeout(() => {
  console.log('pageThreeObserver will unsubscribe');
  pageThreeObserver.unsubscribe();
}, 16000);
<script src="https://unpkg.com/rxjs@5.1.1/bundles/Rx.min.js"></script>
Hide result

There are some shorthand operators for creating such subjects. For example:

// Each pair shows a shorthand operator chain followed by its expanded form.

sourceObservable.share();
// is the same as
sourceObservable.publish().refCount();

sourceObservable.publish().refCount();
// is the same as
sourceObservable.multicast(new Rx.Subject()).refCount();

// publishReplay forwards its buffer size to the ReplaySubject, so it must be
// given 1 here to match ReplaySubject(1).
sourceObservable.publishReplay(1).refCount();
// is the same as
sourceObservable.multicast(new Rx.ReplaySubject(1)).refCount();

// publishBehavior forwards its argument as the BehaviorSubject's initial value.
sourceObservable.publishBehavior(0).refCount();
// is the same as
sourceObservable.multicast(new Rx.BehaviorSubject(0)).refCount();

sourceObservable.publishLast().refCount();
// is the same as
sourceObservable.multicast(new Rx.AsyncSubject()).refCount();

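sourceObservable.share(); also has a built-in subject factory. This matters when the source observable completes at some point: subscribing again then requires a new subscription to sourceObservable, and that is only possible through a new subject instance. When a single subject instance is passed instead of a factory, that subject just passes the complete event down to its observers.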

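If the plain Rx.Subject() is not what you want, and a new subscriber should receive the last emitted value (for example, when the source observable has completed and a new instance is created with a new subject), you can supply your own subject factory: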

// Demo script for the RxJS 5 global bundle (`Rx`): same timeline as a plain
// share() setup, but the shared observable is built with multicast() and a
// subject factory so the kind of subject can be chosen.
const {
  Observable
} = Rx;

// Source: once subscribed, emits 0, 1, 2, ... every 700 ms and completes
// 5500 ms after that subscription started.
let sourceObservable = Observable
  .create((observer) => {
    let count = 0;
    let interval = setInterval(() => {
      observer.next(count++)
    }, 700);

    // Keep the handle so the teardown below can cancel the pending completion.
    let completionTimer = setTimeout(() => {
      clearInterval(interval);
      observer.complete();
    }, 5500);

    // Teardown: release both timers. Without clearing the completion timer it
    // would stay pending (and fire later) after the last observer has left.
    return () => {
      clearInterval(interval);
      clearTimeout(completionTimer);
      console.log('######## Source observable unsubscribed');
    }
  })
  .do((x) => console.log('#### Source emits: ' + x));

/* You could return whatever subject instance you like here */
let subjectFactory = () => new Rx.ReplaySubject(1);

// The statement terminator sits on its own line after the commented-out
// operator, so that operator can be re-enabled without breaking the chain.
let subject = sourceObservable
  .multicast(subjectFactory)
  .refCount()
  //.do((x) => console.log('#### Subject emits: ' + x))
  ;

// Despite the names, these hold the subscriptions returned by subscribe(),
// which is what unsubscribe() is later called on.
let pageOneObserver;
let pageTwoObserver;
let pageThreeObserver;

// t = 1 s: page one subscribes.
setTimeout(() => {
  console.log('pageOneObserver will subscribe');
  pageOneObserver = subject.subscribe({
    next: (x) => {
      console.log('pageOneObserver gets: ' + x);
    },
    complete: () => {
      console.log('pageOneObserver: complete');
    }
  });
}, 1000);

// t = 4 s: page two subscribes while page one is still subscribed.
setTimeout(() => {
  console.log('pageTwoObserver will subscribe');
  pageTwoObserver = subject.subscribe({
    next: (x) => {
      console.log('pageTwoObserver gets: ' + x);
    },
    complete: () => {
      console.log('pageTwoObserver: complete');
    }
  });
}, 4000);

// t = 7 s: page one unsubscribes.
setTimeout(() => {
  console.log('pageOneObserver will unsubscribe');
  pageOneObserver.unsubscribe();
}, 7000);

// t = 10 s: page two unsubscribes.
setTimeout(() => {
  console.log('pageTwoObserver will unsubscribe');
  pageTwoObserver.unsubscribe();
}, 10000);

// t = 13 s: page three subscribes after both earlier pages have left.
setTimeout(() => {
  console.log('pageThreeObserver will subscribe');
  pageThreeObserver = subject.subscribe({
    next: (x) => {
      console.log('pageThreeObserver gets: ' + x);
    },
    complete: () => {
      console.log('pageThreeObserver: complete');
    }
  });
}, 13000);

// t = 16 s: page three unsubscribes.
setTimeout(() => {
  console.log('pageThreeObserver will unsubscribe');
  pageThreeObserver.unsubscribe();
}, 16000);
<script src="https://unpkg.com/rxjs@5.1.1/bundles/Rx.min.js"></script>
Hide result

Feel free to ask if anything is still unclear.

+2

The following utility function may help:

/**
 * Wraps an observable so that every subscribe and unsubscribe made through the
 * wrapper logs how many subscriptions are currently active.
 *
 * The counter belongs to the returned observable: it is shared by all of its
 * subscribers and does not see subscriptions made directly on
 * `sourceObservable`.
 *
 * @param sourceObservable observable each subscriber is forwarded to
 * @param description label that prefixes every log line
 * @returns an observable that subscribes its subscribers to `sourceObservable`
 */
function subscriberCount<T>(sourceObservable: Observable<T>, description: string): Observable<T> {
  let counter = 0;
  return Observable.create((subscriber: Subscriber<T>) => {
    const subscription = sourceObservable.subscribe(subscriber);
    counter++;
    console.log(`${description} subscriptions: ${counter}`);

    // Teardown: release the inner subscription, then report the new count.
    return () => {
      subscription.unsubscribe();
      counter--;
      console.log(`${description} subscriptions: ${counter}`);
    };
  });
}

Use it like this:

// Wrap Observable.timer(1000) so that subscribing to and unsubscribing from
// timer$ logs the running count under the 'Timer' label.
const timer$ = subscriberCount(Observable.timer(1000), 'Timer');

As you subscribe and unsubscribe, the counter changes are logged to the console.

+1
source

Source: https://habr.com/ru/post/1653575/


All Articles