Flume agent: add the host to the message, then publish to a Kafka topic

We began consolidating event log data from our applications by publishing messages to a Kafka topic. Although we could write to Kafka directly from the application, we chose to treat this as a general problem and use a Flume agent. This gives us some flexibility: if we want to collect something else from the server, we can simply add another source and publish it to a different Kafka topic.

We created a configuration file for the Flume agent that tails the log and publishes it to a Kafka topic:

tier1.sources  = source1
tier1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = exec
tier1.sources.source1.command = tail -F /var/log/some_log.log
tier1.sources.source1.channels = channel1

tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000

tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.topic = some_log
tier1.sinks.sink1.brokerList = hadoop01:9092,hadoop02.com:9092,hadoop03.com:9092
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.batchSize = 20

Unfortunately, the messages themselves do not indicate which host produced them. If an application runs on several hosts and an error occurs, we cannot tell which host the message came from.

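Flume's host interceptor can add the host to the event headers, and the Flume HDFS sink can make use of those headers (for example, in the HDFS path). However, the Kafka sink appears to publish only the event body, so the header is lost before the message reaches Kafka.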

Is there a way to get Flume to add the host to the message itself, or is there another approach?
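One way to think about the question is to copy the host header into the body before the event reaches the sink, which is roughly what a custom interceptor would do. The sketch below models that step with a hypothetical `SimpleEvent` class (headers plus a byte body) instead of the real Flume `Event` interface, so it runs without Flume on the classpath; the header name `host` is an assumption here.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class HostIntoBody {
    // Hypothetical stand-in for a Flume event: headers plus a raw byte body.
    static final class SimpleEvent {
        final Map<String, String> headers;
        final byte[] body;

        SimpleEvent(Map<String, String> headers, byte[] body) {
            this.headers = headers;
            this.body = body;
        }
    }

    // Returns a new event whose body is "<header value> <original body>".
    // If the header is missing, the event is returned unchanged.
    static SimpleEvent copyHeaderIntoBody(SimpleEvent event, String headerName) {
        String host = event.headers.get(headerName);
        if (host == null) {
            return event;
        }
        String original = new String(event.body, StandardCharsets.UTF_8);
        byte[] newBody = (host + " " + original).getBytes(StandardCharsets.UTF_8);
        return new SimpleEvent(new HashMap<>(event.headers), newBody);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("host", "frb.hi.inet"); // assumed header name and value
        SimpleEvent in = new SimpleEvent(headers,
                "2015-11-17 08:39:39.432 INFO [...]".getBytes(StandardCharsets.UTF_8));
        SimpleEvent out = copyHeaderIntoBody(in, "host");
        System.out.println(new String(out.body, StandardCharsets.UTF_8));
    }
}
```

Because only the body is rewritten, this works with any sink that forwards the body, at the cost of changing the message format that downstream consumers must parse.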


Since you are using an exec source, you can make the command itself add the hostname.

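For example, pipe tail through sed so that every line is prefixed with the hostname. The shell property is needed because the pipe and $(hostname) must be interpreted by a shell: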

tier1.sources.source1.type = exec
tier1.sources.source1.shell = /bin/sh -c
tier1.sources.source1.command =  tail -F /var/log/auth.log | sed --unbuffered "s/^/$(hostname) /"

The resulting events look like this:

frb.hi.inet 2015-11-17 08:39:39.432 INFO [...]

...where frb.hi.inet is the name of the host.
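If you would rather not depend on sed, the same per-line prefixing can be done by a small filter program. The sketch below is a Java equivalent of `sed --unbuffered "s/^/$(hostname) /"`; the class name `PrefixHost` is hypothetical, and it assumes `InetAddress.getLocalHost().getHostName()` returns the same name as the `hostname` command, which depends on the machine's name resolution.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

public class PrefixHost {
    // Same transformation as sed "s/^/$(hostname) /": prepend the host and a space.
    static String prefix(String host, String line) {
        return host + " " + line;
    }

    // Copies lines from in to out, prefixing each one and flushing after every
    // line so the exec source sees it immediately (the job of sed's --unbuffered).
    static void filter(BufferedReader in, PrintStream out, String host) throws IOException {
        String line;
        while ((line = in.readLine()) != null) {
            out.println(prefix(host, line));
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        // May differ from the `hostname` command depending on DNS / /etc/hosts.
        String host = InetAddress.getLocalHost().getHostName();
        BufferedReader in = new BufferedReader(
                new InputStreamReader(System.in, StandardCharsets.UTF_8));
        filter(in, System.out, host);
    }
}
```

It would be wired in the same way as sed, e.g. `tail -F /var/log/auth.log | java -cp /path/to/classes PrefixHost` with the shell property still set (the classpath here is a placeholder).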


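Alternatively, you can write a custom TCP source that adds the client's address to the headers of every event it creates: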

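package com.vishnu.flume.source;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomFlumeTCPSource extends AbstractSource implements EventDrivenSource, Configurable {
    private static final Logger logger = LoggerFactory.getLogger(CustomFlumeTCPSource.class);
    private int port;
    private int buffer;
    private ServerSocket serverSocket;

    @Override
    public void configure(Context context) {
        port = context.getInteger("port");
        buffer = context.getInteger("buffer", 1); // events per batch
        try {
            serverSocket = new ServerSocket(port);
            logger.info("FlumeTCP source initialized on port " + port);
        } catch (Exception e) {
            logger.error("FlumeTCP source failed to initialize", e);
        }
    }

    @Override
    public void start() {
        // Flume expects start() to return, so accept and read on a background thread.
        Thread reader = new Thread(this::acceptAndRead, "flume-tcp-source");
        reader.setDaemon(true);
        reader.start();
        super.start();
    }

    private void acceptAndRead() {
        try (Socket clientSocket = serverSocket.accept();
             BufferedReader receiveBuffer = new BufferedReader(
                     new InputStreamReader(clientSocket.getInputStream()))) {
            logger.info("Connection established with client : " + clientSocket.getRemoteSocketAddress());
            final ChannelProcessor channel = getChannelProcessor();
            // The client's address (e.g. "/127.0.0.1:50999") becomes the "hostname" header.
            final Map<String, String> headers = new HashMap<String, String>();
            headers.put("hostname", clientSocket.getRemoteSocketAddress().toString());
            List<Event> events = new ArrayList<Event>();
            String line;
            while ((line = receiveBuffer.readLine()) != null) {
                events.add(EventBuilder.withBody(line, Charset.defaultCharset(), headers));
                if (events.size() >= buffer) {
                    channel.processEventBatch(events);
                    events = new ArrayList<Event>(); // start a new batch
                }
            }
            if (!events.isEmpty()) {
                channel.processEventBatch(events); // flush the last partial batch
            }
        } catch (Exception e) {
            logger.error("FlumeTCP source stopped reading", e);
        }
    }
}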

And the flume-conf.properties:



# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'

agent.sources = CustomTcpSource
agent.channels = memoryChannel
agent.sinks = loggerSink

# For each one of the sources, the type is defined
agent.sources.CustomTcpSource.type = com.vishnu.flume.source.CustomFlumeTCPSource
agent.sources.CustomTcpSource.port = 4443
agent.sources.CustomTcpSource.buffer = 1


# The channel can be defined as follows.
agent.sources.CustomTcpSource.channels = memoryChannel

# Each sink type must be defined
agent.sinks.loggerSink.type = logger

#Specify the channel the sink should use
agent.sinks.loggerSink.channel = memoryChannel

# Each channel type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100
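To exercise the source, something has to connect to the configured port and write newline-terminated lines. A minimal client sketch follows; the class name `TcpLineSender` is hypothetical, and port 4443 is taken from `agent.sources.CustomTcpSource.port` above. Because the source accepts a single connection, all lines are sent over one socket.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class TcpLineSender {
    // Opens one connection and writes each line terminated by '\n', which is
    // what the source's BufferedReader.readLine() loop splits on.
    static void sendLines(String host, int port, List<String> lines) throws IOException {
        try (Socket socket = new Socket(host, port);
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8))) {
            for (String line : lines) {
                out.print(line);
                out.print('\n');
            }
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumes the agent runs locally with the port from flume-conf.properties.
        sendLines("localhost", 4443, Arrays.asList("test message"));
    }
}
```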

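When a client connects and sends a line, the logger sink prints events with the client address in the header: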

Event: { headers:{hostname=/127.0.0.1:50999} body: 74 65 73 74 20 6D 65 73 73 61 67 65             test message }
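In that output the body is printed as hex bytes, and the header value is whatever `SocketAddress.toString()` returns, hence the leading `/` and the `:50999` port. The sketch below decodes the hex dump and shows one possible cleaner header value using `InetSocketAddress.getHostString()`; the class and method names are hypothetical, and swapping the header value in the source is a suggestion, not what the answer's code does.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;

public class LoggerSinkOutput {
    // Turns the space-separated hex dump printed by the logger sink back into text.
    static String decodeHex(String hex) {
        String[] parts = hex.trim().split("\\s+");
        byte[] bytes = new byte[parts.length];
        for (int i = 0; i < parts.length; i++) {
            bytes[i] = (byte) Integer.parseInt(parts[i], 16);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Header value without the leading '/' and ":port" that toString() adds.
    // getHostString() does not trigger a reverse DNS lookup.
    static String hostHeaderValue(SocketAddress remote) {
        if (remote instanceof InetSocketAddress) {
            return ((InetSocketAddress) remote).getHostString();
        }
        return remote.toString();
    }

    public static void main(String[] args) throws UnknownHostException {
        System.out.println(decodeHex("74 65 73 74 20 6D 65 73 73 61 67 65")); // test message
        SocketAddress remote = new InetSocketAddress(
                InetAddress.getByAddress(new byte[] {127, 0, 0, 1}), 50999);
        System.out.println(remote);                  // /127.0.0.1:50999
        System.out.println(hostHeaderValue(remote)); // 127.0.0.1
    }
}
```

If a host name rather than an IP is wanted, `InetAddress.getCanonicalHostName()` performs a reverse lookup, which can be slow if DNS is slow.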

The full code is on GitHub.


Source: https://habr.com/ru/post/1616133/

