How to pass a dependency to a class that implements IEventProcessor (Event Hub)

I have the following problem.

we use an event hub. In the next class, we inherit from IEventProcessor, and as you can see, we use the Service Locator. We cannot get it to work with the constructor / property installation. It seems that Castle Windsor cannot resolve dependencies when a class that inherits from IEventProcessor. Is this a known issue or is there something I need to do to make it work?

Below is the code:

public class EventProcessor : IEventProcessor
{
    private readonly IEventService _eventService;
    private readonly ILogger _logger;
    private readonly Lazy<RetryPolicy> _retryPolicy;
    private readonly IConfigurationProvider _configurationProvider;

    public EventProcessor()
    {
        try
        {
            _eventService = ContainerProvider.Current.Container.Resolve<IEventService>();
            _logger = ContainerProvider.Current.Container.Resolve<ILogger>();
            _configurationProvider =     ContainerProvider.Current.Container.Resolve<IConfigurationProvider>();

        }
        catch (Exception exception)
        {
            _logger.WriteError(string.Format("Error occured when intializing EventProcessor: '{0}'", exception));
        }
    }

    public Task OpenAsync(PartitionContext context)
    {
        return Task.FromResult(0);
    }

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
    {
        var eventsList = events.ToList();
        EventData lastEvent = null;
        foreach (var eventData in eventsList)
        {
            _logger.WriteVerbose(string.Format("Consumming {0} events...", eventsList.Count()));
            _eventService.ProcessEvent(eventData);
            lastEvent = eventData;
        }

        if (lastEvent != null)
        {
            await AzureServiceBusRetryPolicy.ExecuteAsync(async () => await context.CheckpointAsync(lastEvent));
        }
    }

    public async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        _logger.WriteInfo("EventHub processor was closed for this reason: " + reason);

        if (reason == CloseReason.Shutdown)
        {
            await AzureServiceBusRetryPolicy.ExecuteAsync(async () => await context.CheckpointAsync());
        }

    }


}

thank

+4
source share
2 answers

I am using Autofac, but ran into the same problem.

IEventProcessorFactory EventProcessorHost.

, EventProcessorHost :

public class EventHubProcessorHost
{
    private readonly IEventProcessorFactory _eventProcessorFactory;
    private readonly string _serviceBusConnectionString;
    private readonly string _storageConnectionString;
    private readonly string _eventHubName;

    public EventHubProcessorHost(IEventProcessorFactory eventProcessorFactory, string serviceBusConnectionString, string storageConnectionString, string eventHubName)
    {
        _eventProcessorFactory = eventProcessorFactory;
        _serviceBusConnectionString = serviceBusConnectionString;
        _storageConnectionString = storageConnectionString;
        _eventHubName = eventHubName;
    }

    public void Start()
    {
        var builder = new ServiceBusConnectionStringBuilder(_serviceBusConnectionString)
        {
            TransportType = TransportType.Amqp
        };

        var client = EventHubClient.CreateFromConnectionString(builder.ToString(), _eventHubName);

        try
        {
            var eventProcessorHost = new EventProcessorHost("singleworker",
              client.Path, client.GetDefaultConsumerGroup().GroupName, builder.ToString(), _storageConnectionString);

            eventProcessorHost.RegisterEventProcessorFactoryAsync(_eventProcessorFactory);
        }
        catch (Exception exp)
        {
            Console.WriteLine("Error on send: " + exp.Message);
        }
    }
}

factory, , IoC:

public class MyEventProcessorFactory : IEventProcessorFactory
{
    private readonly IComponentContext _componentContext;

    public MyEventProcessorFactory(IComponentContext componentContext)
    {
        _componentContext = componentContext;
    }

    public IEventProcessor CreateEventProcessor(PartitionContext context)
    {
        return _componentContext.Resolve<IEventProcessor>();
    }
}

EventProcessor:

public class MyEventProcessor : IEventProcessor
{
    private IFoo _foo;

    public MyEventProcessor(IFoo foo)
    {
        _foo = foo;
    }

    public Task OpenAsync(PartitionContext context)
    {
        return Task.FromResult<object>(null);
    }

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
    {
        foreach (var eventData in events)
        {
            // Processing code
        }

        await context.CheckpointAsync();
    }

    public async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }
}

Autofac :

builder.RegisterType<Foo>().As<IFoo>()

builder.RegisterType<MyEventProcessor>().As<IEventProcessor>()

builder.Register(c => new MyEventProcessorFactory(c.Resolve<IComponentContext>())).As<IEventProcessorFactory>();

, .

+4

dependecies . ctor-injection, , . sth.

public EventProcessor(IEventService eventService, ILogger logger, IConfigurationProvider configProvider)
{
    try
    {
        this._eventService = eventService;
        this._logger = logger; // however, i suggest using Property injection for that, see next example
        this._configurationProvider = configProvider;

    }
    catch (Exception exception)
    {
        this._logger.WriteError(string.Format("Error occured when intializing EventProcessor: '{0}'", exception));
    }
}

EventProcessor, .

. , .

public ILogger Logger {get; set;}

Castle Windsors (. LoggingFacility Winsdor. , NLog log4net (, )? → , nugetpackage ( Windor log4net).

0

Source: https://habr.com/ru/post/1570512/


All Articles