Using the following buffer code:
syntax = "proto3";
package pb;
message SimpleRequest {
int64 number = 1;
}
message SimpleResponse {
int64 doubled = 1;
}
service Test {
rpc Downstream(SimpleRequest) returns (stream SimpleResponse);
}
I can successfully open the stream and constantly receive the next double number from the server.
My code to run is as follows:
ctxDownstream, cancel := context.WithCancel(ctx)
downstream, err := testClient.Downstream(ctxDownstream, &pb.SimpleRequest{Number: 1})
for {
responseDownstream, err := downstream.Recv()
if err != io.EOF {
println(fmt.Sprintf("downstream response: %d, error: %v", responseDownstream.Doubled, err))
if responseDownstream.Doubled >= 32 {
break
}
}
}
cancel() // !!This is not a graceful shutdown
println(fmt.Sprintf("%v", downstream.Trailer()))
The problem I am facing is using context cancellation, meaning my downstream.Trailer () request is empty. Is there a way to gracefully close this connection from the client side and get downstream.Trailer ().
Note: if I close the server downlink, my trailers will be full. But I can not instruct my server side to close this particular thread. Thus, there should be a way to gracefully close the client side of the stream.
Thank.
As requested server code:
func (b *binding) Downstream(req *pb.SimpleRequest, stream pb.Test_DownstreamServer) error {
request := req
r := make(chan *pb.SimpleResponse)
e := make(chan error)
ticker := time.NewTicker(200 * time.Millisecond)
defer func() { ticker.Stop(); close(r); close(e) }()
go func() {
defer func() { recover() }()
for {
select {
case <-ticker.C:
response, err := b.Endpoint(stream.Context(), request)
if err != nil {
e <- err
}
r <- response
}
}
}()
for {
select {
case err := <-e:
return err
case response := <-r:
if err := stream.Send(response); err != nil {
return err
}
request.Number = response.Doubled
case <-stream.Context().Done():
return nil
}
}
}
. grpc.StreamServerInterceptor .