Fast Repeat TakeWhile causes an infinite loop

How can I do the next observable repeat until stream.DataAvailable becomes false? Currently, it seems like he never stops.

AsyncReadChunk and Observable.Return inside the Defer section, make an OnNext call, and then OnCompleted. When Repeat receives an OnNext call, it passes it to TakeWhile. When TakeWhile fails, it completes the observable, but I think that OnCompleted, which appears immediately after OnNext is so fast that it causes Repeat to re-subscribe to the observable and causes an infinite loop.

How can I fix this behavior?

public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize) { return Observable.Defer(() => { try { return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0]); } catch (Exception) { return Observable.Return(new byte[0]); } }) .Repeat() .TakeWhile((dataChunk, index) => dataChunk.Length > 0); } 
+6
source share
1 answer

MOST ANSWER: (Below is the answer that Samte wrote, the author of the question. However, he sent the answer as part of the question. I will go to a separate answer, marking it as a wiki community, since the author did not move it himself.)


I found refactoring that this is a problem with planners. Return uses an Immediate scheduler, and Repeat uses CurrentThread. The fixed code is below.

  public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize) { return Observable.Defer(() => { try { return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0], Scheduler.CurrentThread); } catch (Exception) { return Observable.Return(new byte[0], Scheduler.CurrentThread); } }) .Repeat() .TakeWhile((dataChunk, index) => dataChunk.Length > 0); } 
+2
source

Source: https://habr.com/ru/post/902477/


All Articles