How to get new table rows every minute

I have a table to which rows are only added (never updated or deleted), inside transactions (I will explain why this is important), and I need to fetch the new, previously unprocessed rows of this table every minute with a cron job.

How do I do this? In any programming language (I use Perl, but that doesn't matter).

I will list the ways I have thought of solving this problem, and ask you to show me the correct one (there should be ONE...).

The first method that came to mind was to save (in a file) the largest auto_incrementing id of the fetched rows, so that the next minute I can fetch with WHERE id > $last_id. But this can miss rows. Since new rows are inserted inside transactions, it is possible that the transaction that saves the row with id = 5 commits before the transaction that saves the row with id = 4. It is therefore possible that the cron script fetches row 5 but not row 4, and when row 4 gets committed one second later, it will never be fetched (since 4 is not > 5, which is now $last_id).
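For example, roughly like this (just a sketch with made-up table and column names, and 5 standing for the saved $last_id):

 -- $last_id saved by the previous run was 5
 SELECT * FROM transactional_table WHERE id > 5 ORDER BY id;
 -- afterwards save the new largest fetched id back to the file;
 -- a row with id = 4 whose transaction commits only now will never be fetched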

Then I thought that I could have the cron job fetch all the rows that have a date field within the last two minutes, check which of these rows had already been fetched in the previous cron run (for this I would need to save somewhere which row ids were fetched), compare, and process only the new ones. Unfortunately, this is complicated, and it also does not solve the problem that arises if some inserting transaction takes two and a half minutes to commit for some weird database reason, which would make the row's date too old for the next iteration of the cron job to fetch it.

Then I thought about setting up a message queue (MQ), such as RabbitMQ or whatever. The same process that performs the inserting transaction would notify RabbitMQ of the new row, and RabbitMQ would notify a constantly running process that handles new rows. So, instead of fetching a batch of rows every minute, that process would receive the new rows one by one as they are written. This sounds good, but it has too many points of failure - RabbitMQ could be down for a second (for example, during a reboot), in which case the inserting transaction would commit without the receiving process ever hearing about the new row. So a new row would be missed. Not good.

I just thought of one more solution: the fetching processes (there are 30 of them, doing the exact same job on exactly the same data, so the same rows are processed 30 times, once by each fetching process) could write to another table that they have processed row X when they process it, and then, when the time comes, they could ask for all rows in the main table that do not exist in the has_processed table with an OUTER JOIN query. But I believe (correct me if I am mistaken) that such a query would consume a lot of CPU and disk on the database server, since it would have to compare the entire list of ids of the two tables to find the new records (and the table is huge and getting bigger every minute). It would be fast if there were only one fetching process - then I could add an indexed field named "has_read" to the main table, which would make finding new rows extremely fast and easy on the database server.

What is the right way to do this? What do you suggest? The question is simple, but the solution seems difficult (for me) to find.

Thanks.

+4
5 answers

I thought about this for a while. So let me see if I understood correctly. You have a HUGE table into which N processes (an amount that may vary over time) write records (let's call them producers). Now there are another M processes (an amount that may also vary over time) that need to process each of those added records at least once (let's call them consumers).

Key issues found:

  • Make sure the solution will work with dynamic N and M
  • Need to keep track of the unprocessed records for each consumer
  • The solution needs to scale as well as possible, given the huge number of records

To tackle these issues, here is what I came up with. Create this table (the PK is the combination of both columns):

  • PENDING_RECORDS (ConsumerID, HugeTableID)

Modify the producers so that each time they add a record to HUGE_TABLE, they also add M records to the PENDING_RECORDS table: the new HugeTableID paired with each ConsumerID that exists at that time. Every time a consumer runs, it queries the PENDING_RECORDS table and finds a small number of matches for itself. It then joins against HUGE_TABLE (note that this is an inner join, not a left join) and fetches the actual data it needs to process. Once the data has been processed, the consumer deletes those records from the PENDING_RECORDS table, keeping it decently small.
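A minimal MySQL sketch of this idea (the table and column names, and the consumer registry table CONSUMERS, are assumptions of mine, not something given in the question):

 CREATE TABLE PENDING_RECORDS (
   ConsumerID  INTEGER NOT NULL,
   HugeTableID INTEGER NOT NULL,
   PRIMARY KEY (ConsumerID, HugeTableID)
 );

 -- producer side, inside the same transaction as the insert into HUGE_TABLE:
 -- add one pending record per consumer currently registered in CONSUMERS
 INSERT INTO PENDING_RECORDS (ConsumerID, HugeTableID)
 SELECT c.id, LAST_INSERT_ID() FROM CONSUMERS c;

 -- consumer side (consumer 7 here): fetch only its own pending rows, inner join
 SELECT h.*
 FROM PENDING_RECORDS p
 JOIN HUGE_TABLE h ON h.id = p.HugeTableID
 WHERE p.ConsumerID = 7;

 -- after processing, remove the markers so PENDING_RECORDS stays small
 DELETE FROM PENDING_RECORDS WHERE ConsumerID = 7 AND HugeTableID = 12345;  -- id just processed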

+1

I believe that the best way to do this is to use one process that checks for new rows and delegates them to the thirty consumer processes. Then your problem becomes much simpler to manage on the database side, and a delegating process is not that difficult to write.

If you are stuck with thirty consumer processes going through the database, the best option I could come up with is to create a trigger on the table that copies each row to a secondary table. Copy each row to the secondary table thirty times (once for each consumer process). Add a column to this secondary table indicating the "target" consumer process (for example, a number from 1 to 30). Each consumer process checks for new rows with its own unique number and then deletes them. If you are worried that some rows might be deleted before they are processed (because a consumer crashes in the middle of processing), you can fetch, process and delete the rows one by one.

Since the secondary table is kept small by continuously deleting the processed rows, the INSERTs, SELECTs and DELETEs will be very fast. All operations on this secondary table will also be indexed by the primary key (if you put the consumer ID as the first field of the primary key).

In MySQL, the operations would look like this:

 CREATE TABLE `consumer` (
   `id` INTEGER NOT NULL,
   PRIMARY KEY (`id`)
 );

 INSERT INTO `consumer` (`id`) VALUES (1), (2), (3); -- all the way to 30

 CREATE TABLE `secondaryTable` LIKE `primaryTable`;
 ALTER TABLE `secondaryTable` ADD COLUMN `targetConsumerId` INTEGER NOT NULL FIRST;
 -- alter the secondary table further to allow several rows with the same primary key
 -- (by adding targetConsumerId to the primary key)

 DELIMITER //
 CREATE TRIGGER `mark_to_process` AFTER INSERT ON `primaryTable`
 FOR EACH ROW
 BEGIN
   -- by doing a cross join with the consumer table, this automatically inserts the
   -- correct number of rows; adding or deleting consumers is just a matter of
   -- adding or deleting rows in the consumer table
   INSERT INTO `secondaryTable` (`targetConsumerId`, `primaryTableId`, `primaryTableField1`, `primaryTableField2`)
   SELECT `consumer`.`id`, `primaryTable`.`id`, `primaryTable`.`field1`, `primaryTable`.`field2`
   FROM `consumer`, `primaryTable`
   WHERE `primaryTable`.`id` = NEW.`id`;
 END//
 DELIMITER ;

 -- loop over the following statements in each consumer until the SELECT no longer returns rows
 START TRANSACTION;
 SELECT * FROM secondaryTable WHERE targetConsumerId = MY_UNIQUE_CONSUMER_ID LIMIT 1;
 -- here, do the processing (before the COMMIT, so that crashes won't let you miss rows)
 DELETE FROM secondaryTable
 WHERE targetConsumerId = MY_UNIQUE_CONSUMER_ID
   AND primaryTableId = PRIMARY_TABLE_ID_OF_ROW_JUST_SELECTED;
 COMMIT;
+2

Interesting problem, I have to say :)

1) First of all, is it possible to add a field to the table to which rows are only added (let's call it "transactional_table")? I mean, is this a design paradigm and you have a reason not to perform any updates on this table, or is it "structurally" blocked (i.e. the user connecting to the db does not have the privileges to perform updates on this table)?

Because if so, the easiest way to do this is to add a has_read column to this table with a default value of 0, and update this column to 1 on the fetched rows (even if 30 processes do this at the same time, you should be fine, as it would be very fast and it will not corrupt your data). Even if 30 processes mark the same 1000 rows as fetched, nothing is corrupted. Although, if you are not using InnoDB, this might not be the best way in terms of performance (MyISAM locks whole tables on updates, InnoDB only the rows that are updated).
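A minimal sketch of that, assuming the column is called has_read (the names are only an example):

 ALTER TABLE transactional_table
   ADD COLUMN has_read TINYINT NOT NULL DEFAULT 0,
   ADD INDEX (has_read);

 -- each cron run: fetch the unread rows ...
 SELECT * FROM transactional_table WHERE has_read = 0;

 -- ... process them, then mark exactly the rows that were fetched
 UPDATE transactional_table SET has_read = 1 WHERE id IN (4, 5, 6);  -- ids just processed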

2) If that is not something you can use, I would definitely consider the solution you gave as the last one, with a slight modification. Create a table (say fetched_ids) and save the fetched rows' ids in that table. Then you can use something like:

 SELECT tt.*
 FROM transactional_table tt
 LEFT JOIN fetched_ids fi ON tt.id = fi.row_id
 WHERE fi.row_id IS NULL

This will return the rows from your transactional table that have not been saved as already fetched. As long as both (tt.id) and (fi.row_id) have (ideally unique) indexes, this should work fine even on large data sets. MySQL handles JOINs on indexed fields very well. Do not be afraid to try it out - create a new table, copy some ids into it, delete a few of them and run your query. You will see the results and you will know whether they are satisfactory :)

P.S. Of course, adding rows to this "fetched_ids" table should be done carefully so as not to create unnecessary duplicates (30 simultaneous processes could write 30 times more data than you need), so if you care about performance you should watch out for this case.
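As a sketch of what I mean by careful (assuming fetched_ids keeps row_id as its primary key; if each of the 30 processes needs its own bookkeeping, you would presumably add a consumer id to that key):

 CREATE TABLE fetched_ids (
   row_id INTEGER NOT NULL,
   PRIMARY KEY (row_id)
 );

 -- INSERT IGNORE makes duplicate inserts from concurrent processes harmless:
 -- the primary key rejects the duplicate and the statement simply moves on
 INSERT IGNORE INTO fetched_ids (row_id) VALUES (4), (5), (6);  -- ids just fetched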

0

How about a second table with a structure like this:

  • source_fk - this will hold the id of the data row that you want to read.
  • process_id - this will be a unique id for one of the 30 processes.

Then do a LEFT JOIN and exclude the rows from your source table that already have an entry for the given process_id.

Once you have your results, just go back and add the source_fk and process_id for each row you fetched.

One nice thing about this is that you can add more processes later on without any problems.
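A rough sketch, with table and column names assumed for the example (source_table as your main table, processed_log as the second one):

 CREATE TABLE processed_log (
   source_fk  INTEGER NOT NULL,
   process_id INTEGER NOT NULL,
   PRIMARY KEY (source_fk, process_id)
 );

 -- fetch the rows this process (id 7 here) has not logged yet
 SELECT s.*
 FROM source_table s
 LEFT JOIN processed_log p ON p.source_fk = s.id AND p.process_id = 7
 WHERE p.source_fk IS NULL;

 -- then go back and log what was just processed
 INSERT INTO processed_log (source_fk, process_id) VALUES (4, 7), (5, 7);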

0

I would try adding a timestamp column and using it as a reference when fetching the new rows.
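Something like this, as a sketch only (made-up names; as the question notes, a transaction that takes long to commit can still fall outside a fixed time window):

 ALTER TABLE my_table
   ADD COLUMN created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
   ADD INDEX (created_at);

 SELECT * FROM my_table WHERE created_at > NOW() - INTERVAL 2 MINUTE;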

0

Source: https://habr.com/ru/post/1500828/

