Minimum batch size for Kinesis Lambda

I am using AWS Lambda (Node.js) as a consumer of AWS Kinesis. I see that you can set a maximum batch size, but I wonder if I can set a minimum batch size, so that I can guarantee each Lambda invocation processes at least 50 records (or any number).

I would like a minimum batch size because the Lambda consumer will establish a connection to a MySQL RDS instance, and I am trying to keep the number of concurrent connections low.

If there is no configuration option that would set a minimum, any ideas for workarounds would be appreciated.

Thanks.

2 answers

The first question I have is: how many shards do you have in the stream? You only get one concurrent Lambda invocation per shard, so if you have only one shard, only one Lambda at a time will be hitting your RDS instance.
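
If you are not sure, you can check the shard count programmatically. A minimal sketch with the Node.js aws-sdk; the stream name 'my-stream' is a placeholder:

 const AWS = require('aws-sdk');
 const kinesis = new AWS.Kinesis();

 // Lambda runs at most one concurrent invocation per shard, so the open
 // shard count bounds the number of simultaneous RDS connections.
 kinesis.describeStreamSummary({ StreamName: 'my-stream' }, (err, data) => {
     if (err) return console.error(err);
     console.log('open shards:', data.StreamDescriptionSummary.OpenShardCount);
 });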

Do you have data that indicates a problem?

What follows is a hack that may or may not work reliably, and it probably should not be used in a production environment.

For a minimum batch size, you can return an error from your Node.js Lambda function whenever the batch contains fewer than the desired number of records; Lambda will then retry the same batch.

For example:

 exports.handler = (event, context, callback) => {
     const records = event.Records;
     if (records.length < minBatchSize) {   // .length is a property, not a method
         callback(new Error('insufficient batch size'));  // error => Lambda retries the batch
     } else {
         processRecords(records, callback);
     }
 };

BUT there are two problems:

1) You cannot do this indefinitely without risking data loss, because records are only kept in the stream for its configured retention period; once that time passes, the records disappear from the stream. Note that you pay extra for retention beyond the default 24 hours (see extended data retention).
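
If you decide you need more headroom for this retry trick, the retention period can be raised above the 24-hour default (for a fee). A minimal sketch with the Node.js aws-sdk; the stream name is again a placeholder:

 const AWS = require('aws-sdk');
 const kinesis = new AWS.Kinesis();

 // Raise retention to 7 days; hours beyond the default 24 are billed extra.
 kinesis.increaseStreamRetentionPeriod({
     StreamName: 'my-stream',
     RetentionPeriodHours: 168
 }, (err) => {
     if (err) console.error(err);
 });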

You can get the age of the batch from the Lambda / Kinesis iterator age metrics; see http://docs.aws.amazon.com/streams/latest/dev/monitoring-with-cloudwatch.html .

I'm not sure how reliable this is, especially if you have more than one shard, but, for example:

 exports.handler = (event, context, callback) => {
     const records = event.Records;
     if (records.length < minBatchSize) {
         if (calculateLambdaAge() > tooLongDelayThreshold) {
             // the batch has waited too long; process it even though it is small
             processRecords(records, callback);
         } else {
             callback(new Error('insufficient batch size'));
         }
     } else {
         processRecords(records, callback);
     }
 };

 function calculateLambdaAge() {
     // interrogate CloudWatch
 }
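
The stub above is synchronous, but a real CloudWatch lookup is asynchronous, so in practice it would look more like the callback-style sketch below. This is a minimal sketch, not a drop-in implementation; the function name 'my-consumer' and the helper name fetchIteratorAge are placeholders:

 const AWS = require('aws-sdk');
 const cloudwatch = new AWS.CloudWatch();

 // Read the Lambda IteratorAge metric (how far, in milliseconds, the
 // consumer is behind the stream) over the last five minutes.
 function fetchIteratorAge(callback) {
     const now = new Date();
     cloudwatch.getMetricStatistics({
         Namespace: 'AWS/Lambda',
         MetricName: 'IteratorAge',
         Dimensions: [{ Name: 'FunctionName', Value: 'my-consumer' }],
         StartTime: new Date(now - 5 * 60 * 1000),
         EndTime: now,
         Period: 60,
         Statistics: ['Maximum']
     }, (err, data) => {
         if (err) return callback(err);
         // Datapoints are not guaranteed to be ordered; take the worst case.
         const ages = data.Datapoints.map((d) => d.Maximum);
         callback(null, ages.length ? Math.max.apply(null, ages) : 0);
     });
 }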

If CloudWatch does not tell you what you need, you may have to track it yourself somewhere that is at least as scalable as your RDS instance (Redis / DynamoDB).

2) Instead of putting effort into making #1 reliable, could that same effort go into scaling up your RDS instance, or into making your current usage of it more efficient?


I referred to this and this when building the code samples.


One approach could be to use Kinesis Firehose, which batches multiple incoming records together according to the buffering configuration of your delivery stream.

  • Send data to Firehose - either put records directly into the Firehose delivery stream using its API, or attach the Firehose to an existing Kinesis stream.
  • Set S3 as the Firehose destination - this essentially collects your individual records and writes them to S3 as a single object. You can specify your own delimiters and even transform individual records with a Lambda function.
  • Listen for S3 PutObject events - attach your Lambda to the S3 bucket that receives these aggregated objects from the Firehose stream (see the sketch after this list).
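
A minimal sketch of that third step, assuming Firehose was configured to write newline-delimited records; the delimiter and bucket are whatever you configured, and nothing here goes beyond a plain S3 read:

 const AWS = require('aws-sdk');
 const s3 = new AWS.S3();

 // Fires on s3:ObjectCreated:Put in the Firehose destination bucket.
 exports.handler = (event, context, callback) => {
     const rec = event.Records[0];
     const params = {
         Bucket: rec.s3.bucket.name,
         Key: decodeURIComponent(rec.s3.object.key.replace(/\+/g, ' '))
     };
     s3.getObject(params, (err, data) => {
         if (err) return callback(err);
         // One S3 object = one aggregated batch of Kinesis records.
         const rows = data.Body.toString('utf8').split('\n').filter(Boolean);
         // Open a single MySQL connection here and process `rows` together.
         callback(null, 'processed ' + rows.length + ' records');
     });
 };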
