How to correctly insert JSON in HDFS using Flume

I am using HTTPSource in Flume to receive JSON-formatted POST events such as:

 {"username":"xyz","password":"123"} 

My question is: should I modify the event source (I mean the client that sends the JSON to Flume) so that the JSON has the following format:

 [{ "headers" : { "timestamp" : "434324343", "host" : "random_host.example.com" }, "body" : "{"username":"xyz","password":"123"}" }] 

Is this the best way to do it, or can I change it somewhere else?

My conf file for the Flume agent:

 ## Components
 SomeAgent.sources = SomeHTTP
 SomeAgent.channels = MemChannel
 SomeAgent.sinks = SomeHDFS

 ## Source and interceptors
 SomeAgent.sources.SomeHTTP.type = http
 SomeAgent.sources.SomeHTTP.port = 5140
 SomeAgent.sources.SomeHTTP.handler = org.apache.flume.source.http.JSONHandler
 SomeAgent.sources.SomeHTTP.channels = MemChannel
 SomeAgent.sources.SomeHTTP.interceptors = i1 i2

 ## Interceptors
 SomeAgent.sources.SomeHTTP.interceptors.i1.type = timestamp
 SomeAgent.sources.SomeHTTP.interceptors.i2.type = host
 SomeAgent.sources.SomeHTTP.interceptors.i2.hostHeader = hostname

 ## Channel
 SomeAgent.channels.MemChannel.type = memory
 SomeAgent.channels.MemChannel.capacity = 10000
 SomeAgent.channels.MemChannel.transactionCapacity = 1000

 ## Sink
 SomeAgent.sinks.SomeHDFS.type = hdfs
 SomeAgent.sinks.SomeHDFS.channel = MemChannel
 SomeAgent.sinks.SomeHDFS.hdfs.path = /raw/logs/%Y-%m-%d
 SomeAgent.sinks.SomeHDFS.hdfs.fileType = DataStream
 SomeAgent.sinks.SomeHDFS.hdfs.filePrefix = SomeLogs-
 SomeAgent.sinks.SomeHDFS.hdfs.writeFormat = Text
 SomeAgent.sinks.SomeHDFS.hdfs.batchSize = 100
 SomeAgent.sinks.SomeHDFS.hdfs.rollSize = 0
 SomeAgent.sinks.SomeHDFS.hdfs.rollCount = 10000
 SomeAgent.sinks.SomeHDFS.hdfs.rollInterval = 600
 SomeAgent.sinks.SomeHDFS.hdfs.useLocalTimeStamp = true

Running cat via hadoop fs:

 $ hadoop fs -ls -R /raw/logs/somes
 drwxr-xr-x   - flume-agent supergroup          0 2015-06-16 12:43 /raw/logs/arquimedes/2015-06-16
 -rw-r--r--   3 flume-agent supergroup       3814 2015-06-16 12:33 /raw/logs/arquimedes/2015-06-16/SomeLogs.1434471803369
 -rw-r--r--   3 flume-agent supergroup       3719 2015-06-16 12:43 /raw/logs/arquimedes/2015-06-16/SomeLogs.1434472404774
 $ hadoop fs -cat /raw/logs/somes/2015-06-16/SomeLogs.1434471803369 | head
 $

(yes, you are reading that right: empty lines)

If I now look at the file in binary form (for example, with HUE):

 0000000: 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a  ................
 0000010: 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a  ................
 0000020: 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a  ................
3 answers

If I understand correctly, you want to serialize both the data and the headers. In that case, you do not need to change the data source; you can use standard Flume components plus your own serializer for HDFS.

The first step is to make Flume build an event with the desired JSON structure, i.e. headers + body. Flume can do this for you; just use the JSONHandler on your HTTPSource, like this:

 a1.sources = r1
 a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler

In fact, there is no need to configure the JSON handler explicitly, since it is the default handler for HTTPSource.

Then use the Timestamp Interceptor and the Host Interceptor to add the desired headers. The only trick is that the Flume agent must run on the same machine as the sending process, so that the intercepted host is the same as the sender's:

 a1.sources.r1.interceptors = i1 i2
 a1.sources.r1.interceptors.i1.type = timestamp
 a1.sources.r1.interceptors.i2.type = host
 a1.sources.r1.interceptors.i2.hostHeader = hostname

At this point you will have the desired event. However, the standard serializers for HDFS only persist the body, not the headers. So create your own serializer that implements org.apache.flume.serialization.EventSerializer. It is configured like this:

 a1.sinks = k1
 a1.sinks.k1.type = hdfs
 a1.sinks.k1.serializer = my_custom_serializer
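
For illustration, here is a minimal sketch of what such a serializer could look like. The class and package names are made up for this example, they are not part of Flume, and the hand-rolled JSON assembly assumes the body is already UTF-8 JSON and that header values need no escaping; a real implementation would use a JSON library such as Gson or Jackson.

 // Minimal sketch: writes each event as one JSON line containing both the
 // headers and the body. Names and escaping strategy are assumptions, see above.
 package com.example.flume;

 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;

 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.serialization.EventSerializer;

 public class HeaderAndBodyJsonSerializer implements EventSerializer {

   private final OutputStream out;

   private HeaderAndBodyJsonSerializer(Context context, OutputStream out) {
     this.out = out;
   }

   @Override
   public void afterCreate() throws IOException { }   // nothing to write on file creation

   @Override
   public void afterReopen() throws IOException { }   // nothing to do on reopen

   @Override
   public void write(Event event) throws IOException {
     // Build {"headers":{...},"body":...} by hand.
     StringBuilder sb = new StringBuilder("{\"headers\":{");
     boolean first = true;
     for (Map.Entry<String, String> h : event.getHeaders().entrySet()) {
       if (!first) sb.append(',');
       sb.append('"').append(h.getKey()).append("\":\"").append(h.getValue()).append('"');
       first = false;
     }
     sb.append("},\"body\":")
       .append(new String(event.getBody(), StandardCharsets.UTF_8))
       .append("}\n");
     out.write(sb.toString().getBytes(StandardCharsets.UTF_8));
   }

   @Override
   public void flush() throws IOException {
     out.flush();
   }

   @Override
   public void beforeClose() throws IOException { }   // nothing to finalize

   @Override
   public boolean supportsReopen() {
     return true;
   }

   /** Builder that the HDFS sink instantiates from the configuration. */
   public static class Builder implements EventSerializer.Builder {
     @Override
     public EventSerializer build(Context context, OutputStream out) {
       return new HeaderAndBodyJsonSerializer(context, out);
     }
   }
 }

The sink configuration would then point at the Builder's fully-qualified class name, e.g. a1.sinks.k1.serializer = com.example.flume.HeaderAndBodyJsonSerializer$Builder (again, an example name, not an existing class).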

HTH


The answer posted by @frb is correct; the only thing missing is that the JSON generator has to wrap the payload in a body element (I have to admit/complain that the docs are not clear on this point), so the correct way to post the JSON is:

 [{"body": "{'username':'xyz','password':'123'}"}]

Note that the JSON data is now sent as a string.

With this change, the JSON now shows up in HDFS.
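
For illustration, here is a minimal sketch of what the sending side could look like with this envelope, assuming the agent from the question is listening on localhost:5140 (see the HTTPSource configuration above). Escaped double quotes are used instead of single quotes for the inner JSON; either works, since the body is just a string to Flume.

 // Hedged example sender: posts one Flume event whose body is the application
 // JSON, escaped as a plain string. Host and port are assumptions taken from
 // the question's configuration.
 import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;

 public class FlumeJsonPoster {
   public static void main(String[] args) throws Exception {
     String payload =
         "[{\"body\":\"{\\\"username\\\":\\\"xyz\\\",\\\"password\\\":\\\"123\\\"}\"}]";

     HttpURLConnection conn =
         (HttpURLConnection) new URL("http://localhost:5140").openConnection();
     conn.setRequestMethod("POST");
     conn.setRequestProperty("Content-Type", "application/json; charset=utf-8");
     conn.setDoOutput(true);
     try (OutputStream out = conn.getOutputStream()) {
       out.write(payload.getBytes(StandardCharsets.UTF_8));
     }
     System.out.println("HTTP status: " + conn.getResponseCode());  // 200 on success
   }
 }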


Flume's HTTPSource with the default JSONHandler expects a list of fully-formed Flume events in JSON form, [{ headers: ..., body: ... }], to be sent to the endpoint. To create an agent endpoint that can accept an application-level JSON structure such as {"username":"xyz", "password":"123"}, you can override the handler with an alternative class that implements HTTPSourceHandler; look at the source of JSONHandler, there is not much to it. The method you need to implement is:

 public List<Event> getEvents(HttpServletRequest request) throws ... 

In a custom handler you can also add headers to the event based on the HTTP request, such as the source IP address, the User-Agent, etc. (an Interceptor would not have the request context for this). You may also want to validate the application-supplied JSON at this point (which the default handler does not do).

Although, as you have already found, you can simply post [{body: ...}], such a custom handler can also be useful if you do not want the generator to have to wrap its events in the Flume envelope at all.
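
For illustration, here is a rough sketch of such a handler. The class name and the header names ("remoteAddress", "userAgent") are invented for this example; they are not an existing Flume class or convention.

 // Rough sketch: takes the raw request body as the event body and fills in a
 // couple of headers derived from the HTTP request itself, so the client can
 // post plain application JSON without any envelope.
 package com.example.flume;

 import java.io.BufferedReader;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

 import javax.servlet.http.HttpServletRequest;

 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.source.http.HTTPSourceHandler;

 public class RawJsonHandler implements HTTPSourceHandler {

   @Override
   public List<Event> getEvents(HttpServletRequest request) throws Exception {
     // Read the request body verbatim; no Flume envelope is required.
     StringBuilder body = new StringBuilder();
     try (BufferedReader reader = request.getReader()) {
       String line;
       while ((line = reader.readLine()) != null) {
         body.append(line);
       }
     }

     // Headers derived from the HTTP request rather than from the payload.
     Map<String, String> headers = new HashMap<>();
     headers.put("remoteAddress", request.getRemoteAddr());
     if (request.getHeader("User-Agent") != null) {
       headers.put("userAgent", request.getHeader("User-Agent"));
     }

     Event event = EventBuilder.withBody(
         body.toString().getBytes(StandardCharsets.UTF_8), headers);
     return Collections.singletonList(event);
   }

   @Override
   public void configure(Context context) {
     // No configuration needed for this sketch.
   }
 }

It would then be wired in with SomeAgent.sources.SomeHTTP.handler = com.example.flume.RawJsonHandler instead of the default JSONHandler.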


Source: https://habr.com/ru/post/989156/

