How to wait for an object inside an IObservable with a specific property value?

To clarify, I have a method:

    public static IObservable<Node> GetNodes()
    {
        var computers = GetComputersInLan();
        return computers.Select(computerAddress => GetNode(computerAddress));
    }

The GetComputersInLan method returns an IObservable<IPAddress>:

    private static IObservable<IPAddress> GetComputersInLan()
    {
        var tasks = new List<Task<PingReply>>();
        // Note: ipBytes is not declared in this snippet.
        for (int i = 1; i < 255; i++)
        {
            Ping p = new Ping();
            ipBytes[3] = (byte)(++ipBytes[3]);
            IPAddress address = new IPAddress(ipBytes);
            // Ping each address with a 2000 ms timeout.
            tasks.Add(p.SendPingAsync(address, 2000));
        }
        // Keep only the addresses that answered the ping.
        return tasks.ToObservable()
                    .Where(x => x.Result.Status == IPStatus.Success)
                    .Select(y => y.Result.Address);
    }

The GetNode method creates a Node.

    private static Node GetNode(IPAddress ipAddress)
    {
        return new Node(ipAddress, (IHandler)Activator.GetObject(typeof(Handler), "tcp://" + ipAddress + ":1337/handler"));
    }

    public class Node
    {
        private IHandler Handler { get; set; }
        public IPAddress Address { get; set; }
        public int AvailableCores { get; set; }

        public async Task<TResult> Invoke<TResult>(Func<TResult> method)
        {
            AvailableCores--;
            var result = await Task.Run<TResult>(() => Handler.Invoke(method));
            AvailableCores++;
            return result;
        }
    }

A handler represents a remote computer, and AvailableCores is the number of processor cores it currently has free.

What I want is to wait until GetNodes yields the first Node that has more than 0 cores available.

    await GetNodes().FirstAsync(node => node.AvailableCores > 0)

But after a sufficient number of Invoke calls, instead of waiting for a core to become available, it throws the exception "Sequence contains no elements".

1 answer

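This is the expected behavior of FirstAsync. It checks each node once, at the moment the sequence emits it, and either returns the first match or, if the sequence completes without a match, throws the exception you are seeing. It does not observe later changes to AvailableCores on nodes that have already been emitted.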
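As a minimal sketch of that behavior (assuming the System.Reactive package is referenced; the FirstAsyncDemo class and the sample strings are made up for illustration), a finite sequence with no matching element makes FirstAsync fail once the sequence completes, while FirstOrDefaultAsync yields null instead:

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the question's code.
public static class FirstAsyncDemo
{
    // Returns true if FirstAsync threw because the sequence completed
    // before any element satisfied the predicate.
    public static async Task<bool> FirstAsyncThrowsAsync()
    {
        // A finite sequence: it emits two items and then completes.
        var source = new[] { "a", "bb" }.ToObservable();
        try
        {
            await source.FirstAsync(s => s.Length > 5);
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    // FirstOrDefaultAsync yields default(T), which is null for a
    // reference type, instead of throwing when nothing matches.
    public static async Task<string> FirstOrDefaultResultAsync()
    {
        var source = new[] { "a", "bb" }.ToObservable();
        return await source.FirstOrDefaultAsync(s => s.Length > 5);
    }
}
```

In the question's terms, the observable returned by GetNodes plays the role of source: once every ping result has been processed and no node had a free core at the moment it was emitted, the sequence completes and FirstAsync throws.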

You will have to handle waiting for a core to become available yourself. You can use FirstOrDefaultAsync, which returns null instead of throwing when all the cores are busy. From there, you need some mechanism to detect when a core becomes free for the next unit of work, whether that is an event or polling.
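The polling variant could look roughly like the sketch below. It uses only the base class library; NodePool, WaitForFreeNodeAsync, and the poll interval are hypothetical names chosen for illustration, and the Node class here is a stripped-down stand-in for the one in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for the question's Node; only the property used here.
public class Node
{
    public int AvailableCores { get; set; }
}

// Hypothetical helper, not part of the question's code.
public static class NodePool
{
    // Re-checks a fixed set of already-discovered nodes until one of
    // them reports a free core, sleeping pollInterval between checks.
    public static async Task<Node> WaitForFreeNodeAsync(
        IEnumerable<Node> nodes,
        TimeSpan pollInterval,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        while (true)
        {
            var free = nodes.FirstOrDefault(n => n.AvailableCores > 0);
            if (free != null)
                return free;

            // Throws OperationCanceledException if the token is cancelled.
            await Task.Delay(pollInterval, cancellationToken);
        }
    }
}
```

This assumes the nodes are discovered once and kept, for example with var nodes = await GetNodes().ToList();, so that the same Node instances are re-checked rather than a fresh scan being started on every attempt. Note that AvailableCores is updated without synchronization in the question's Invoke method, so two callers could still pick the same core; an event raised when Invoke finishes would avoid the polling delay but needs the same care.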


Source: https://habr.com/ru/post/1379300/
