RxJava Completable: how to avoid toBlocking()

I am currently using RxJava on Android with Kotlin, but I have a problem that I cannot solve without using toBlocking().

I have a method in an employee service that returns an Observable<List<Employee>>:

fun all(): Observable<List<Employee>>

That's all fine, since this Observable emits a new list of employees whenever an employee changes. But I would like to generate a PDF file from the employees, which obviously should not run every time an employee changes. I would also like my PDF generator method to return a Completable. I want to add a header to the PDF, then iterate over the employees and calculate the wage of each one, which is also returned as an Observable; this is where I'm using toBlocking() right now. My current approach is this:

private fun generatePdf(outputStream: OutputStream): Completable {
    return employeeService.all().map { employees ->
        try {
            addHeaderToPDF()
            for (i in employees) {
                // Blocks the current thread until the wage is available.
                val calculated = employeeService.calculateWage(i.id).toBlocking().first()
                // Print calculated to PDF....
            }
            addFooterToPDF()
            return@map Completable.complete()
        } catch (e: Exception) {
            return@map Completable.error(e)
        }
    }.first().toCompletable()
}

Is there a way to make this code a little cleaner using RxJava?

Thanks in advance!

+4
2 answers


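// RxJava 2 package names.
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.functions.Function

/**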
 * @param employeesObservable
 * Stream of employees we're interested in.
 * @param wageProvider
 * Transformation function which takes an employee and returns a [Single] of their wage.
 * @return
 * Observable stream spitting individual [Pair]s of employees and their wages.
 */
fun getEmployeesAndWagesObservable(
        employeesObservable: Observable<Employee>,
        wageProvider: Function<Employee, Single<Int>>
): Observable<Pair<Employee, Int>> {
    val employeesAndWagesObservable: Observable<Pair<Employee, Int>>

    // Each Employee from the original stream will be converted
    // to a Single<Pair<Employee, Int>> via flatMapSingle operator.
    // Remember, we need a stream and Single is a stream.
    employeesAndWagesObservable = employeesObservable.flatMapSingle { employee ->
        // We need to get a source of wage value for current employee.
        // That source emits a single Int or errors.
        val wageForEmployeeSingle: Single<Int> = wageProvider.apply(employee)

        // Once the wage from said source is loaded...
        val employeeAndWageSingle: Single<Pair<Employee, Int>> = wageForEmployeeSingle.map { wage ->
            // ... construct a Pair<Employee, Int>
            employee to wage
        }

        // This code is not executed now. It will be executed for each Employee
        // after the original Observable<Employee> starts spitting out items.
        // After subscribing to the resulting observable.
        return@flatMapSingle employeeAndWageSingle
    }

    return employeesAndWagesObservable
}


The resulting stream completes only after employeesObservable signals onComplete, or terminates early with onError.
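How completion propagates through flatMapSingle can be checked with a small sketch. It assumes RxJava 2 and uses a PublishSubject as a hypothetical stand-in for a source that stays open, like the employee stream in the question; the multiplication is only a placeholder for the wage lookup.

```kotlin
import io.reactivex.Single
import io.reactivex.subjects.PublishSubject

fun main() {
    // Stand-in for a source that keeps emitting until it is explicitly completed.
    val source = PublishSubject.create<Int>()

    val observer = source
        .flatMapSingle { id -> Single.just(id * 10) } // placeholder for a wage lookup
        .test()

    source.onNext(1)
    source.onNext(2)
    observer.assertValues(10, 20)
    observer.assertNotComplete() // upstream is still open, so no onComplete yet

    source.onComplete()
    observer.assertComplete()    // downstream completes once upstream completes
}
```

If the upstream never completes, an operator such as take(1) is one way to make the stream finite before it is flattened.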


fun doStuff() {
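    // all() emits a List on every change and never completes; take the first
    // list and flatten it so the stream is a finite Observable<Employee>.
    val employeesObservable = employeeService.all().take(1).flatMapIterable { it }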
    val wageProvider = Function<Employee, Single<Int>> { employee ->
        // Don't listen to changes. Take first wage and use that.
        employeeService.calculateWage(employee.id).firstOrError()
    }

    val employeesAndWagesObservable = 
            getEmployeesAndWagesObservable(employeesObservable, wageProvider)

    // Subscribe...
}

1:

val blockingIterable = employeesAndWagesObservable.blockingIterable()
blockingIterable.forEach { ... }


2:

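  • Use .map to turn each Pair<Employee, Int> into the content to print in the PDF.
  • Create Observables with Observable.fromCallable { ... } that produce the PDF header and footer.
  • Join them in order with Observable.concat(headerObs, employeeDataObs, footerObs).
  • Call .subscribe and write each emitted item to the PDF.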
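The steps listed above could be sketched roughly as follows, assuming RxJava 2. Employee, FakePdf, and writePdf are hypothetical names invented for this illustration; FakePdf only collects text lines in place of a real PDF writer, and the header and footer strings are placeholders.

```kotlin
import io.reactivex.Completable
import io.reactivex.Observable

// Hypothetical model class for the sketch.
data class Employee(val id: Int, val name: String)

// Hypothetical stand-in for a PDF writer: it just collects text lines.
class FakePdf {
    val lines = mutableListOf<String>()
    fun write(line: String) { lines += line }
}

fun writePdf(
    employeesAndWages: Observable<Pair<Employee, Int>>,
    pdf: FakePdf
): Completable {
    // Header and footer are produced lazily, only on subscription.
    val headerObs: Observable<String> = Observable.fromCallable { "HEADER" }
    val footerObs: Observable<String> = Observable.fromCallable { "FOOTER" }

    // Map every (employee, wage) pair to the line that should be printed.
    val employeeDataObs: Observable<String> =
        employeesAndWages.map { (employee, wage) -> "${employee.name}: $wage" }

    // concat subscribes to each source only after the previous one completes,
    // so the order is header, rows, footer.
    return Observable.concat(headerObs, employeeDataObs, footerObs)
        .doOnNext { line -> pdf.write(line) }
        .ignoreElements() // only completion or error is reported to the caller
}

fun main() {
    val pdf = FakePdf()
    val data = Observable.just(Employee(1, "Ann") to 100, Employee(2, "Bob") to 200)
    writePdf(data, pdf).blockingAwait() // blocking only for the demo
    println(pdf.lines) // prints [HEADER, Ann: 100, Bob: 200, FOOTER]
}
```

Returning a Completable keeps the caller in charge of subscribing and scheduling. The footer is only written if the employee stream completes, so the input has to be finite.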
+2


    return employeeService.all().first()
            .doOnSubscribe { addHeaderToPDF() }
            .flatMapIterable { it }
            .flatMap { employeeService.calculateWage(it.id).first() }
            .doOnNext { printEmployeeWage(it) }
            .doOnCompleted { addFooterToPDF() }
            .toCompletable()


0

Source: https://habr.com/ru/post/1666154/

