- Load the static data outside of Kafka Streams, and just use `KStream#map()` to add the metadata. This is possible because Kafka Streams is just a library.
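For illustration, here is a minimal sketch of this first approach (the topic names, the region map, and the serde setup are my assumptions for the example, not anything from the question):

```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class StaticEnrichment {

  public static void main(String[] args) {
    // Side data loaded once, outside of Kafka Streams (from a file, a DB, ...).
    Map<String, String> userRegions = Map.of("alice", "eu", "bob", "us");

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> input = builder.stream("input-topic");

    // Plain in-process lookup; no join and no extra topic needed.
    input
        .mapValues((key, value) ->
            value + ",region=" + userRegions.getOrDefault(key, "unknown"))
        .to("enriched-topic");

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "static-enrichment-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    new KafkaStreams(builder.build(), props).start();
  }
}
```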
It works. But usually people opt for the second option you listed, because the side data used to enrich the input stream is typically not fully static; rather, it changes, but somewhat infrequently:
- Load the metadata into a Kafka topic, read it into a `KTable`, and do `KStream#leftJoin()`; this feels more natural, and it also leaves partitioning etc. to Kafka Streams. However, this requires us to keep the `KTable` loaded with all the values. Note that we would have to load the complete lookup data, not just the changes.
This is the common approach, and I would recommend sticking to it unless you have a specific reason not to.
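A minimal sketch of this second approach, assuming String-keyed topics named `metadata-topic` and `input-topic` (both names invented for the example):

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class KTableJoinEnrichment {

  public static void main(String[] args) {
    StreamsBuilder builder = new StreamsBuilder();

    // The slowly-changing side data, read as a changelog into a KTable.
    KTable<String, String> metadata = builder.table("metadata-topic");

    // The input stream to enrich. Both topics must be keyed the same way
    // (and co-partitioned) for the join to work.
    KStream<String, String> input = builder.stream("input-topic");

    // Left join: records with no matching metadata get a null right side.
    KStream<String, String> enriched = input.leftJoin(
        metadata,
        (value, meta) -> value + ",meta=" + (meta == null ? "n/a" : meta));

    enriched.to("enriched-topic");

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-join-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    new KafkaStreams(builder.build(), props).start();
  }
}
```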
So I take it that you also prefer the second option, but you are wondering whether it is efficient.
Short answer: Yes, the `KTable` will be loaded with all the (latest) values for each key. The table will contain the complete lookup data, but keep in mind that the `KTable` is partitioned behind the scenes: if, for example, your input topic (for the table) has 3 partitions, then you can run up to 3 instances of your application, each of which gets 1 partition of the table (assuming the data is spread evenly across the partitions, each partition/shard of the table would then hold about 1/3 of the table's data). So in practice, more likely than not this "just works".
Is there a way to force just one stream thread to always read from the beginning on restart, so that all the metadata can be loaded into the KTable?
You don't need to worry about that. Simply put, if no local "copy" of the table is available, the Streams API automatically ensures that the table's data is read completely from scratch. If a local copy is available, your application will reuse it (and update it whenever new data arrives in the table's input topic).
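If you want to see what that amounts to in code: nothing join-specific at all. The only related knob is where the local state lives (the application id, server address, and path below are just example values):

```java
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-enrichment-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Where the local table "copy" lives between restarts. If it is intact on
// restart, Streams reuses it; if it is missing, Streams automatically
// re-reads the table's backing topic from the beginning. No manual
// "read from the beginning" logic is needed.
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
```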
Longer answer with examples
Imagine the following input (think: a changelog stream) for your `KTable`; notice how this input consists of 6 messages:
```
(alice, 1) -> (bob, 40) -> (alice, 2) -> (charlie, 600), (alice, 5), (bob, 22)
```
And here are the various states of the "logical" `KTable` that would result from this input, where each newly received input message (such as `(alice, 1)`) leads to a new state of the table:
```
Key      Value
--------------
alice   |   1    // (alice, 1) received

 |
 V

Key      Value
--------------
alice   |   1
bob     |  40    // (bob, 40) received

 |
 V

Key      Value
--------------
alice   |   2    // (alice, 2) received
bob     |  40

 |
 V

Key      Value
--------------
alice   |   2
bob     |  40
charlie | 600    // (charlie, 600) received

 |
 V

Key      Value
--------------
alice   |   5    // (alice, 5) received
bob     |  40
charlie | 600

 |
 V

Key      Value
--------------
alice   |   5
bob     |  22    // (bob, 22) received
charlie | 600
```
Here you can see that, even though the input may have many, many messages (or "changes", as you said; here, we have 6), the number of entries/rows in the resulting `KTable` (which undergoes continuous mutations based on the newly received input) equals the number of unique keys in the input (here: starting out with 1, ramping up to 3), which is typically significantly smaller than the number of messages. So, if the number of messages in the input is `N` and the number of unique keys for these messages is `M`, then typically `M << N` (`M` is significantly smaller than `N`; plus, for the record, we have the invariant `M <= N`).
This is the first reason why "this requires us to keep the KTable loaded with all the values" is typically not an issue, because only the latest value is retained per key.
The second reason that helps is that, as Matthias J. Sax has pointed out, Kafka Streams uses RocksDB as the default storage engine for such tables (more precisely: for the state stores that back a table). RocksDB allows you to maintain tables that are larger than the available main memory / Java heap space of your application, because it can spill to local disk.
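As a sketch of what that looks like (the store and topic names here are invented): the default already gives you RocksDB, so the `Materialized` calls below only matter if you want to name the store explicitly or opt into a purely in-memory one instead.

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

public class StoreChoice {

  static void buildTables(StreamsBuilder builder) {
    // Default behavior made explicit: a persistent RocksDB store, which may
    // spill to local disk. Naming it also makes it queryable later.
    KTable<String, String> onDisk = builder.table(
        "metadata-topic",
        Materialized.<String, String>as(
            Stores.persistentKeyValueStore("metadata-store")));

    // Alternative for tables known to be small: keep them purely in memory.
    KTable<String, String> inMemory = builder.table(
        "other-metadata-topic",
        Materialized.<String, String>as(
            Stores.inMemoryKeyValueStore("small-metadata-store")));
  }
}
```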
Finally, the third reason is that a `KTable` is partitioned. So, if your input topic for the table is (say) configured with 3 partitions, then what happens behind the scenes is that the `KTable` itself is partitioned (think: sharded) in the same way. In the example above, here is what you could end up with, though the exact "splits" depend on how the original input data is spread across the partitions of the table's input topic:
Logical `KTable` (the last state of what I showed above):
```
Key      Value
--------------
alice   |   5
bob     |  22
charlie | 600
```
Actual `KTable`, partitioned (assuming 3 partitions for the table's input topic, plus keys = usernames being spread evenly across the partitions):
```
Key      Value
--------------
alice   |   5    // assuming all data for `alice` is in partition 1

Key      Value
--------------
bob     |  22    // ...for `bob` is in partition 2

Key      Value
--------------
charlie | 600    // ...for `charlie` is in partition 3
```
In practice, this partitioning of the input data, among other things, allows you to "size" the actual manifestations of a `KTable`.
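One way to see this sharding first-hand, on Kafka Streams versions that support interactive queries (the store name is an assumption carried over from the sketch above):

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class LocalShardLookup {

  // `streams` is a running KafkaStreams instance whose KTable was
  // materialized under the (invented) store name "metadata-store".
  static String lookupLocally(KafkaStreams streams, String key) {
    ReadOnlyKeyValueStore<String, String> localShard =
        streams.store(StoreQueryParameters.fromNameAndType(
            "metadata-store", QueryableStoreTypes.keyValueStore()));
    // Returns a value only if this instance hosts the partition/shard that
    // `key` hashes to; on the other instances the result is null.
    return localShard.get(key);
  }
}
```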
Another example:
- Imagine the latest state of your `KTable` would typically have a size of 1 TB (again, the approximate size is a function of the number of unique message keys in the table's input, multiplied by the average size of the associated message value).
- If the table's input topic has only 1 partition, then the `KTable` itself also has only 1 partition, with a size of 1 TB. Here, because the input topic has only 1 partition, you could run your application with up to 1 app instance (so there isn't really a lot of parallelism, heh).
- If the table's input topic has 500 partitions, then the `KTable` has 500 partitions, too, with a size of ~2 GB each (assuming the data is evenly spread across the partitions). Here, you could run your application with up to 500 app instances. If you were to run exactly 500 instances, then each app instance would get exactly 1 partition/shard of the logical `KTable`, thus ending up with 2 GB of table data; if you were to run only 100 instances, then each instance would get 500 / 100 = 5 partitions/shards of the table, ending up with about 2 GB * 5 = 10 GB of table data.
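For what it's worth, the arithmetic from the second bullet as a tiny program (the numbers are just the ones from the example, assuming a perfectly even distribution):

```java
public class SizingMath {

  public static void main(String[] args) {
    long tableSizeGb = 1_000;  // logical KTable: ~1 TB
    int partitions = 500;      // partitions of the table's input topic
    int instances = 100;       // running application instances

    long perPartitionGb = tableSizeGb / partitions;           // ~2 GB
    int shardsPerInstance = partitions / instances;           // 5
    long perInstanceGb = perPartitionGb * shardsPerInstance;  // ~10 GB

    System.out.printf("each instance holds ~%d GB of table data%n",
        perInstanceGb);
  }
}
```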