How to handle execution update using ReactiveX Observables / Subjects?

I am writing an Angular application that uses the ReactiveX API to handle asynchronous operations. I used the API before in an Android project, and I really like how it simplifies parallel processing of tasks. But there is one thing that I'm not sure how to solve.

How to update the observer from the current task? The task in this case will take time to load / create a complex / large object, and I can return the intermediate progress, but not the object itself. An observable can only return one data type. Therefore, I know two possibilities.

  • Create an object that has a progress field and a data field. This object can simply be returned using Observable.onNext (object). The progress field will be updated with every onNext, while the data field is empty until the last onNext, which sets it to the loaded value.

  • Create two observable, observable data and observable progress. The observer must subscribe to the progress observed for progress updates, and to the observed data, which will be notified when the data is finally downloaded / created. They can also be copied together for a single subscription.

I used both methods, they both work, but I want to know if there is a single standard, a clean way, how to solve this problem. Of course, this could be brand new. Im open to every decision.

+4
source share
1 answer

After careful consideration, I use a solution similar to option two in my question. The main observable is the actual result of the operation. The HTTP request is in this case, but the file iteration example is similar. It is returned by the "work" function.

A second observer / subscriber can be added via the function parameter. This subscriber only deals with progress information. Thus, all operations are neutral and do not require type checking.

The second version of the work function, without a progress observer, can be used if updating the user interface is not required.

export class FileUploadService {

 doWork(formData: FormData, url: string): Subject<Response> {
    return this.privateDoWork(formData, url, null);
 }

 doWorkWithProgress(formData: FormData, url: string, progressObserver: Observer<number>): Subject<Response> {
    return this.privateDoWork(formData, url, progressObserver);
 }

 private privateDoWork(formData: FormData, url: string, progressObserver: Observer<number> | null): Subject<Response> {

     return Observable.create(resultObserver => {
     let xhr: XMLHttpRequest = new XMLHttpRequest();
     xhr.open("POST", url);

     xhr.onload = (evt) => {
         if (progressObserver) {
            progressObserver.next(1);
            progressObserver.complete();
            }
         resultObserver.next((<any>evt.target).response);
         resultObserver.complete()
     };
     xhr.upload.onprogress = (evt) => {
         if (progressObserver) {
            progressObserver.next(evt.loaded / evt.total);
         }

     };
     xhr.onabort = (evt) => resultObserver.error("Upload aborted by user");
     xhr.onerror = (evt) => resultObserver.error("Error");

     xhr.send(formData);
     });
 }

, . // .

 this.fileUploadService.doWorkWithProgress(this.chosenSerie.formData, url, new Subscriber((progress) => console.log(progress * 100)).subscribe(
    (result) => console.log(result),
    (error) => console.log(error),
    () => console.log("request Completed")
    );

"" . nececcary, .

Typescript, ReactiveX.

+1

Source: https://habr.com/ru/post/1685435/


All Articles