I am trying to create an Observable where each element is created through an asynchronous task. The next element should be created using an asynchronous call based on the result of the previous element (core-recursion). In the "Create" expression, it would look something like this: except for Generate, it does not support async (and also does not support the delegate in its original state.
var ob = Observable.Generate(
async () => await ProduceFirst(),
prev => Continue(prev)
async prev => await ProduceNext(prev)
item => item
);
As a more specific example, to view all messages from the ServiceBus queue, selecting them 100 messages at a time, run ProduceFirst, Continue, and ProduceNext as follows:
Task<IEnumerable<BrokeredMessage>> ProduceFirst()
{
const int batchSize = 100;
return _serviceBusReceiver.PeekBatchAsync(batchSize);
}
bool Continue(IEnumerable<BrokeredMessage> prev)
{
return prev.Any();
}
async Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev)
{
const int batchSize = 100;
return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1)
}
And then call .SelectMany(i => i)on IObservable<IEnumerable<BrokeredMessage>>to turn it intoIObservable<BrokeredMessage>
Where _serviceBusReceiver is an instance of the interface as follows:
public interface IServiceBusReceiver
{
Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize);
Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize);
}
BrokeredMessage https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx