Feedback loop without a Subject in Rx

I have the following equations of motion:

move = target_position - position
position = position + move

where target_position is a stream and position is initialized to zero. I would like to obtain a stream of position values. I tried something like this (in Rx pseudocode):

moves = Subject()
position = moves.scan(sum,0)
target_position.combine_latest(position,diff).subscribe( moves.on_next)

This works, but I have read that using a Subject should be avoided. Is it possible to compute the position stream without a Subject?

In Python, the full implementation looks like this:

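from pprint import pprint
from rx.subjects import Subject  # RxPY 1.x module layout


def compute_next_move(target, current):
    # Not defined in the original post; assumed from the equations above:
    # move = target_position - position
    return target - current


target_position = Subject()

# Feedback channel: every computed move is pushed back in here.
moves = Subject()

# Running sum of all moves, starting from 0.0.
position = moves.scan(lambda x, y: x + y, 0.0)

target_position\
    .combine_latest(position, compute_next_move)\
    .filter(lambda x: abs(x) > 0)\
    .subscribe(moves.on_next)

position.subscribe(lambda x: pprint("position is now %s" % x))

# Seed the loop so that position emits its initial value.
moves.on_next(0.0)
target_position.on_next(2.0)
target_position.on_next(3.0)
target_position.on_next(4.0)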
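
As a sanity check on the expected positions, here is a minimal sketch without Rx that folds the same equations of motion over a list of targets. The names `step` and `positions` are illustrative and not part of the original code, the list stands in for the target_position stream, and `accumulate(..., initial=...)` assumes Python 3.8 or later.

```python
from itertools import accumulate


def step(position, target_position):
    # One application of the equations of motion from the question.
    move = target_position - position
    return position + move


def positions(targets, initial=0.0):
    # Fold the targets into a running position, starting from `initial`.
    return list(accumulate(targets, step, initial=initial))


print(positions([2.0, 3.0, 4.0]))  # [0.0, 2.0, 3.0, 4.0]
```

Because each move depends only on the latest target and the running position, the whole loop reduces to a fold here, which suggests the feedback may not need its own channel.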
1 answer

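You can use the expand operator, which feeds every value it emits back into the same function until that function returns an empty stream: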

targetPosition.combineLatest(position, (target, current) => [target, current])
  .expand(([target, current]) => {
    // if you've reached your target, stop
    if (target === current) {
      return Observable.empty();
    }
    // otherwise, calculate the new position, emit it
    // and pump it back into `expand`
    const newPosition = calcPosition(target, current);
    // emit a [target, position] pair so the next pass can destructure it
    return Observable.just([target, newPosition]);
  })
  .subscribe(updateThings);
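
To make the feedback behaviour of expand concrete, here is a rough plain-Python model of it. This is a sketch of the idea, not the Rx implementation: `expand` and `next_state` are illustrative names, iterables stand in for observables, and the `max_step` cap is a hypothetical addition so that the loop takes more than one pass (with the question's equations the target is reached in a single move).

```python
from collections import deque


def expand(seeds, project):
    """Yield every seed, then every value that `project` derives from it.

    `project` returns an iterable of follow-up values. Each one is emitted
    and fed back into `project` until it returns an empty iterable.
    """
    queue = deque(seeds)
    while queue:
        value = queue.popleft()
        yield value
        queue.extend(project(value))


def next_state(state, max_step=1.0):
    target, current = state
    if target == current:
        return []  # target reached: stop recursing
    # Hypothetical cap on the move size, so several passes are needed.
    move = max(-max_step, min(max_step, target - current))
    return [(target, current + move)]


for target, current in expand([(3.0, 0.0)], next_state):
    print("position is now %s" % current)
```

The state carried through the loop is a (target, position) pair, which is why the function passed to expand has to return pairs as well.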

Source: https://habr.com/ru/post/1608620/

