Logstash: combining two log messages into one document

I set up syslog to send logs to Logstash with the following configuration:

 output {
   elasticsearch {
     hosts       => ["localhost:9200"]
     document_id => "%{job_id}"
   }
 }
 filter {
   grok { overwrite => ["message"] }
   json { source => "message" }
 }

A typical message from one of my applications will have an initial state and job_id:

 {"job_id": "xyz782", "state": "processing", "job_type": "something"}

A few minutes later, another log line has the same job_id, but a different state and a processing time:

 {"job_id": "xyz782", "state": "failed", "processing_time": 12.345}

These fields load correctly, but two documents are created. I would like only one document to be created per job: the second log message should update the document created from the first, so that the final document has the following fields:

 {"job_id": "xyz782", "state": "failed", "job_type": "something", "processing_time": 12.345}

As you can see in my Logstash config, I use job_id as the document identifier. However, the second message does not just overwrite the fields it shares with the first message; it replaces the whole document, removing every field from the first message that is not present in the second. For example, the job_type field from the first message does not appear in the final document. This may be because the JSON comes from the same "message" field both times. Is there another way to get the union of two log messages into one document in Logstash?
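(For context: one documented alternative to merging inside Logstash is to let Elasticsearch merge the fields itself. The elasticsearch output plugin supports action and doc_as_upsert options; with action => "update", Elasticsearch performs a partial update that merges the new event's fields into the existing document instead of replacing it. This is a sketch based on the plugin's documented options, not part of the original post:)

 output {
   elasticsearch {
     hosts         => ["localhost:9200"]
     document_id   => "%{job_id}"
     action        => "update"   # partial update: merge fields into the existing document
     doc_as_upsert => true       # create the document if it does not exist yet
   }
 }

With this output, the second event would add state and processing_time to the existing document while leaving job_type intact.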

1 answer

You can use the aggregate filter for this. The aggregate filter supports combining multiple log lines into a single event based on a common field value. In your case, the common field is job_id.

We also need a field that distinguishes the first event from the second event to be aggregated. In your case, that is the state field.

Therefore, you just need to add another filter to your existing Logstash configuration, for example:

 filter {
   # ...your other filters
   if [state] == "processing" {
     aggregate {
       task_id => "%{job_id}"
       # Store fields from the first event in the task's map
       code    => "map['job_type'] = event.get('job_type')"
     }
   } else if [state] == "failed" {
     aggregate {
       task_id     => "%{job_id}"
       # Copy the stored fields onto the final event
       code        => "event.set('job_type', map['job_type'])"
       end_of_task => true
       timeout     => 120
     }
   }
 }

Note that the aggregate filter's code option is required; here it carries job_type from the first event over to the second, so the final document contains the union of both messages.

You can adjust the timeout (in seconds) depending on how long your tasks take.
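(If a job can emit its first message but then never report a final state, the aggregate filter can also push the stored map as a new event when the timeout expires, so the job is not silently lost. This sketch uses the plugin's documented push_map_as_event_on_timeout, timeout_task_id_field, and timeout_tags options; the field names follow the question's examples:)

 filter {
   if [state] == "processing" {
     aggregate {
       task_id                      => "%{job_id}"
       code                         => "map['job_type'] = event.get('job_type')"
       push_map_as_event_on_timeout => true
       timeout_task_id_field        => "job_id"   # restore job_id on the generated event
       timeout                      => 120
       timeout_tags                 => ["job_timed_out"]
     }
   }
 }

Events tagged job_timed_out can then be routed or flagged separately in the output stage.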


Source: https://habr.com/ru/post/1242297/
