How to design Kafka topics so that an interconnected entity schema can be consumed as events by databases such as RDBMS and graph databases

I have a case where Information objects contain Element objects. When I save an Information object, it tries to find pre-existing Element objects by a unique value field and inserts them if they are not found. Information objects and Element objects cannot be deleted for now. Adding a parent requires two pre-existing Element objects. I was planning to use three topics, CreateElement, CreateInformation and AddParentOfElement, for the Created Element Event, Created Information Event and Added Parent Event. I realized that since there are no ordering guarantees between topics or between topic partitions, those events, as shown in the figure, can be consumed in a different order, so the schema could not be persisted to an RDBMS, for example. I assume that ids are used for the partition assignment of the topics as usual.

Here is my chart:

[Image: diagram of the events, not included]

Scenario

  • Element with (id = 1) was created by user
  • Information with (id = 1) containing Elements (1,2,3) was created by user
  • Element with (id = 5) was created by user
  • The parent of Element with (id = 5) was set to Element with (id = 3) by user
  • Information with (id = 2) containing Elements (1,3 and 5) was created by user

I am wondering if my topic choices make sense, and I would appreciate any suggestions on how to design events so that, when they are processed by the consuming database services, they are idempotent and do not put the system in an incorrect state.

Thanks!

1 answer

I looked at this solution: How to implement Event Driven microservice architecture with Spring Kafka cloud stream and database for the service, but was not satisfied with its suggestions. I then explored Confluent Bottled Water ( https://www.confluent.io/blog/bottled-water-real-time-integration-of-postgresql-and-kafka/ ) and later the more active but similar Debezium ( http://debezium.io/ ).

I decided to follow the Debezium path. Debezium is a Kafka Connect plugin that reads directly from the MySQL binlog (or the Postgres write-ahead log) and publishes those changes (schema and data) to Kafka.

The example I'm using uses docker, and this is how I set it up for Docker Toolbox (Windows) and Docker (Linux).

1a) Linux (Docker)

    sudo docker stop $(sudo docker ps -a -q)
    sudo docker rm -f $(sudo docker ps -a -q)
    sudo docker run -d --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.5
    sudo docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper
    sudo docker run -d --name kafka -e ADVERTISED_HOST_NAME=<YOUR_IP> -e ZOOKEEPER_CONNECT=<YOUR_IP> --link zookeeper:zookeeper -p 9092:9092 debezium/kafka
    sudo docker run -d --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=<YOUR_IP> --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect
    sudo docker run -d --net=host -e "PROXY=true" -e ADV_HOST=<YOUR_IP> -e "KAFKA_REST_PROXY_URL=http://<YOUR_IP>:8082" -e "SCHEMAREGISTRY_UI_URL=http://<YOUR_IP>:8081" landoop/kafka-topics-ui
    sudo docker run -p 8082:8082 --name kafka-rest --env ZK_CONNECTION_STRING=<YOUR_IP>:2181 frontporch/kafka-rest:latest

1b) Windows (Docker Toolbox)

    docker stop $(docker ps -a -q)
    docker rm -f $(docker ps -a -q)
    docker run -d --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.5
    docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper
    docker run -d --name kafka -e ADVERTISED_HOST_NAME=192.168.99.100 -e ZOOKEEPER_CONNECT=192.168.99.100 --link zookeeper:zookeeper -p 9092:9092 debezium/kafka
    docker run -d --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=192.168.99.100 --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect
    docker run -d --net=host -e "PROXY=true" -e ADV_HOST=192.168.99.100 -e "KAFKA_REST_PROXY_URL=http://192.168.99.100:8082" -e "SCHEMAREGISTRY_UI_URL=http://192.168.99.100:8081" landoop/kafka-topics-ui
    docker run -p 8082:8082 --name kafka-rest --env ZK_CONNECTION_STRING=192.168.99.100:2181 frontporch/kafka-rest:latest

2) Connect the database to Debezium Connect

Send a POST with Content-Type application/json to <YOUR_IP>:8083/connectors (for Linux) or 192.168.99.100:8083/connectors (for Windows Docker Toolbox) with the body:

    {
      "name": "inventory-connector",
      "config": {
        "name": "inventory-connector",
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory"
      }
    }

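If you prefer to register the connector from code rather than from a REST client, the POST from step 2 could look roughly like the sketch below. It assumes Java 11+ (for `java.net.http`); the class name `ConnectorRegistration` and its method names are hypothetical, and `connectHost` stands for whatever `<YOUR_IP>` is in your setup.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper, not part of Debezium or Kafka Connect.
class ConnectorRegistration {

    // Builds the POST request against the Kafka Connect REST port (8083 in the setup above).
    static HttpRequest buildRequest(String connectHost, String connectorJson) {
        return HttpRequest.newBuilder(URI.create("http://" + connectHost + ":8083/connectors"))
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(connectorJson))
                .build();
    }

    // Sends the request and returns the HTTP status code reported by Kafka Connect.
    static int register(String connectHost, String connectorJson) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(connectHost, connectorJson), HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }
}
```

Calling `ConnectorRegistration.register("192.168.99.100", json)` with the JSON body from step 2 should return a 2xx status if the connector was accepted; anything else is worth inspecting in the response body.
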
Debezium creates Kafka topics, one for each table. By navigating to the landoop/kafka-topics-ui server on port 8000 you can see what the schema of the message payloads looks like, as shown below. The important parts of the payload are before and after, which carry the old values and the new values of the corresponding database row. Also, op is 'c' for create, 'u' for update, etc.

[Image: screenshot of a message payload in kafka-topics-ui, not included]
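
To make the role of before, after and op concrete, here is a minimal sketch of a consumer-side replica that applies one change event at a time. It is only an illustration under stated assumptions: the class `ChangeEventApplier` is hypothetical, the payload is assumed to be already deserialized into maps, the primary key is assumed to be a column named `id`, and `after` is assumed to carry the full new row. The op codes 'd' (delete) and 'r' (snapshot read) come from Debezium's event format and are not mentioned in the text above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory replica of a single table, keyed by primary key.
class ChangeEventApplier {

    final Map<Object, Map<String, Object>> rows = new HashMap<>();

    // op, before and after mirror the payload fields of a Debezium change event.
    void apply(String op, Map<String, Object> before, Map<String, Object> after) {
        switch (op) {
            case "c": // create
            case "r": // row read during the initial snapshot
            case "u": // update
                // Overwriting by key means a replayed event leaves the same state.
                rows.put(after.get("id"), after);
                break;
            case "d": // delete: only "before" is populated
                rows.remove(before.get("id"));
                break;
            default:
                throw new IllegalArgumentException("Unknown op: " + op);
        }
    }
}
```

Because every branch is a keyed overwrite or a keyed removal, applying the same event twice leaves the replica in the same state as applying it once.
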

Each consuming microservice uses the Spring Cloud Kafka binders via these Maven dependencies:

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Brixton.SR7</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-parent</artifactId>
                <version>1.5.2.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
                <version>1.2.0.RELEASE</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        [...]
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        [...]
    </dependencies>

Then in each of my consuming Spring Cloud microservices I have a Listener that listens to all the topics it is interested in at once and delegates each topic's events to a dedicated event handler:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    import java.util.concurrent.CountDownLatch;

    @Component
    public class Listener {

        public final CountDownLatch countDownLatch1 = new CountDownLatch(1);

        // One listener subscribed to every Debezium topic this service cares about
        @KafkaListener(id = "listener", topics = {
                "dbserver1.inventory.entity",
                "dbserver1.inventory.attribute",
                "dbserver1.inventory.entity_types"
        }, group = "group1")
        public void listen(ConsumerRecord<?, ?> record) {
            String topic = record.topic();
            if (topic.equals("dbserver1.inventory.entity")) {
                // delegate to appropriate handler
                // EntityEventHandler.handle(record);
            } else if (topic.equals("dbserver1.inventory.attribute")) {
                // delegate to the handler for this topic
            } else if (topic.equals("dbserver1.inventory.entity_types")) {
                // delegate to the handler for this topic
            }
        }
    }
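
As the number of topics grows, the if/else chain can be replaced by a lookup table from topic name to handler. The following is a minimal sketch of that idea using only the JDK; `TopicDispatcher` is a hypothetical helper, not a Spring or Kafka class, and the record type is left generic so it could be a `ConsumerRecord<?, ?>` in the listener above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical helper: routes a record to the handler registered for its topic.
class TopicDispatcher<R> {

    private final Map<String, Consumer<R>> handlers = new HashMap<>();

    // Registers the handler for one topic; returns this to allow chaining.
    TopicDispatcher<R> register(String topic, Consumer<R> handler) {
        handlers.put(topic, handler);
        return this;
    }

    // Returns true if a handler was registered for the topic and has been invoked.
    boolean dispatch(String topic, R record) {
        Consumer<R> handler = handlers.get(topic);
        if (handler == null) {
            return false;
        }
        handler.accept(record);
        return true;
    }
}
```

Inside `listen`, the body would then reduce to something like `dispatcher.dispatch(record.topic(), record)`, with the `false` case logged so that unexpected topics are noticed.
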

In my case, I wanted to update a graph based on the changes that happen on the RDBMS side. Of course, the graph database will only be eventually consistent with the RDBMS. I was worried that, since the topics include changes to join tables as well as to the tables on either side of the join, I would not be able to create the corresponding edges and vertices without knowing that both vertices of each edge already exist. So I decided to ask in the Debezium Gitter ( https://gitter.im/debezium/dev ):

The discussion below describes two ways: either create edges and vertices using placeholders for those that have not been consumed yet, or use Kafka Streams to stitch the topics back into their original aggregate structures, which seems more painful to me than the first way. So I decided to go the first way :)

Michael Mihailidis @zifnab87 Apr 17 11:23 Hi, I was able to integrate MySQL with Debezium Connect, and using landoop/kafka-topics-ui I can see that the topics are picked up correctly and the messages are sent as they should be. I saw that there is a topic for each of the tables, so join tables, for example, are separate topics too. Let's say I have three tables, order, product and order_product, and a service that consumes all three topics. I might receive the insert on order_product first and the insert on order afterwards. That can cause a problem if I am trying to push this information to a graph database: I would try to create an edge to a vertex that is not there yet. How can I make the consumers of these events transactional, or at least aware of the bounded context? Is there an easy way to listen to these events and then deserialize them into a real Java object so I can push it to a graph database or a search index? If not, how would you approach this problem? Thanks!

Randall Hauch @rhauch Apr 17 19:19 @zifnab87 Debezium CDC is purely a row-based approach, so by default all consumers see the row-level change events in an eventually consistent manner. Of course, the challenge with eventual consistency of downstream systems is that they might potentially leak data states that never really existed in the upstream source. But with that come lots of other really huge benefits: downstream consumers are much simpler, more resilient to failure, have lower latency (since there is no need to wait for the transaction to complete before processing), and are less coupled to the upstream system. You gave the example of order and product tables with an order_product intersect table. I agree that, when thinking transactionally, it does not make sense for an order_product relationship to be added before both the order and product instances exist. But are you required to live with that constraint? Can the order_product consumer create placeholder nodes in the graph database for any missing order and/or product values referenced by the relationship? In this case, when the order_product consumer is a bit ahead of the order consumer, it might create an empty order node with the proper key or identifier, and when the order consumer finally processes the new order it would find the existing placeholder node and fill in the details. Of course, when the order arrives before the order_product relationships, everything works as one might expect. This kind of approach might not be allowed by the downstream graph database system or by the business-level constraints defined in the graph database. But if it is allowed, and the downstream applications and services are designed to handle such states, then you will benefit from the significant simplicity that this approach affords, as the consumers become almost trivial. You will be managing less intermediate state, and your system will be more likely to continue operating when things go wrong (e.g., consumers crash or are taken down for maintenance).
If your downstream consumers do have to adhere to the transaction boundaries of the source database, then you might consider using Kafka Streams to join the order and order_product topics and produce a single aggregate order object with all the relationships to the referenced products. If you cannot assume the product already exists, then you could also join with the product topic to add more product detail to the aggregate order object. Of course, there are still lots of challenges, since the only way for a stream processor consuming these streams to know it has seen all of the row-level change events for a given transaction is when a subsequent transaction is seen on each of the streams. As you might expect, this is not ideal, since the last transaction prior to any quiet period will not complete immediately.

Michael Mihailidis @zifnab87 Apr 17 23:49 Thanks @rhauch, very well explained! I was looking into Kafka Streams while waiting for your answers! Now I think I will try to code the placeholder variation, e.g. for when a vertex is not there yet, etc.

Randall Hauch @rhauch Apr 17 23:58 @zifnab87 glad it helped, at least a bit! Be sure you also consider the fact that the consumer might see a sequence of messages that it has already consumed. That will only happen when something goes wrong (e.g., with the connector or the process(es) where the connector is running, or the broker, network partition, etc.); when everything is operating normally, the consumer should not see duplicate messages.

Michael Mihailidis @zifnab87 Apr 18 01:15 @rhauch Sure, it helped! Yes, I have that in mind: consumer processes should be idempotent. I am curious whether, for example, sinks for, say, Elasticsearch, MongoDB and graph databases can be implemented to consolidate events produced by debezium-mysql no matter what the order, by using placeholders for missing things. E.g., do the sink connectors do that already, if you know by chance? I am trying to avoid re-implementing things that already exist. Also, my solutions might be very fragile if the MySQL schema changes and I do not consume the new events. I feel so many things are missing around the microservices world.

Randall Hauch @rhauch Apr 18 03:30 I'm not sure how those sinks work. Ideally they should handle create, update, and delete events correctly. But because Debezium events have an envelope at the top level of every event, you will probably have to use SMTs (single message transforms) to grab the contents of the after field (or exclude the before field) so that the "meaningful" parts are put into the sink system. This will probably get easier as more SMTs are added to KC (Kafka Connect). If you find that it takes too many SMTs and would rather Debezium added an SMT that did this, please log a feature request in JIRA.
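
The placeholder approach from the discussion above can be sketched as follows. This is a minimal in-memory illustration, not the API of any graph database: `PlaceholderGraph`, `Vertex` and the key format (for example "order:1") are hypothetical, and a real sink would replace the maps with upsert or merge operations of the target store.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory stand-in for a graph sink that tolerates out-of-order events.
class PlaceholderGraph {

    static final class Vertex {
        final String key;
        Map<String, Object> properties; // null while the vertex is only a placeholder

        Vertex(String key) {
            this.key = key;
        }

        boolean isPlaceholder() {
            return properties == null;
        }
    }

    final Map<String, Vertex> vertices = new HashMap<>();
    final Set<String> edges = new HashSet<>(); // each edge stored as "from->to"

    // Called by the consumer of a row topic (e.g. order or product):
    // creates the vertex, or fills in a placeholder that an edge created earlier.
    void upsertVertex(String key, Map<String, Object> properties) {
        vertices.computeIfAbsent(key, Vertex::new).properties = properties;
    }

    // Called by the consumer of a join-table topic (e.g. order_product):
    // creates placeholders for endpoints that have not been consumed yet.
    void upsertEdge(String fromKey, String toKey) {
        vertices.computeIfAbsent(fromKey, Vertex::new);
        vertices.computeIfAbsent(toKey, Vertex::new);
        edges.add(fromKey + "->" + toKey);
    }
}
```

Both operations are keyed upserts, so it does not matter whether the edge or its vertices arrive first, and a redelivered event leaves the graph unchanged, which covers the duplicate-message case mentioned in the chat.
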

Hopefully this answer/guide will help others get a head start on event sourcing with a message broker such as Kafka at its center.


Source: https://habr.com/ru/post/1014851/

