Graceful gRPC closure downstream

Using the following buffer code:

syntax = "proto3";

package pb;

message SimpleRequest {
    int64 number = 1;
}

message SimpleResponse {
    int64 doubled = 1;
}

// All the calls in this serivce preform the action of doubling a number.
// The streams will continuously send the next double, eg. 1, 2, 4, 8, 16.
service Test {
    // This RPC streams from the server only.
    rpc Downstream(SimpleRequest) returns (stream SimpleResponse);
}

I can successfully open the stream and constantly receive the next double number from the server.

My code to run is as follows:

ctxDownstream, cancel := context.WithCancel(ctx)
downstream, err := testClient.Downstream(ctxDownstream, &pb.SimpleRequest{Number: 1})
for {
    responseDownstream, err := downstream.Recv()
    if err != io.EOF {
        println(fmt.Sprintf("downstream response: %d, error: %v", responseDownstream.Doubled, err))

        if responseDownstream.Doubled >= 32 {
            break
        }
    }
}
cancel() // !!This is not a graceful shutdown
println(fmt.Sprintf("%v", downstream.Trailer()))

The problem I am facing is using context cancellation, meaning my downstream.Trailer () request is empty. Is there a way to gracefully close this connection from the client side and get downstream.Trailer ().

Note: if I close the server downlink, my trailers will be full. But I can not instruct my server side to close this particular thread. Thus, there should be a way to gracefully close the client side of the stream.

Thank.

As requested server code:

func (b *binding) Downstream(req *pb.SimpleRequest, stream pb.Test_DownstreamServer) error {
    request := req

    r := make(chan *pb.SimpleResponse)
    e := make(chan error)
    ticker := time.NewTicker(200 * time.Millisecond)
    defer func() { ticker.Stop(); close(r); close(e) }()

    go func() {
        defer func() { recover() }()
        for {
            select {
            case <-ticker.C:
                response, err := b.Endpoint(stream.Context(), request)
                if err != nil {
                    e <- err
                }
                r <- response
            }
        }
    }()

    for {
        select {
        case err := <-e:
            return err
        case response := <-r:
            if err := stream.Send(response); err != nil {
                return err
            }
            request.Number = response.Doubled
        case <-stream.Context().Done():
            return nil
        }
    }
}

. grpc.StreamServerInterceptor .

+4
2

grpc go

, . . CloseAndRecv  stream.Recv non-nil ( io.EOF).

, , -

ctxDownstream, cancel := context.WithCancel(ctx)
defer cancel()
for {
  ...
  // on error or EOF
  break;
}
println(fmt.Sprintf("%v", downstream.Trailer()))

. cancel , .

0

, , .

grpc-status , (.. ). , .

, gRPC rpc, .

; , - :

message SimpleRequest {
    oneof RequestType {
        int64 number = 1;
        bool cancel = 2;
    }
}
....
rpc Downstream(stream SimpleRequest) returns (stream SimpleResponse);

.

0

Source: https://habr.com/ru/post/1694530/


All Articles