Reading data written to S3 by Amazon Kinesis Firehose

I am writing records to a Kinesis Firehose stream, which Amazon Kinesis Firehose eventually writes to a file in S3.

My record object looks like

ItemPurchase { String personId, String itemId } 

The data written to S3 looks like this:

 {"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"} 

NO COMMA SEPARATION.

NO STARTING BRACKET, as in a JSON array

 [ 

NO ENDING BRACKET, as in a JSON array

 ] 

I want to read this data to get a list of ItemPurchase objects.

 List<ItemPurchase> purchases = getPurchasesFromS3(IOUtils.toString(s3ObjectContent)) 

What is the correct way to read this data?

+9
8 answers

I also had the same problem; this is how I solved it:

  • replace "}{" with "}\n{"
  • split the string on "\n"

    import re

    # Assumes input_json_rdd is an RDD of raw strings read from S3.
    (input_json_rdd
        .map(lambda x: re.sub(r"\}\{", "}\n{", x, flags=re.UNICODE))
        .flatMap(lambda line: line.split("\n")))

A nested JSON object has several "}"s, so simply splitting the line on "}" does not solve the problem.

+5

I had the same problem.

It would be better if AWS allowed us to set the delimiter, but we can do it ourselves.

In my case, I was listening to a stream of tweets, and whenever a new tweet arrived, I immediately pushed it to Firehose.

This, of course, resulted in a single-line file that could not be parsed.

So, to solve this, I concatenated each tweet's JSON with a \n. This, in turn, lets me use packages that can read a stream's contents line by line and parse the file easily (see the sketch below).
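For illustration, a minimal sketch of the writing side using boto3; the stream name and the tweet variable are placeholders:

    import json
    import boto3

    firehose = boto3.client('firehose')

    def put_tweet(tweet, stream_name='my-tweet-stream'):
        # Append a newline so each record lands on its own line in S3.
        firehose.put_record(
            DeliveryStreamName=stream_name,
            Record={'Data': (json.dumps(tweet) + '\n').encode('utf-8')}
        )

With records delimited like this, the resulting S3 file can be parsed with a plain line-by-line loop, e.g. json.loads on each non-empty line.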

Hope this helps you.

+3

It seems that Amazon Firehose dumps JSON messages to S3 in exactly this way, and does not let you set a delimiter or anything else.

Ultimately, the trick I found to solve this problem was to process the text file using JSON's raw_decode method.

This will allow you to read a bunch of concatenated JSON entries without any separators between them.

Python Code:

    import json

    decoder = json.JSONDecoder()

    with open('giant_kinesis_s3_text_file_with_concatenated_json_blobs.txt', 'r') as content_file:
        content = content_file.read()

    content_length = len(content)
    decode_index = 0

    while decode_index < content_length:
        try:
            obj, decode_index = decoder.raw_decode(content, decode_index)
            print("File index:", decode_index)
            print(obj)
        except json.JSONDecodeError as e:
            print("JSONDecodeError:", e)
            # Scan forward and keep trying to decode.
            decode_index += 1
+3

If the input source for the Firehose is a Kinesis Analytics application, this concatenated JSON with no delimiter is a known issue, as cited here. You can use a Lambda function, like the one here, that writes the JSON objects out on separate lines.
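For reference, a minimal sketch of such a Lambda, following the Firehose data-transformation record format; it simply appends a newline to each record (the handler name is whatever your configuration uses):

    import base64

    def lambda_handler(event, context):
        output = []
        for record in event['records']:
            # Decode the payload, append a newline, and re-encode it.
            payload = base64.b64decode(record['data']).decode('utf-8')
            output.append({
                'recordId': record['recordId'],
                'result': 'Ok',
                'data': base64.b64encode((payload + '\n').encode('utf-8')).decode('utf-8')
            })
        return {'records': output}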

+2

If there is a way to change the way the data is written, separate all the records with a newline; then you can read the data simply line by line. If not, just build a scanner object that takes "}" as the delimiter, and use the scanner to read. That would do the job.

0

I think the best way to solve this problem is to first create a properly formatted JSON file containing well-separated JSON objects. In my case, I added a "," to the events that were pushed into Firehose. Then, after a file is saved to S3, every file will contain JSON objects separated by a delimiter (a comma, in our case). One more thing that must be added is a "[" and "]" at the beginning and end of the file. Then you have a proper JSON file containing multiple JSON objects, and they can be parsed (see the sketch below).
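A minimal sketch of the reading side under this scheme, assuming each record was written with a trailing comma (the file name is hypothetical):

    import json

    with open('firehose_output.txt', 'r') as f:
        content = f.read()

    # Drop the comma after the last record, then wrap everything
    # in brackets to form a valid JSON array.
    records = json.loads('[' + content.rstrip().rstrip(',') + ']')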

0

You can find each valid JSON object by counting the braces. Assuming the file starts with a {, this Python snippet should work:

    import json

    def read_block(stream):
        # Track brace depth to find where each top-level JSON object ends.
        open_brackets = 0
        block = ''
        while True:
            c = stream.read(1)
            if not c:
                break
            if c == '{':
                open_brackets += 1
            elif c == '}':
                open_brackets -= 1
            block += c
            if open_brackets == 0:
                # Depth is back to zero: one complete object collected.
                yield block
                block = ''

    if __name__ == "__main__":
        with open('firehose_json_blob', 'r') as f:
            for block in read_block(f):
                record = json.loads(block)
                print(record)
0

Use this simple Python code.

    import json

    input_str = '''{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"}'''

    # Insert commas between the records, then wrap the whole string
    # in brackets so it becomes a valid JSON array.
    data_str = "[{}]".format(input_str.replace("}{", "},{"))
    data_json = json.loads(data_str)

And then (if you want) convert it to a pandas DataFrame.

    import pandas as pd

    df = pd.DataFrame.from_records(data_json)
    print(df)

And this is the result:

      itemId personId
    0  i-111    p-111
    1  i-222    p-222
    2  i-333    p-333
0
