Understanding the breakpoint in eventhub

I want to make sure that if my Eventhub client crashes (currently it is a console application), it receives only those events that have not yet been received from Eventhub. One way to achieve this is to use offsets. However, this (in my opinion) requires the client to keep the last offset (except that the events do not necessarily fall into the foreach loop of the ProcessEventsAsync method ordered by SequenceNumber).

An alternative is to use breakpoints. I think they are saved through the server (eventhub) using the provided credentials of the storage account. It is right?

This is the preliminary code that I am currently using:

public class SimpleEventProcessor : IEventProcessor { private Stopwatch _checkpointStopWatch; async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason) { Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason); if (reason == CloseReason.Shutdown) { await context.CheckpointAsync(); } } Task IEventProcessor.OpenAsync(PartitionContext context) { Console.WriteLine("SimpleEventProcessor initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset); _checkpointStopWatch = new Stopwatch(); _checkpointStopWatch.Start(); return Task.FromResult<object>(null); } async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages) { foreach (var eventData in messages) { // do something } //Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts. if (_checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5)) { await context.CheckpointAsync(); _checkpointStopWatch.Restart(); } } } 

I believe sending sends a checkpoint to the server every 5 minutes. How does the server know which client sent the breakpoint (via context)? Also, how can I prevent event re-processing after the client restarts? In addition, a window of up to 5 minutes may remain in which events are processed again. Perhaps I should use a queue / topic, given my requirements?

PS:

This seems sufficient:

 async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages) { foreach (var eventData in messages) { // do something } await context.CheckpointAsync(); } 
+13
source share
1 answer

Lemm put forward some basic terms before answering :

EventHubs is a high-performance pipeline for receiving long-term events. Simply put, this is a reliable stream of events in the cloud.

The offset on EventData (one event in the stream) is literally a cursor in the stream. The presence of this cursor will enable operations such as - resume reading from this cursor (aka Offset) - inclusively or exclusively.

The EventProcessor library is a framework created by the EventHubs team on top of the ServiceBus SDK to create an "event gu receiver" - it looks simpler. ZooKeeper for Kafka <-> EPH for Event Hub . It will make sure that the process that starts the EventProcessor on a specific partition dies / crashes - it will be resumed from the last Checkpointed offset - in another available instance of EventProcessorHost.

CheckPoint : Today - EventHubs only supports client-side validation. When you call Checkpoint from your client code:

 await context.CheckpointAsync(); 

- it is converted to a storage call (directly from the client), which will store the current offset in the storage account you provided. The EventHubs service will not contact the repository to check the check.

ANSWER

The EventProcessor Framework is designed to achieve exactly what you are looking for.

Control points are not saved through the server (aka EVENTHUBS Service). It is purely on the client side. You are talking to Azure Storage. For this reason, the EventProcessor library introduces a new additional dependency - AzureStorageClient . You can connect to the storage account and to the container in which the control points are recorded - we store information about the owner - EPH instances (names) to the sections of the EventHub hubs that they own and to which control point they are currently being read / processed until since then.

According to the timer-based checkpoint pattern — you initially had — if the process stopped, you would re-do the events in the last 5-minute window. This is a healthy sample, like:

  1. the fundamental assumption is that faults are rare events, so you will be dealing with duplicate events
  2. in the end, you'll make fewer calls to the vault service (which you can easily overflow by checking frequently). I would take one more step and actually make the checkpoint call asynchronously. OnProcessEvents don't need to fail if a breakpoint fails!

if you want to repeat themselves completely without events, you will need to build this deduplication logic in a downstream pipeline.

  • every time EventProcessorImpl is started - there is no request for your last flow from the downstream. he received and continues to discard events until the current sequence no.

here's a more general reading on Event Hubs ...

+20
source

Source: https://habr.com/ru/post/1243252/


All Articles