Configure and access Flink Queryable State (NullPointerException)

I am using Flink v1.4.0 and have set up two separate jobs. The first is a pipeline that consumes data from a Kafka topic and stores it in Queryable State (QS). The data is keyed by date. The second job submits a query to the QS job and processes the returned data.

Both jobs worked fine with Flink v1.3.2, but since the update everything has broken. Here is part of the code for the first job:

private void runPipeline() throws Exception {
    StreamExecutionEnvironment env = configurationEnvironment();

    // Key the stream by date and expose the reduced bucket as queryable state "QSName".
    QueryableStateStream<String, DataBucket> dataByDate = env.addSource(sourceDataFromKafka())
        .map(NewDataClass::new)
        .keyBy(data -> data.date)
        .asQueryableState("QSName", reduceIntoSingleDataBucket());
}

And here is the client-side code:

QueryableStateClient client = new QueryableStateClient("localhost", 6123);

// the state descriptor of the state to be fetched.
ValueStateDescriptor<DataBucket> descriptor = new ValueStateDescriptor<>(
    "QSName",
    TypeInformation.of(new TypeHint<DataBucket>() {}));

// placeholder for the ID of the running QS job
JobID jobId = JobID.fromHexString("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
String key = "2017-01-06";

CompletableFuture<ValueState<DataBucket>> resultFuture = client.getKvState(
    jobId, 
    "QSName", 
    key, 
    BasicTypeInfo.STRING_TYPE_INFO, 
    descriptor);

try {
    // Block until the state is returned, then read the bucket for the key.
    ValueState<DataBucket> valueState = resultFuture.get();
    DataBucket bucket = valueState.value();
    System.out.println(bucket.getLabel());
} catch (IOException | InterruptedException | ExecutionException e) {
    throw new RuntimeException("Unable to query bucket key: " + key, e);
}

I followed the instructions at the following link: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html

To enable queryable state on the Flink cluster, I copied flink-queryable-state-runtime_2.11-1.4.0.jar from the opt/ folder of the Flink distribution into its lib/ folder.

I keep getting the following error:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:84)
    at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:253)
    at org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:210)
    at org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:174)
    at com.company.dept.query.QuerySubmitter.main(QuerySubmitter.java:37)

Any idea what is going on? As far as I can tell, the request does not even reach the QS ... Thanks.


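As it turned out, 2 things were causing this error. First, the descriptor on the client side was created with the wrong constructor. Instead of the one that takes only the QS name and a TypeHint, I had to use the one that also takes a keySerialiser and a default value: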

ValueStateDescriptor<DataBucket> descriptor = new ValueStateDescriptor<>(
    "QSName",
    TypeInformation.of(new TypeHint<DataBucket>() {}).createSerializer(new ExecutionConfig()),
    DataBucket.emptyBucket());    // or anything that can be used as a default value

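Second, the values passed to the QueryableStateClient constructor had to change. Unlike in v1.3.2, where port 6123 worked, in v1.4.0 the port should be 9069, and localhost should be replaced by the address of the task manager. You can verify both in the task manager logs by looking for the line: Started the Queryable State Proxy Server @ ....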
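As a rough illustration of the log check above, here is a minimal sketch in plain Java that pulls the host and port out of a "Started the Queryable State Proxy Server @ ..." line. The class name ProxyAddressParser and the assumed layout of the text after the "@" (an optional "hostname/" or "/" prefix followed by address:port) are my own assumptions, not something taken from the Flink docs, so adjust the pattern to what your log actually prints.

```java
import java.net.InetSocketAddress;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the proxy address from a task manager log line.
class ProxyAddressParser {

    // Assumed layout after "@": optional "hostname/" (or just "/"), then address:port.
    // IPv6 addresses are not handled by this sketch.
    private static final Pattern PROXY_LINE = Pattern.compile(
        "Queryable State Proxy Server @ (?:[^/\\s]*/)?([^:/\\s]+):(\\d+)");

    static Optional<InetSocketAddress> parse(String logLine) {
        Matcher m = PROXY_LINE.matcher(logLine);
        if (!m.find()) {
            return Optional.empty();
        }
        String host = m.group(1);
        int port = Integer.parseInt(m.group(2));
        // Unresolved on purpose: no DNS lookup is needed just to read the values.
        return Optional.of(InetSocketAddress.createUnresolved(host, port));
    }

    public static void main(String[] args) {
        // Made-up sample line; the address is only an example.
        String line = "INFO Started the Queryable State Proxy Server @ /10.0.0.5:9069";
        parse(line).ifPresent(addr ->
            System.out.println(addr.getHostString() + " " + addr.getPort()));
    }
}
```

The host string and port obtained this way are the two values to pass to the QueryableStateClient constructor in place of "localhost" and 6123.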

Finally, note that there is also a related issue (FLINK-7788): https://issues.apache.org/jira/browse/FLINK-7788.


Source: https://habr.com/ru/post/1692211/

