
Aggregator completion timeout

ljsap1
Explorer

I have a requirement where CPI receives data in high volumes, and the messages need to be collected and sent to the target server as bulk messages. The aggregator would have been a good option. However, the messages flow continuously, and the aggregator's completion timeout only takes effect when there is a gap between two messages, which would not happen here. It would be great if this timeout took effect regardless of the time gap between messages.

There is also nothing in the payload that could serve as a completion condition (predicate). The reason for sending in bulk is to prevent the target server from being overloaded: for example, grouping 1000 messages together and sending them as one message rather than as 1000 individual messages.

Any ideas what else I could do?

Thank you.

Accepted Solutions (0)

Answers (2)


ljsap1
Explorer
0 Likes

Thanks for the reply. I suppose I could also use the data store and build a custom solution. I was hoping for something more standard, but OK.

By the way, the messages are already coming from a queue (Event Mesh).

andrewfloriano
Product and Topic Expert
0 Likes

Hello,

You could use a custom approach here by building a message queue in your middleware. Instead of directly processing the messages for transmission, queue them until you reach the desired volume. Once that volume is reached, send them out as a bulk payload.

Here's a high-level plan:

  1. Create a Java class in CPI that holds a static list (your queue), a timer, and your desired threshold (1000 messages).
  2. When a new message is received in CPI, instead of processing it, append it to your static list.
  3. Start your timer, if it's not already running.
  4. When the list count reaches your threshold or the timer reaches its limit (whichever comes first), assemble all the messages in the list into a single bulk message and transmit it. Then clear the list and restart the timer.
  5. If no new messages are received during a timer cycle, the existing queue is sent as is and the timer is reset.
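
The plan above could be sketched roughly as follows. This is only an illustration in plain Java, not a tested CPI artifact: the class name `BulkBatcher`, the `sink` callback, and the parameter names are hypothetical, and how the class is wired into an integration flow is left open. The timer is started by the first message of each batch and is not reset by later messages, so the batch is sent after the maximum wait even if messages never stop arriving.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper (not an SAP API): collects messages in memory and hands
// them to `sink` as one batch when either the size threshold or the maximum
// wait time is reached, whichever comes first.
final class BulkBatcher implements AutoCloseable {
    private final int threshold;                 // e.g. 1000 messages
    private final long maxWaitMillis;            // upper bound on batch age
    private final Consumer<List<String>> sink;   // assumed to send the bulk message
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    private final Object lock = new Object();
    private List<String> buffer = new ArrayList<>();
    private ScheduledFuture<?> deadline;

    BulkBatcher(int threshold, long maxWaitMillis, Consumer<List<String>> sink) {
        this.threshold = threshold;
        this.maxWaitMillis = maxWaitMillis;
        this.sink = sink;
    }

    // Steps 2-4: append the message, start the timer if needed, flush when full.
    void add(String message) {
        List<String> full = null;
        synchronized (lock) {
            buffer.add(message);
            if (buffer.size() == 1) {
                // The first message of a batch starts the clock;
                // later messages do not reset it.
                deadline = timer.schedule(this::flush, maxWaitMillis, TimeUnit.MILLISECONDS);
            }
            if (buffer.size() >= threshold) {
                full = drain();
            }
        }
        if (full != null) {
            sink.accept(full); // send outside the lock so producers are not blocked
        }
    }

    // Steps 4-5: called by the timer (or manually) to send whatever is queued.
    // A timer task that fires just after a size-based flush may send the next
    // batch early; that only makes the batch smaller, nothing is lost.
    void flush() {
        List<String> batch;
        synchronized (lock) {
            batch = drain();
        }
        if (!batch.isEmpty()) {
            sink.accept(batch);
        }
    }

    // Must be called while holding `lock`: swaps in a fresh list and cancels the timer.
    private List<String> drain() {
        if (deadline != null) {
            deadline.cancel(false);
            deadline = null;
        }
        List<String> batch = buffer;
        buffer = new ArrayList<>();
        return batch;
    }

    @Override
    public void close() {
        flush();             // send the remainder before shutting down
        timer.shutdownNow();
    }
}
```

For example, `new BulkBatcher(1000, 60_000, batch -> send(batch))` would emit a batch after 1000 messages or 60 seconds, where `send` stands for whatever hands the combined payload to the target system. Note that the buffer lives only in the memory of one JVM, so each worker node would keep its own list.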

This code-based solution will need to be adjusted to the setup of the end systems and can take various forms depending on your needs.

The downsides are that you will need to synchronize access to the list properly to keep it thread-safe, and that queued messages held in memory can be lost if the CPI node fails.

Consider partnering with an experienced developer if you haven't done this sort of development before.

Kind Regards,