AWS SQS polling from sharded Akka Cluster running on Kubernetes
As that Proof-of-concept(PoC) proved promising, we started building a high-throughput and low-latency system based on the gained experiences and learnings.
The system under consideration polls (fetches) messages from AWS SQS and does the following:
- Processes polled SQS messages (such as JSON modifications)
- Stores polled SQS messages in a datastore
- Stores the latest state derived from polled SQS messages in-memory
- Publishes the processed SQS message to destination AWS SQS (for other systems to work with them)
- Finally acknowledging back the polled SQS messages to source AWS SQS.
This sounds pretty simple to implement at first, but turns into a challenging task when it happens at scale (up to 45,000 SQS-messages-processed/second).
Characteristics of the SQS message(s):
SQS message’s size varies from 5KBs to 100KBs
SQS message is uniquely identified by an identifier, let’s call it
event_id. And there are more than 250,000 unique event_id(s) in the system
SQS messages are versioned and some lower versioned SQS messages will be acknowledged back to source AWS SQS(as these messages does not affect the state of system) without any processing(JSON Modification), storing into datastore and publishing to destination AWS SQS
SQS messages are evenly distributed by
event_id, i.e in theory, all the SQS messages in one batch have a unique
Polling AWS SQS is easy. Controlled and dynamic polling based on the workload of a highly distributed system is challenging where failure is inevitable.
In the beginning, the implementation was simple and straightforward. One Actor (let’s say SQS Batch Poller) was responsible for polling and sending those polled SQS messages to desired entity actors to be processed, stored, published to destination SQS and eventually be acknowledged back to source SQS.
Moreover, the performance (time taken to process, CPU, memory etc) of the system depended on the size of SQS messages. A 5KB SQS message was quicker to process and required less resources compared to a 100KBs SQS message. This variation in size of the SQS messages made the workload of the system very dynamic and unpredictable.
This implementation worked fine with few thousand messages in SQS, but failed catastrophically when this number grew up to millions.
The failure happened because the SQS Batch Poller Actor kept polling SQS messages from AWS SQS without any knowledge of the state (processed or unprocessed) of already polled SQS messages. This filled the cluster with more than 120,000 unprocessed SQS messages and reduced the throughput to 10–12 SQS-messages-processed/sec. This resulted in unreachable Akka cluster nodes (Kubernetes Pods), killing them with OOM and eventually bringing down the whole system (Akka cluster).
Why did the Akka Cluster stop polling after ~120,000 SQS messages? Because that’s the limit imposed by AWS SQS. SQS can only have ~120,000 un-acknowledged or in-flight messages.
A better approach to poll SQS, without hitting the Akka cluster’s limits and killing it, was needed. The SQS Batch Poller Actor needed to be aware of the workload of the system and adjust the rate of polling AWS SQS accordingly.
The solution was to inform SQS Batch Poller Actor about the state of unprocessed SQS messages(Workload) in the system. i.e implementing Back-Pressure.
The key point in the Back-Pressure strategy was to limit the number of unprocessed messages the cluster can have at any given point in time. This strategy ensured that SQS is only polled if there is a demand for more SQS messages in the system and allowed the system to behave in a predictable manner irrespective of the size of SQS message.
The diagram below depicts the high-level architecture of the Back-Pressure Strategy.
The architecture consists of two main Actors, namely SQSBatchPollerManager and SQSBatchPoller, responsible for managing Back-Pressure and Polling SQS.
Before starting to define and implement Back-Pressure strategy, a few important details/assumptions need to be laid down.
maxUnprocessedMessages: A configurable limit on maximum number of SQS messages that can be present in the system at any given point in time. This limit can be adapted according to the throughput requirements and system limits. Increasing this limit comes at the cost of higher resources such as Memory, CPU, Network, etc.
parallelism: Parallelism factor to limit the number of SQS batches polled in parallel. This is a prevention against creating a peak in resource usages such as overwhelming database or a third party service with burst of thousands of request at once to load initial state of Entity actor.
batchSize: Each SQS batch can have a maximum of 10 SQS messages.
Involved Actors in Back-Pressure strategy
SQS Batch Poller Manager Actor (SQSBatchPollerManager):SQSBatchPollerManager actor is responsible for keeping track of unprocessed SQS messages in the system and to calculate the number of messages to be polled from SQS.
SQS Batch Poller Actor (SQSBatchPoller):SqsBatchPoller actor actually polls SQS message batch from AWS SQS and keeps track of the lifecycle of the polled SQS messages. It also informs back to the SqsBatchPollerManager upon complete processing of the SQS messages batch.
Entity Actor (EntityActor):EntityActor is responsible for processing(such as JSON Modification), storing into datastore, publishing to destination SQS, acknowledging back the polled SQS message to the source SQS and, finally informing back to SQSBatchPoller about successful or failed processing of this polled SQS message.
How these Actor(s) collectively implement Back-Pressure strategy? After successful cluster formation, the cluster is ready to poll and process SQS messages. Let’s see the whole process of Back-Pressured SQS polling step by step for a better understanding.
- SQSBatchPollerManager receives a message PollSqs to start SQS polling.
- Upon receiving PollSqs message, SQSBatchPollerManager calculates the number of SQS batches that can be polled in parallel (parallelism) while not exceeding the maximum number of unprocessed SQS messages (maxUnprocessedMessages) the cluster can sustain. After calculating the number of SQS messages to poll, SQSBatchPollerManager creates child actor(s), SQSBatchPoller, and sends a message PollSqsBatch to it.
- Upon receiving PollSqsBatch message from SQSBatchPollerManager, SQSBatchPoller polls AWS SQS and sends these polled SQS messages to Cluster Shard Region Actor which in turn forwards these SQS messages to respective EntityActor.
- Upon receiving SQS messages, EntityActor processes(such as JSON Modification), stores the state into datastore, publishes to destination SQS, acknowledges the polled SQS message to the source SQS and, finally sends a message SQSMessageProcessed back to SQSBatchPoller.
- SQSBatchPoller waits for all the EntityActor(s) to send back an acknowledgement message SQSMessageProcessed. After receiving all the acknowledgement back from concerned EntityActor(s), it sends a message BatchProcessed back to SQSBatchPollerManager and kills itself.
- SQSBatchPollerManager upon receiving BatchProcessed sends itself a message PollSqs and the whole process repeats from step 2 again.
With this strategy, AWS SQS polling is controlled by the speed of processing SQS messages by the system (Akka Cluster).
What’s described above is a simplified version of the actual Back-Pressure strategy used in production system, But the underlying principle of Back-Pressure is exactly the same. Some obvious caveats such as handling SQS failures, Node(s) crashes, Actor crashes, optimization in polling AWS SQS, etc are excluded here and is out of the scope of this post.
I will try to write more about the handling of the failure cases listed above and optimizations in following posts.