Collecting events and execute delayed

This is a pretty general question but I stuck nonetheless.

I have an application where users do actions - mostly crud but also more complicated use cases. For each action an event with necessary information is published via amqp.

On the other side I have a service who is supposed to update an elasticsearch index based on these actions.

What I stuck at is the second part - Collecting events and execute actions without the risk of race conditions. The solution should needs to support multiple instances of this service. I thought about some approaches but there are downsides on every of them.

  1. Reacting on each event immediately.
    Pro: Very simple to implement
    Contra: Race conditions very often since every processing can take time of a few seconds

  2. Collecting and storing all events over a time of let’s say 10 seconds. Then let one instance grab and kinda lock them and process them
    Pro: One could optimize the work in front of the actual processing
    Contra: We did this back then with Laravel (php) and its job queue but the multiple instances “stole” the events from each other since one could not block the actions quickly enough. On top there were dead locks all the time due to permanently reading and writing.
    And what happens if an event is grabbed and sth. fails in a way that it can not be released again?

  3. Recreate the whole index every x seconds.
    Pro: Most simple solution
    Contra: Waste of resources and time consuming on large scales

n. Some other solutions but always similar to one of the 3 above.

Since I think this problem is not really new I am pretty sure that there is a basic approach for this. Unfortunately I just could not found it yet.

I would be pleased if someone could give me a hint on this :slight_smile:

Why don’t you just read them from AMQP and update Elastic?

I do use it. But as I described: The problem are the race conditions. Think about one entity is changed twice within one second:
Message 1 comes, grabs the entity, changes it and writes it back.
Meanwhile message 2 comes which changes another field and writes it back after message 1 is processed. This leads then to the problem that the change from message 1 is lost.

This is just a simple example. In my case the processing of the messages are more complicated.

Event sourcing just came to my mind but even there: How can one handle parallelism?

It doesn’t matter, you just store all events in the database. If you need to recreate a state in the past (projection), then use event sourcing. If you use a queue like Kafka or NATS Streaming, it can store events itself.

The problem is not about storing the events. The problem is about using the events to update the elasticsearch index based on them in parallel.

But after some researching it seems that in this case due to the principle it is just impossible to process them in parallel.

EDIT: It is about creating resp. updating the reading layer based on incoming events.

Oh I see now, this is Elastic specific. However, if you have event store, you can index snapshots based on time interval or number of events. Basically, Elastic would be your CQRS read model.

It is not Elastic specific - It is actually a general problem - I just use it for my explenation because we are using it. It would be the same with every kind of handling data coming from events.

You mean recreating the whole index based on an interval resp. count of messages? Depending on the amount of data this would be very time consuming.

And it would need me to keep the data in memory all the time - But ok, this is due to the principle ES is based on.

Not the whole index, but just one aggregate (entity or group of entities). Let’s say you have aggregate person, then on every 10th event on this person you make snapshot and index in Elastic. If you use snapshots in event store, you basically take the last snapshot, apply following 10 events and create new spashot.

When I said Elastic specific, I meant that Elastic does not support ACID transactions. Databases that do support this should not have problems with concurrency.

Ah, I see… The problem is that the elasticsearch entries are aggregations of multiple entities - Visibilities for based on groups connected to entries of people. It would just not be possible to separate them.

What do you think about:

Let’s say if the size of your batch is N and you’ve M instances.

Then you can consume N/M events per instance, you can do this in parallel.

You can use worker pattern, see:

But this just does not solve the actual problem. When I think about it it gets clearer and clearer - Since there is a sequence. The only chance is that all changes are applied in an atomar way - Similar to PATCH vs PUT in HTTP

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.