I am using Java with protoc 3.0, and my proto file is shown below. https://github.com/openconfig/public/blob/master/release/models/rpc/openconfig-rpc-api.yang
syntax = "proto3";

package Telemetry;

// Interface exported by Agent
service OpenConfigTelemetry {
    // Request an inline subscription for data at the specified path.
    // The device should send telemetry data back on the same
    // connection as the subscription request.
    rpc telemetrySubscribe(SubscriptionRequest) returns (stream OpenConfigData) {}

    // Terminates and removes an exisiting telemetry subscription
    rpc cancelTelemetrySubscription(CancelSubscriptionRequest) returns (CancelSubscriptionReply) {}

    // Get the list of current telemetry subscriptions from the
    // target. This command returns a list of existing subscriptions
    // not including those that are established via configuration.
    rpc getTelemetrySubscriptions(GetSubscriptionsRequest) returns (GetSubscriptionsReply) {}

    // Get Telemetry Agent Operational States
    rpc getTelemetryOperationalState(GetOperationalStateRequest) returns (GetOperationalStateReply) {}

    // Return the set of data encodings supported by the device for
    // telemetry data
    rpc getDataEncodings(DataEncodingRequest) returns (DataEncodingReply) {}
}

// Message sent for a telemetry subscription request
message SubscriptionRequest {
    // Data associated with a telemetry subscription
    SubscriptionInput input = 1;

    // List of data models paths and filters
    // which are used in a telemetry operation.
    repeated Path path_list = 2;

    // The below configuration is not defined in Openconfig RPC.
    // It is a proposed extension to configure additional
    // subscription request features.
    SubscriptionAdditionalConfig additional_config = 3;
}

// Data associated with a telemetry subscription
message SubscriptionInput {
    // List of optional collector endpoints to send data for
    // this subscription.
    // If no collector destinations are specified, the collector
    // destination is assumed to be the requester on the rpc channel.
    repeated Collector collector_list = 1;
}

// Collector endpoints to send data specified as an ip+port combination.
message Collector {
    // IP address of collector endpoint
    string address = 1;

    // Transport protocol port number for the collector destination.
    uint32 port = 2;
}

// Data model path
message Path {
    // Data model path of interest
    // Path specification for elements of OpenConfig data models
    string path = 1;

    // Regular expression to be used in filtering state leaves
    string filter = 2;

    // If this is set to true, the target device will only send
    // updates to the collector upon a change in data value
    bool suppress_unchanged = 3;

    // Maximum time in ms the target device may go without sending
    // a message to the collector. If this time expires with
    // suppress-unchanged set, the target device must send an update
    // message regardless if the data values have changed.
    uint32 max_silent_interval = 4;

    // Time in ms between collection and transmission of the
    // specified data to the collector platform. The target device
    // will sample the corresponding data (eg,. a counter) and
    // immediately send to the collector destination.
    //
    // If sample-frequency is set to 0, then the network device
    // must emit an update upon every datum change.
    uint32 sample_frequency = 5;
}

// Configure subscription request additional features.
message SubscriptionAdditionalConfig {
    // limit the number of records sent in the stream
    int32 limit_records = 1;

    // limit the time the stream remains open
    int32 limit_time_seconds = 2;
}

// Reply to inline subscription for data at the specified path is done in
// two-folds.
// 1. Reply data message sent out using out-of-band channel.
// 2. Telemetry data send back on the same connection as the
//    subscription request.

// 1. Reply data message sent out using out-of-band channel.
message SubscriptionReply {
    // Response message to a telemetry subscription creation or
    // get request.
    SubscriptionResponse response = 1;

    // List of data models paths and filters
    // which are used in a telemetry operation.
    repeated Path path_list = 2;
}

// Response message to a telemetry subscription creation or get request.
message SubscriptionResponse {
    // Unique id for the subscription on the device. This is
    // generated by the device and returned in a subscription
    // request or when listing existing subscriptions
    uint32 subscription_id = 1;
}

// 2. Telemetry data send back on the same connection as the
//    subscription request.
message OpenConfigData {
    // router name:export IP address
    string system_id = 1;

    // line card / RE (slot number)
    uint32 component_id = 2;

    // PFE (if applicable)
    uint32 sub_component_id = 3;

    // Path specification for elements of OpenConfig data models
    string path = 4;

    // Sequence number, monotonically increasing for each
    // system_id, component_id, sub_component_id + path.
    uint64 sequence_number = 5;

    // timestamp (milliseconds since epoch)
    uint64 timestamp = 6;

    // List of key-value pairs
    repeated KeyValue kv = 7;
}

// Simple Key-value, where value could be one of scalar types
message KeyValue {
    // Key
    string key = 1;

    // One of possible values
    oneof value {
        double double_value = 5;
        int64 int_value = 6;
        uint64 uint_value = 7;
        sint64 sint_value = 8;
        bool bool_value = 9;
        string str_value = 10;
        bytes bytes_value = 11;
    }
}

// Message sent for a telemetry subscription cancellation request
message CancelSubscriptionRequest {
    // Subscription identifier as returned by the device when
    // subscription was requested
    uint32 subscription_id = 1;
}

// Reply to telemetry subscription cancellation request
message CancelSubscriptionReply {
    // Return code
    ReturnCode code = 1;

    // Return code string
    string code_str = 2;
};

// Result of the operation
enum ReturnCode {
    SUCCESS = 0;
    NO_SUBSCRIPTION_ENTRY = 1;
    UNKNOWN_ERROR = 2;
}

// Message sent for a telemetry get request
message GetSubscriptionsRequest {
    // Subscription identifier as returned by the device when
    // subscription was requested
    // --- or ---
    // 0xFFFFFFFF for all subscription identifiers
    uint32 subscription_id = 1;
}

// Reply to telemetry subscription get request
message GetSubscriptionsReply {
    // List of current telemetry subscriptions
    repeated SubscriptionReply subscription_list = 1;
}

// Message sent for telemetry agent operational states request
message GetOperationalStateRequest {
    // Per-subscription_id level operational state can be requested.
    //
    // Subscription identifier as returned by the device when
    // subscription was requested
    // --- or ---
    // 0xFFFFFFFF for all subscription identifiers including agent-level
    // operational stats
    // --- or ---
    // If subscription_id is not present then sent only agent-level
    // operational stats
    uint32 subscription_id = 1;

    // Control verbosity of the output
    VerbosityLevel verbosity = 2;
}

// Verbosity Level
enum VerbosityLevel {
    DETAIL = 0;
    TERSE = 1;
    BRIEF = 2;
}

// Reply to telemetry agent operational states request
message GetOperationalStateReply {
    // List of key-value pairs where
    // key = operational state definition
    // value = operational state value
    repeated KeyValue kv = 1;
}

// Message sent for a data encoding request
message DataEncodingRequest {
}

// Reply to data encodings supported request
message DataEncodingReply {
    repeated EncodingType encoding_list = 1;
}

// Encoding Type Supported
enum EncodingType {
    UNDEFINED = 0;
    XML = 1;
    JSON_IETF = 2;
    PROTO3 = 3;
}
To make the service call (rpc telemetrySubscribe), I first need to read the header that carries the subscription ID, and only then start reading the streamed messages. From Java I can connect to the service, and I have added a client interceptor, but when I print the received headers, the custom header I expect is missing. The code that installs my interceptor is below:
ClientInterceptor interceptor = new HeaderClientInterceptor();
originChannel = OkHttpChannelBuilder.forAddress(host, port)
        .usePlaintext(true)
        .build();
Channel channel = ClientInterceptors.intercept(originChannel, interceptor);
telemetryStub = OpenConfigTelemetryGrpc.newStub(channel);
This is the interceptor code that reads the metadata:
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
    return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
        @Override
        public void start(Listener<RespT> responseListener, Metadata headers) {
            super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
                @Override
                public void onHeaders(Metadata headers) {
                    // Look for the custom response header sent by the server.
                    Key<String> CUSTOM_HEADER_KEY =
                            Metadata.Key.of("responseKEY", Metadata.ASCII_STRING_MARSHALLER);
                    System.out.println("Contains Key?? " + headers.containsKey(CUSTOM_HEADER_KEY));
                    // Pass the headers on to the wrapped listener.
                    super.onHeaders(headers);
                }
            }, headers);
        }
    };
}
Is there another way to read the metadata, or the first message, that carries the subscription ID? All I need is to read that subscription ID and send the same ID back to the server in order to start streaming. I have equivalent Python code that uses the same proto file and communicates with the server successfully. It is shown below for reference only:
sub_req = SubscribeRequestMsg("host", port)
data_itr = stub.telemetrySubscribe(sub_req, _TIMEOUT_SECONDS)
metadata = data_itr.initial_metadata()
if metadata[0][0] == "responseKey":
    metainfo = metadata[0][1]
    print metainfo
    subreply = agent_pb2.SubscriptionReply()
    subreply.SetInParent()
    google.protobuf.text_format.Merge(metainfo, subreply)
    if subreply.response.subscription_id:
        SUB_ID = subreply.response.subscription_id
With the Python code above I can easily get the metadata object, but I am not sure how to get the same thing in Java.
After reading the metadata in Java, all I get is:
Metadata({content-type=[application/grpc], grpc-encoding=[identity], grpc-accept-encoding=[identity,deflate,gzip]})
But I know that there is one more metadata entry, whose value is:
response { subscription_id: 2 }
How can I extract that last entry, the one carrying the subscription ID, from the headers? I have tried many options and I am lost.
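For completeness, the parsing step itself looks like the easy part once the header value is available as a string. Below is a minimal sketch, assuming the value is protobuf text format exactly as printed above (`response { subscription_id: 2 }`). The class name `SubscriptionIdParser` and its `parse` method are hypothetical names used only for illustration, and the regex is a stand-in for real text-format parsing. With the generated classes on the classpath, `TextFormat.merge` from protobuf-java would be the closer equivalent of the Python `text_format.Merge` call.

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: pulls subscription_id out of a text-format header value.
public class SubscriptionIdParser {
    // Matches "subscription_id: <digits>"; a uint32 has at most 10 decimal digits.
    private static final Pattern SUBSCRIPTION_ID =
            Pattern.compile("subscription_id\\s*:\\s*(\\d{1,10})\\b");

    // Returns the subscription ID if the value contains one, otherwise empty.
    public static OptionalLong parse(String headerValue) {
        if (headerValue == null) {
            return OptionalLong.empty();
        }
        Matcher m = SUBSCRIPTION_ID.matcher(headerValue);
        if (!m.find()) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(Long.parseLong(m.group(1)));
    }

    public static void main(String[] args) {
        // The sample value is the metadata entry quoted in the question.
        OptionalLong id = parse("response { subscription_id: 2 }");
        System.out.println(id.isPresent()
                ? "subscription_id = " + id.getAsLong()
                : "no subscription_id found");
    }
}
```

This only covers the string handling. It does not explain why the entry is absent from the headers seen in `onHeaders`, which is the part I am still stuck on.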