Ideal way to enrich a KStream with lookup data

My stream has a column called "category", and I have additional static metadata for each category in a different store; it gets updated once every couple of days. What is the right way to do this lookup? There are two options with Kafka Streams.

  • Load the static data outside of Kafka Streams and just use KStreams#map() to add the metadata. This is possible because Kafka Streams is just a library.

  • Load the metadata into a Kafka topic, load it into a KTable and do KStreams#leftJoin(). This seems more natural and leaves partitioning etc. to Kafka Streams. However, this requires us to keep the KTable loaded with all the values. Note that we would have to load the entire lookup data, not just the changes.

    • For example, say initially there was only one category 'c1'. The Kafka Streams app was stopped gracefully and restarted. After the restart, a new category 'c2' was added. My assumption is that table = KStreamBuilder().table('metadataTopic') would only have the value 'c2', as that is the only thing that changed since the app started the second time. I would want it to have both 'c1' and 'c2'.
    • If it does have 'c1' as well, would the data ever be removed from the KTable (perhaps by sending a key = null message?)?

Which of the above is the right way to look up metadata?

Is it possible to always force just one stream to be read from the beginning on restart, so that all the metadata can be loaded into the KTable?

Is there another way using stores?

+6
3 answers
  • Load the static data outside of Kafka Streams and just use KStreams#map() to add the metadata. This is possible because Kafka Streams is just a library.

This works. But usually people opt for the next option you listed, because the side data used to enrich the input stream is typically not fully static; rather, it changes, but somewhat infrequently:

  • Load the metadata into a Kafka topic, load it into a KTable and do KStreams#leftJoin(). This seems more natural and leaves partitioning etc. to Kafka Streams. However, this requires us to keep the KTable loaded with all the values. Note that we would have to load the entire lookup data, not just the changes.

This is the usual approach, and I would recommend sticking to it unless you have a specific reason not to.

However, this requires us to keep the KTable loaded with all the values. Note that we would have to load the entire lookup data, not just the changes.

So I guess you also prefer the second option, but you are concerned about whether it is efficient.

Short answer: Yes, the KTable will be loaded with all the (latest) values per key. The table will contain the entire lookup data, but keep in mind that the KTable is partitioned behind the scenes: if, for example, your input topic (for the table) has 3 partitions, then you can run up to 3 instances of your application, each of which gets 1 partition of the table (assuming the data is spread evenly across partitions, each partition/shard of the table holds about 1/3 of the table's data). So in practice it is more likely than not that this "just works".

Is it possible to always force just one stream to be read from the beginning on restart, so that all the metadata can be loaded into the KTable?

You don't need to worry about that. Simply put, if no local "copy" of the table is available, the Streams API automatically ensures that the table's data is read fully from scratch. If a local copy is available, your application reuses that copy (and updates it whenever new data arrives in the table's input topic).
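The start-up behavior described above can be sketched with a small plain-Java model of the 'c1'/'c2' scenario from the question. This is only an illustration under stated assumptions: it does not use the Kafka Streams API, and the names RestoreModel, start and nextOffset are hypothetical. The "offset to resume from" is part of the model, not something the answer spells out.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of how a table is populated at start-up; NOT the Kafka Streams API.
class RestoreModel {

    // Builds the table when the application starts.
    // localCopy == null models "no local copy available": replay the topic from offset 0.
    // Otherwise the local copy is reused and only messages from nextOffset onward are applied.
    static Map<String, String> start(List<Map.Entry<String, String>> topic,
                                     Map<String, String> localCopy,
                                     int nextOffset) {
        Map<String, String> table = new HashMap<String, String>();
        int from = 0;
        if (localCopy != null) {
            table.putAll(localCopy);
            from = nextOffset;
        }
        for (int i = from; i < topic.size(); i++) {
            Map.Entry<String, String> message = topic.get(i);
            table.put(message.getKey(), message.getValue()); // latest value wins
        }
        return table;
    }

    public static void main(String[] args) {
        // 'c1' was written before the restart, 'c2' after it.
        List<Map.Entry<String, String>> topic =
            List.of(Map.entry("c1", "meta-1"), Map.entry("c2", "meta-2"));

        // Restart with a local copy that already holds 'c1'.
        System.out.println(start(topic, Map.of("c1", "meta-1"), 1));
        // Restart without any local copy: full replay.
        System.out.println(start(topic, null, 0));
    }
}
```

In both cases the model ends up with entries for 'c1' and 'c2', which is the outcome the question asks for.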

Longer answer with examples

Imagine the following input data (think: changelog stream) for your KTable; notice how this input consists of 6 messages:

 (alice, 1) -> (bob, 40) -> (alice, 2) -> (charlie, 600) -> (alice, 5) -> (bob, 22)

And here are the various states of the "logical" KTable that would result from this input, where each newly received input message (such as (alice, 1)) leads to a new state of the table:

 Key      Value
 --------------
 alice   |   1    // (alice, 1) received

  |
  V

 Key      Value
 --------------
 alice   |   1
 bob     |  40    // (bob, 40) received

  |
  V

 Key      Value
 --------------
 alice   |   2    // (alice, 2) received
 bob     |  40

  |
  V

 Key      Value
 --------------
 alice   |   2
 bob     |  40
 charlie | 600    // (charlie, 600) received

  |
  V

 Key      Value
 --------------
 alice   |   5    // (alice, 5) received
 bob     |  40
 charlie | 600

  |
  V

 Key      Value
 --------------
 alice   |   5
 bob     |  22    // (bob, 22) received
 charlie | 600

What you can see here is that, even though the input data may have many, many messages (or "changes", as you said; here we have 6), the number of entries/rows in the resulting KTable (which undergoes continuous mutations based on the newly received input) equals the number of unique keys in the input (here: starting at 1 and ramping up to 3), which is typically much smaller than the number of messages. So, if the number of messages in the input is N and the number of unique keys for these messages is M, then typically M << N (M is much smaller than N; plus, for the record, we have the invariant M <= N).

This is the first reason why "this requires us to keep the KTable loaded with all the values" is typically not an issue: only the latest value is retained per key.
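To make the N versus M point concrete, here is a minimal plain-Java sketch that replays the six example messages into a map. It only models the "latest value per key" semantics; it is not the Kafka Streams API, and the class and method names (ChangelogModel, materialize, exampleInput) are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of changelog semantics; NOT the Kafka Streams API.
class ChangelogModel {

    // Replays (key, value) messages in order and keeps only the latest value per key.
    static Map<String, Integer> materialize(List<Map.Entry<String, Integer>> changelog) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> message : changelog) {
            table.put(message.getKey(), message.getValue()); // upsert: overwrite or insert
        }
        return table;
    }

    // The six messages from the example above.
    static List<Map.Entry<String, Integer>> exampleInput() {
        return List.of(
            Map.entry("alice", 1), Map.entry("bob", 40), Map.entry("alice", 2),
            Map.entry("charlie", 600), Map.entry("alice", 5), Map.entry("bob", 22));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = exampleInput();
        Map<String, Integer> table = materialize(input);
        System.out.println("N (messages)    = " + input.size()); // 6
        System.out.println("M (unique keys) = " + table.size()); // 3
        System.out.println(table); // {alice=5, bob=22, charlie=600}
    }
}
```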

The second reason that helps is that, as Matthias J. Sax has pointed out, Kafka Streams uses RocksDB as the default storage engine for such tables (more precisely: for the state stores that back a table). RocksDB allows you to maintain tables that are larger than the available main memory / Java heap space of your application, because it can spill to local disk.

Lastly, the third reason is that a KTable is partitioned. So, if your input topic for the table is (say) configured with 3 partitions, then what happens behind the scenes is that the KTable itself is partitioned (think: sharded) in the same way. In the example above, here is what you could end up with, though the exact "splits" depend on how the original input data is spread across the partitions of the table's input topic:

Logical KTable (last state of what I showed above):

 Key      Value
 --------------
 alice   |   5
 bob     |  22
 charlie | 600

Actual KTable, partitioned (assuming 3 partitions for the table's input topic, plus keys = usernames being spread evenly across the partitions):

 Key      Value
 --------------
 alice   |   5    // Assuming that all data for `alice` is in partition 1

 Key      Value
 --------------
 bob     |  22    // ...for `bob` is in partition 2

 Key      Value
 --------------
 charlie | 600    // ...for `charlie` is in partition 3
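The sharding idea can be sketched in plain Java as follows. The partition function here (String.hashCode() modulo the partition count) is a simplified stand-in chosen for the sketch; Kafka's default partitioner hashes the serialized key bytes instead, so the actual placement of alice, bob and charlie will differ from what this model produces. PartitionModel, partitionFor and shard are hypothetical names.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of a partitioned table; NOT the Kafka Streams API.
class PartitionModel {

    // Stand-in partitioner (assumption): key hash modulo the number of partitions.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Splits a logical table into one sub-table (shard) per partition.
    static Map<Integer, Map<String, Integer>> shard(Map<String, Integer> table, int numPartitions) {
        Map<Integer, Map<String, Integer>> shards = new TreeMap<>();
        for (int p = 0; p < numPartitions; p++) {
            shards.put(p, new TreeMap<>());
        }
        // Every row lands in exactly one shard, determined only by its key.
        table.forEach((key, value) -> shards.get(partitionFor(key, numPartitions)).put(key, value));
        return shards;
    }

    public static void main(String[] args) {
        Map<String, Integer> logicalTable = Map.of("alice", 5, "bob", 22, "charlie", 600);
        shard(logicalTable, 3).forEach(
            (partition, rows) -> System.out.println("partition " + partition + ": " + rows));
    }
}
```

Because the partition depends only on the key and the partition count, the same key always maps to the same shard in this model, whether the split is even or not.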

In practice, this partitioning of the input data, among other things, lets you "size" the actual manifestations of a KTable.

Another example:

  • Imagine the latest state of your KTable would typically have a size of 1 TB (again, the approximate size is a function of the number of unique message keys in the table's input data, multiplied by the average size of the associated message value).
  • If the table's input topic has only 1 partition, then the KTable itself also has only 1 partition, with a size of 1 TB. Here, because the input topic has just 1 partition, you could run your application with at most 1 app instance (so not really a whole lot of parallelism, heh).
  • If the table's input topic has 500 partitions, then the KTable has 500 partitions too, with a size of ~ 2 GB each (assuming the data is spread evenly across the partitions). Here, you could run your application with up to 500 app instances. If you were to run exactly 500 instances, then each app instance would get exactly 1 partition/shard of the logical KTable, thus ending up with 2 GB of table data; if you were to run only 100 instances, then each instance would get 500 / 100 = 5 partitions/shards of the table, ending up with about 2 GB * 5 = 10 GB of table data.
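The arithmetic in this example can be written down as a small helper. This is a back-of-the-envelope sketch, assuming evenly spread data and 1 TB = 1000 GB as in the figures above; TableSizing and gbPerInstance are illustrative names.

```java
// Back-of-the-envelope sizing for a partitioned table; assumes evenly spread data.
class TableSizing {

    // Approximate table data (in GB) held by the most loaded application instance.
    static double gbPerInstance(double totalGb, int partitions, int instances) {
        if (instances < 1 || instances > partitions) {
            // More instances than partitions would leave the extra instances without table data.
            throw new IllegalArgumentException("need 1 <= instances <= partitions");
        }
        double gbPerPartition = totalGb / partitions;
        // Ceiling division: some instances get one partition more when the split is uneven.
        int partitionsPerInstance = (partitions + instances - 1) / instances;
        return gbPerPartition * partitionsPerInstance;
    }

    public static void main(String[] args) {
        System.out.println(gbPerInstance(1000, 1, 1));     // 1000.0 (1 partition, 1 instance)
        System.out.println(gbPerInstance(1000, 500, 500)); // 2.0
        System.out.println(gbPerInstance(1000, 500, 100)); // 10.0
    }
}
```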
+9

Your overall observation is correct, and it depends on which tradeoffs are more important to you. If your metadata is small, option 1 seems best. If the metadata is large, option 2 seems to be the way to go.

If you use map(), you need to have a full copy of your metadata in each application instance (as you cannot know exactly how Streams will partition your KStream data). Thus, if your metadata does not fit into main memory, using map() would not work easily.
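As a rough picture of option 1, the sketch below enriches records from an in-memory map that every instance would have to hold in full. It uses java.util.stream purely as a stand-in for the map() step; it is not Kafka Streams code, and MapSideLookup and enrich are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java model of a map-side lookup; NOT the Kafka Streams API.
class MapSideLookup {

    // Appends the metadata for each category; categories without metadata
    // get "null", similar in spirit to a left join.
    static List<String> enrich(List<String> categories, Map<String, String> metadata) {
        return categories.stream()
            .map(category -> category + " -> " + metadata.get(category))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The complete metadata must be present in this instance's memory.
        Map<String, String> metadata = Map.of("c1", "meta-1");
        System.out.println(enrich(List.of("c1", "c2"), metadata)); // [c1 -> meta-1, c2 -> null]
    }
}
```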

If you use a KTable, Streams will take care that the metadata is correctly sharded across all running application instances, so that no data duplication is required. Furthermore, a KTable uses RocksDB as its state store engine and can thus spill to disk.

EDIT BEGIN

About having all the data in the KTable: if you have two categories for the same key, the second value overwrites the first value if you read the data directly from a topic into a KTable via builder.table(...) (changelog semantics). However, you can easily work around this by reading the topic as a record stream (i.e. builder.stream(...)) and applying an aggregation to compute the KTable. Your aggregation would simply emit a list of all values for each key.

About deleting: a KTable uses changelog semantics and does understand tombstone messages for deleting key-value pairs. Thus, if you read a KTable from a topic and the topic contains a <key:null> message, the current record in the KTable with this key gets deleted. This is harder to achieve when the KTable is the result of an aggregation, because an aggregation input record with a null key or null value is simply ignored and does not update the aggregation result.
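The tombstone behavior for a table read directly from a topic can be modeled in a few lines of plain Java. This is a sketch of the semantics only, not the Kafka Streams API; TombstoneModel and apply are made-up names.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of tombstone handling in a changelog-backed table; NOT the Kafka Streams API.
class TombstoneModel {

    // Applies one changelog message; a null value is a tombstone and deletes the key.
    static void apply(Map<String, String> table, String key, String value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }

    public static void main(String[] args) {
        Map<String, String> table = new HashMap<>();
        apply(table, "c1", "meta-1");
        apply(table, "c2", "meta-2");
        apply(table, "c1", null); // <c1:null> removes the entry for c1
        System.out.println(table); // {c2=meta-2}
    }
}
```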

A workaround would be to add a map() step before the aggregation and introduce a NULL value (that is, a user-defined "object" that represents the tombstone but is not null; in your case you could call it a null-category). In your aggregation, you just return null as the aggregation result if the input record has null-category as its value. This then translates into a tombstone message for your KTable and deletes the current list of categories for this key.
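Here is a minimal plain-Java model of that workaround, covering both the "list of all values per key" aggregation and the sentinel-based delete. It is not written against the Kafka Streams API. The names SentinelAggregation, NULL_CATEGORY, mapStep, aggregate and process are hypothetical, and the sketch assumes the raw delete marker arriving at the map() step is a null value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of an aggregation with a sentinel tombstone; NOT the Kafka Streams API.
class SentinelAggregation {

    // Hypothetical sentinel standing in for "delete"; it is a real object, not null.
    static final String NULL_CATEGORY = "null-category";

    // map() step: turn a raw null value into the sentinel so the aggregation still sees it.
    static String mapStep(String value) {
        return value == null ? NULL_CATEGORY : value;
    }

    // Aggregator: collect all values per key, or return null when the sentinel arrives.
    static List<String> aggregate(String value, List<String> current) {
        if (NULL_CATEGORY.equals(value)) {
            return null;
        }
        List<String> next = new ArrayList<>(current);
        next.add(value);
        return next;
    }

    // Feeds one record into the aggregation and updates the resulting table.
    static void process(Map<String, List<String>> table, String key, String value) {
        if (key == null || value == null) {
            return; // models records with a null key or value being ignored by the aggregation
        }
        List<String> result = aggregate(value, table.getOrDefault(key, new ArrayList<>()));
        if (result == null) {
            table.remove(key); // a null result acts as a tombstone for this key
        } else {
            table.put(key, result);
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> table = new HashMap<>();
        process(table, "k", mapStep("c1"));
        process(table, "k", mapStep("c2"));
        System.out.println(table); // {k=[c1, c2]}
        process(table, "k", mapStep(null)); // sentinel reaches the aggregator
        System.out.println(table); // {}
    }
}
```

Without the mapStep call, the null value would hit the early return in process and the list for the key would stay unchanged, which is the problem the workaround addresses.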

EDIT END

And, of course, you can always build a custom solution via the Processor API. However, if the DSL can give you what you need, there is no good reason to do this.

+4

Let's say the stream data is (alice, 1) → (bob, 40) → (charlie, 600), and the stream's topic has 3 partitions.

Suppose the table looks like the one below and has the same three partitions:

 Key      Value
 --------------
 alice   |   5
 bob     |  40
 charlie | 600

Now, if we have 3 consumers in the consumer group, is there a way to guarantee that the records (stream and table) for "alice" end up at the same consumer for the lookup?

0

Source: https://habr.com/ru/post/1012986/

