Akka streams with Alpakka indexing in ES: index name is evaluated only at startup

I wrote code with Akka Streams and Alpakka, which reads from Amazon SQS and indexes events in Elasticsearch. Everything works smoothly and the performance is awesome, but I have a problem with index names. I have this code:

class ElasticSearchIndexFlow(restClient: RestClient) {

  private val elasticSettings = ElasticsearchSinkSettings(bufferSize = 10)

  def flow: Flow[IncomingMessage[DomainEvent, NotUsed], Seq[IncomingMessageResult[DomainEvent, NotUsed]], NotUsed] =
    ElasticsearchFlow.create[DomainEvent](index, "domain-event", elasticSettings)(
      restClient,
      DomainEventMarshaller.domainEventWrites
    )

  private def index = {
    val now = DateTime.now()
    s"de-${now.getYear}.${now.getMonthOfYear}.${now.getDayOfMonth}"
  }
}

The problem is that after several days of running the thread, the index name does not change. I believe that Akka Streams creates a fused actor under the hood and that the function indexto get the index name is evaluated only at the beginning of execution.

Any idea what I can do to index events in ES with the index name according to the current date?

+4
source share
1 answer

IncomingMessage.withIndexName

:

def flow: Flow[(DomainEvent, Message), IncomingMessage[DomainEvent, Message], NotUsed] =
  Flow[(DomainEvent, Message)].map {
    case (domainEvent, message) =>
      IncomingMessage(Some(domainEvent.eventId), domainEvent, message)
        .withIndexName(indexName(domainEvent.ocurredOn))
}

:

def flow: Flow[IncomingMessage[DomainEvent, NotUsed], Seq[IncomingMessageResult[DomainEvent, NotUsed]], NotUsed] =
  ElasticsearchFlow.create[DomainEvent]("this-index-name-is-not-used", "domain-event", elasticSettings)(
    restClient,
    DomainEventMarshaller.domainEventWrites
  )
0

Source: https://habr.com/ru/post/1696279/


All Articles