How to read metadata in gRPC using Java on the client side

I am using Java with the protoc 3.0 compiler, and my proto file is shown below. https://github.com/openconfig/public/blob/master/release/models/rpc/openconfig-rpc-api.yang

    syntax = "proto3";

    package Telemetry;

    // Interface exported by Agent
    service OpenConfigTelemetry {
        // Request an inline subscription for data at the specified path.
        // The device should send telemetry data back on the same
        // connection as the subscription request.
        rpc telemetrySubscribe(SubscriptionRequest) returns (stream OpenConfigData) {}

        // Terminates and removes an existing telemetry subscription
        rpc cancelTelemetrySubscription(CancelSubscriptionRequest) returns (CancelSubscriptionReply) {}

        // Get the list of current telemetry subscriptions from the
        // target. This command returns a list of existing subscriptions
        // not including those that are established via configuration.
        rpc getTelemetrySubscriptions(GetSubscriptionsRequest) returns (GetSubscriptionsReply) {}

        // Get Telemetry Agent Operational States
        rpc getTelemetryOperationalState(GetOperationalStateRequest) returns (GetOperationalStateReply) {}

        // Return the set of data encodings supported by the device for
        // telemetry data
        rpc getDataEncodings(DataEncodingRequest) returns (DataEncodingReply) {}
    }

    // Message sent for a telemetry subscription request
    message SubscriptionRequest {
        // Data associated with a telemetry subscription
        SubscriptionInput input = 1;

        // List of data models paths and filters
        // which are used in a telemetry operation.
        repeated Path path_list = 2;

        // The below configuration is not defined in Openconfig RPC.
        // It is a proposed extension to configure additional
        // subscription request features.
        SubscriptionAdditionalConfig additional_config = 3;
    }

    // Data associated with a telemetry subscription
    message SubscriptionInput {
        // List of optional collector endpoints to send data for
        // this subscription.
        // If no collector destinations are specified, the collector
        // destination is assumed to be the requester on the rpc channel.
        repeated Collector collector_list = 1;
    }

    // Collector endpoints to send data specified as an ip+port combination.
    message Collector {
        // IP address of collector endpoint
        string address = 1;

        // Transport protocol port number for the collector destination.
        uint32 port = 2;
    }

    // Data model path
    message Path {
        // Data model path of interest
        // Path specification for elements of OpenConfig data models
        string path = 1;

        // Regular expression to be used in filtering state leaves
        string filter = 2;

        // If this is set to true, the target device will only send
        // updates to the collector upon a change in data value
        bool suppress_unchanged = 3;

        // Maximum time in ms the target device may go without sending
        // a message to the collector. If this time expires with
        // suppress-unchanged set, the target device must send an update
        // message regardless if the data values have changed.
        uint32 max_silent_interval = 4;

        // Time in ms between collection and transmission of the
        // specified data to the collector platform. The target device
        // will sample the corresponding data (e.g., a counter) and
        // immediately send to the collector destination.
        //
        // If sample-frequency is set to 0, then the network device
        // must emit an update upon every datum change.
        uint32 sample_frequency = 5;
    }

    // Configure subscription request additional features.
    message SubscriptionAdditionalConfig {
        // limit the number of records sent in the stream
        int32 limit_records = 1;

        // limit the time the stream remains open
        int32 limit_time_seconds = 2;
    }

    // Reply to inline subscription for data at the specified path is done in
    // two-folds.
    // 1. Reply data message sent out using out-of-band channel.
    // 2. Telemetry data send back on the same connection as the
    //    subscription request.

    // 1. Reply data message sent out using out-of-band channel.
    message SubscriptionReply {
        // Response message to a telemetry subscription creation or
        // get request.
        SubscriptionResponse response = 1;

        // List of data models paths and filters
        // which are used in a telemetry operation.
        repeated Path path_list = 2;
    }

    // Response message to a telemetry subscription creation or get request.
    message SubscriptionResponse {
        // Unique id for the subscription on the device. This is
        // generated by the device and returned in a subscription
        // request or when listing existing subscriptions
        uint32 subscription_id = 1;
    }

    // 2. Telemetry data send back on the same connection as the
    //    subscription request.
    message OpenConfigData {
        // router name:export IP address
        string system_id = 1;

        // line card / RE (slot number)
        uint32 component_id = 2;

        // PFE (if applicable)
        uint32 sub_component_id = 3;

        // Path specification for elements of OpenConfig data models
        string path = 4;

        // Sequence number, monotonically increasing for each
        // system_id, component_id, sub_component_id + path.
        uint64 sequence_number = 5;

        // timestamp (milliseconds since epoch)
        uint64 timestamp = 6;

        // List of key-value pairs
        repeated KeyValue kv = 7;
    }

    // Simple Key-value, where value could be one of scalar types
    message KeyValue {
        // Key
        string key = 1;

        // One of possible values
        oneof value {
            double double_value = 5;
            int64 int_value = 6;
            uint64 uint_value = 7;
            sint64 sint_value = 8;
            bool bool_value = 9;
            string str_value = 10;
            bytes bytes_value = 11;
        }
    }

    // Message sent for a telemetry subscription cancellation request
    message CancelSubscriptionRequest {
        // Subscription identifier as returned by the device when
        // subscription was requested
        uint32 subscription_id = 1;
    }

    // Reply to telemetry subscription cancellation request
    message CancelSubscriptionReply {
        // Return code
        ReturnCode code = 1;

        // Return code string
        string code_str = 2;
    };

    // Result of the operation
    enum ReturnCode {
        SUCCESS = 0;
        NO_SUBSCRIPTION_ENTRY = 1;
        UNKNOWN_ERROR = 2;
    }

    // Message sent for a telemetry get request
    message GetSubscriptionsRequest {
        // Subscription identifier as returned by the device when
        // subscription was requested
        // --- or ---
        // 0xFFFFFFFF for all subscription identifiers
        uint32 subscription_id = 1;
    }

    // Reply to telemetry subscription get request
    message GetSubscriptionsReply {
        // List of current telemetry subscriptions
        repeated SubscriptionReply subscription_list = 1;
    }

    // Message sent for telemetry agent operational states request
    message GetOperationalStateRequest {
        // Per-subscription_id level operational state can be requested.
        //
        // Subscription identifier as returned by the device when
        // subscription was requested
        // --- or ---
        // 0xFFFFFFFF for all subscription identifiers including agent-level
        // operational stats
        // --- or ---
        // If subscription_id is not present then sent only agent-level
        // operational stats
        uint32 subscription_id = 1;

        // Control verbosity of the output
        VerbosityLevel verbosity = 2;
    }

    // Verbosity Level
    enum VerbosityLevel {
        DETAIL = 0;
        TERSE = 1;
        BRIEF = 2;
    }

    // Reply to telemetry agent operational states request
    message GetOperationalStateReply {
        // List of key-value pairs where
        // key = operational state definition
        // value = operational state value
        repeated KeyValue kv = 1;
    }

    // Message sent for a data encoding request
    message DataEncodingRequest {
    }

    // Reply to data encodings supported request
    message DataEncodingReply {
        repeated EncodingType encoding_list = 1;
    }

    // Encoding Type Supported
    enum EncodingType {
        UNDEFINED = 0;
        XML = 1;
        JSON_IETF = 2;
        PROTO3 = 3;
    }

To make the service call (rpc telemetrySubscribe), I first need to read the header that carries the subscription ID and only then start reading the messages. Using Java, I can connect to the service and I have added an interceptor, but when I print or retrieve the header, it is null. The code that installs my interceptor is below:

    ClientInterceptor interceptor = new HeaderClientInterceptor();
    originChannel = OkHttpChannelBuilder.forAddress(host, port)
        .usePlaintext(true)
        .build();
    Channel channel = ClientInterceptors.intercept(originChannel, interceptor);
    telemetryStub = OpenConfigTelemetryGrpc.newStub(channel);

This is the interceptor code for reading the metadata:

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
            MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
        return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                // "headers" here is the outgoing request metadata.
                super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
                    @Override
                    public void onHeaders(Metadata headers) {
                        // "headers" here is the response metadata sent by the server.
                        Key<String> CUSTOM_HEADER_KEY =
                                Metadata.Key.of("responseKEY", Metadata.ASCII_STRING_MARSHALLER);
                        System.out.println("Contains Key?? " + headers.containsKey(CUSTOM_HEADER_KEY));
                        // The original snippet stops here; the lines below only close it.
                        super.onHeaders(headers);
                    }
                }, headers);
            }
        };
    }

Is there another way to read the metadata, or the first message that carries the subscription identifier? All I need is to read that first message and send the same subscription identifier back to the server in order to start streaming. I have equivalent Python code that uses the same proto file and communicates with the server; it is shown below for reference only:

    sub_req = SubscribeRequestMsg("host", port)
    data_itr = stub.telemetrySubscribe(sub_req, _TIMEOUT_SECONDS)
    metadata = data_itr.initial_metadata()

    if metadata[0][0] == "responseKey":
        metainfo = metadata[0][1]
        print metainfo

        subreply = agent_pb2.SubscriptionReply()
        subreply.SetInParent()
        google.protobuf.text_format.Merge(metainfo, subreply)

        if subreply.response.subscription_id:
            SUB_ID = subreply.response.subscription_id

With the Python code above I can easily get the metadata object, but I am not sure how to do the same in Java.

After reading the metadata in Java, all I get is:

    Metadata({content-type=[application/grpc], grpc-encoding=[identity], grpc-accept-encoding=[identity,deflate,gzip]})

But I know that there is one more metadata entry, which is:

 response { subscription_id: 2 } 

How can I extract that last entry, the response that carries the subscription ID, from the headers? I have tried many options and I am lost here.
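
A note on the parsing step: the Python snippet passes the header value to text_format.Merge, and the matching call in protobuf-java is com.google.protobuf.TextFormat.merge(CharSequence, Message.Builder). When only the numeric ID is needed, a dependency-free sketch like the one below could pull it out of the string. It assumes the header value looks like the text shown above; the class and method names are hypothetical and not part of any library.

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts subscription_id from a protobuf
// text-format string such as "response { subscription_id: 2 }".
final class SubscriptionIdParser {
    // Matches "subscription_id: <digits>" with optional whitespace around the colon.
    private static final Pattern SUBSCRIPTION_ID =
            Pattern.compile("\\bsubscription_id\\s*:\\s*(\\d+)");

    // Returns the ID if one is found, or an empty OptionalLong otherwise.
    static OptionalLong parse(String headerValue) {
        if (headerValue == null) {
            return OptionalLong.empty();
        }
        Matcher m = SUBSCRIPTION_ID.matcher(headerValue);
        return m.find()
                ? OptionalLong.of(Long.parseLong(m.group(1)))
                : OptionalLong.empty();
    }

    public static void main(String[] args) {
        System.out.println(parse("response { subscription_id: 2 }"));
    }
}
```

A regular expression only covers this one field; for anything beyond that, merging into the generated SubscriptionReply builder with TextFormat is the closer equivalent of the Python code.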

2 answers

The method you use is for request metadata, not response metadata:

 public void start(Listener<RespT> responseListener, Metadata headers) { 

For the response metadata, you need a ClientCall.Listener and have to wait for its onHeaders callback:

 public void onHeaders(Metadata headers) 

That said, the way metadata is used here seems unusual to me. Metadata generally carries additional error details or cross-cutting features that are not specific to the RPC method (e.g. auth, tracing, etc.).
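
A minimal sketch of "waiting for the onHeaders callback", assuming grpc-java is on the classpath. The class name HeaderCapturingInterceptor, the accessor responseHeader() and the lowercase key name "responsekey" are illustrative assumptions (the key is derived from the name in the question), and the sketch expects one interceptor instance per call because the future completes only once.

```java
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import java.util.concurrent.CompletableFuture;

// Hypothetical interceptor that exposes one response header as a future.
final class HeaderCapturingInterceptor implements ClientInterceptor {
    // Assumed header name; adjust to whatever the server really sends.
    static final Metadata.Key<String> RESPONSE_KEY =
            Metadata.Key.of("responsekey", Metadata.ASCII_STRING_MARSHALLER);

    private final CompletableFuture<String> responseHeader = new CompletableFuture<>();

    // Completes when the server's headers arrive (with null if the key is absent).
    CompletableFuture<String> responseHeader() {
        return responseHeader;
    }

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
            MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
        return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata requestHeaders) {
                // requestHeaders is what the client sends; the response side is seen in onHeaders.
                Listener<RespT> capturing =
                        new SimpleForwardingClientCallListener<RespT>(responseListener) {
                            @Override
                            public void onHeaders(Metadata responseHeaders) {
                                responseHeader.complete(responseHeaders.get(RESPONSE_KEY));
                                super.onHeaders(responseHeaders);
                            }
                        };
                super.start(capturing, requestHeaders);
            }
        };
    }
}
```

The caller keeps a reference to the interceptor and reads responseHeader() once the call has started, for example with thenAccept(...) or a timed get(...).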


Using a ClientInterceptor is often inconvenient because you need to keep a reference to it in order to pull the data back out. In your case, the data is actually the metadata. One way to make the metadata easier to access is to put it inside a Context.

For example, you can create a Context.Key for the subscription identifier. In your client interceptor, you can extract the Metadata header you want and put it into a Context using Context.current().withValue(key, metadata). Inside your StreamObserver you can then extract it by calling key.get(Context.current()). This assumes you are using the async API rather than the blocking API.

The reason this is more complicated is that metadata is usually information about the call, not something directly related to the call's payload. It covers things like tracing, encoding, stats, cancellation, and the like. If something changes how you handle the request, it probably belongs in the request itself rather than on the side.
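
A rough sketch of the Context idea, assuming grpc-java is on the classpath. The class name SubscriptionContextInterceptor, the context key name and the header name "responsekey" are illustrative assumptions. The listener callbacks are run inside the derived Context explicitly, because a value attached in onHeaders is not otherwise visible to the later callbacks.

```java
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Context;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;

// Hypothetical interceptor that publishes one response header through a Context.Key.
final class SubscriptionContextInterceptor implements ClientInterceptor {
    // Context key that a StreamObserver can read; the name is only a debug label.
    static final Context.Key<String> SUBSCRIPTION_REPLY = Context.key("subscription-reply");

    // Assumed header name; adjust to whatever the server really sends.
    static final Metadata.Key<String> RESPONSE_KEY =
            Metadata.Key.of("responsekey", Metadata.ASCII_STRING_MARSHALLER);

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
            MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
        return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata requestHeaders) {
                super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
                    // Starts as the caller's context; replaced once the header is known.
                    private Context ctx = Context.current();

                    @Override
                    public void onHeaders(Metadata responseHeaders) {
                        String value = responseHeaders.get(RESPONSE_KEY);
                        if (value != null) {
                            ctx = ctx.withValue(SUBSCRIPTION_REPLY, value);
                        }
                        ctx.run(() -> super.onHeaders(responseHeaders));
                    }

                    @Override
                    public void onMessage(RespT message) {
                        // Deliver each message with the header value attached to the context.
                        ctx.run(() -> super.onMessage(message));
                    }

                    @Override
                    public void onClose(Status status, Metadata trailers) {
                        ctx.run(() -> super.onClose(status, trailers));
                    }
                }, requestHeaders);
            }
        };
    }
}
```

With this in place, code inside StreamObserver.onNext could read the value with SubscriptionContextInterceptor.SUBSCRIPTION_REPLY.get(), which returns null when the header was absent.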


Source: https://habr.com/ru/post/1266874/

