Observable TcpListener ends after a single connection

I'm new to Rx, so I'm probably making some important mistakes here.

I wanted to create a very simple socket server that could receive messages from clients using Observables. To do this, I'm using Rxx, which provides extension methods in the System.Net.Sockets namespace and also a static factory class, ObservableTcpListener.

Here is what I have so far, largely borrowed from various sources:

IPEndPoint endpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9001);
TcpListener listener = new TcpListener(endpoint);

IObservable<TcpClient> clients = listener
    .StartSocketObservable(1)
    .SelectMany<Socket, TcpClient>(socket => SocketToTcpClient(socket))
    .Finally(listener.Stop);

clients.Subscribe(client =>
{
    OnConnect(client).Subscribe(
        message => OnMessage(client, message),
        ex => OnException(client, ex),
        () => OnCompleted(client));
});

private static IObservable<TcpClient> SocketToTcpClient(Socket socket)
{
    TcpClient client = new TcpClient();
    client.Client = socket;
    return Observable.Return<TcpClient>(client);
}

private static IObservable<byte[]> OnConnect(TcpClient client)
{
    return client.Client.ReceiveUntilCompleted(SocketFlags.None);
}

private static void OnMessage(TcpClient client, byte[] message)
{
    Console.WriteLine("Message Received! - {0}", Encoding.UTF8.GetString(message));
}

private static void OnCompleted(TcpClient client)
{
    Console.WriteLine("Completed.");
}

private static void OnException(TcpClient client, Exception ex)
{
    Console.WriteLine("Exception: {0}", ex.ToString());
}

It works, up to a point. I can make one client connection. As soon as that connection completes, the Observable sequence appears to complete as well, and .Finally(listener.Stop) is called. Obviously, this is not what I want.

I also tried the ObservableTcpListener.Start() factory:

IObservable<TcpClient> sockets = ObservableTcpListener.Start(endpoint);
sockets.Subscribe(client =>
{
    OnConnect(client).Subscribe(
        message => OnMessage(client, message),
        ex => OnException(client, ex),
        () => OnCompleted(client));
});

, : clients , .Finally(listener.Stop).

, ? ?

Observable , .

IObservable<TcpClient> clients = listener
    .StartSocketObservable(1)
    .SelectMany<Socket, TcpClient>(socket => SocketToTcpClient(socket))
    .Finally(listener.Stop)
    .Publish().RefCount();
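
For comparison, here is a minimal sketch of an accept loop that does not depend on Rxx. It assumes the System.Reactive package and C# 6 or later; AcceptLoop and AcceptClients are hypothetical names used only for illustration, not part of Rxx or Rx. The idea is that the sequence of clients is driven by the listener alone, so it should only end when the subscriber unsubscribes or the listener itself fails, not when a single client disconnects.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

static class AcceptLoop
{
    // Hypothetical helper (not an Rxx or Rx API): emits one TcpClient per
    // accepted connection and keeps accepting until the subscription is disposed.
    public static IObservable<TcpClient> AcceptClients(IPEndPoint endpoint)
    {
        return Observable.Create<TcpClient>(async (observer, cancel) =>
        {
            var listener = new TcpListener(endpoint);
            listener.Start();

            // Disposing the subscription cancels the token; stopping the
            // listener then makes the pending accept call throw.
            using (cancel.Register(listener.Stop))
            {
                try
                {
                    while (true)
                    {
                        TcpClient client = await listener.AcceptTcpClientAsync();
                        observer.OnNext(client);
                    }
                }
                catch (Exception) when (cancel.IsCancellationRequested)
                {
                    // Expected when the subscriber unsubscribes; nothing to report.
                }
                finally
                {
                    listener.Stop();
                }
            }
            // Any other exception escapes the delegate and faults the task,
            // which Rx surfaces to the subscriber through OnError.
        });
    }
}
```

Subscribing looks the same as in the question: AcceptLoop.AcceptClients(endpoint).Subscribe(client => ...), with the per-client OnConnect(client).Subscribe(...) call inside the handler. Each client's receive sequence can then complete on its own without affecting the outer sequence of clients.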

Source: https://habr.com/ru/post/1625646/

