When we set out to provide a seamless platform for our customers that consumes their customer events and fans them out to any number of destinations they desire, we built our product around “bring your own message queue.” Our customers could sign up for either Google Pub/Sub or a cloud hosted Kafka cluster and we would wire the MetaRouter Platform to utilize whichever message queue they would provide.

    We found, however, that using a third party queuing system not only feels cumbersome, but it also creates a security concern that customer data is leaving the MetaRouter platform. There is also the additional cost of using these platforms and having our engineers maintain a codebase that would support such message queues. 

    So, the team researched multiple event streaming technologies including Kafka, NATS, RabbitMQ, and Pulsar. We decided on Kafka since we were already supporting it from a consumer/producer perspective, it is well established, and it provides a large number of features such as exactly once delivery, message ordering, and horizontal scaling. 

    But to really prove out our decision, we put it to the test.

    Trial run with 400 million events

    In order to meet our expectations on throughput, we needed to be able to process 1 billion events per day or around 12,000 events per second (eps) with the size of each event around 2 KB. Our initial test would involve processing 400 million events. 

    To make our deployments easier across Enterprise Customers we utilize Helm with Kubernetes. This led us to use Bitnami’s Kafka Helm Charts since they are simple and easy to work with. 

    Broker Setup

    Based on some preliminary testing these are the Kafka and Kubernetes settings we deployed for our POC. 

    kafka: resources: requests: cpu: 4 memory: 16Gi persistence: size: 500Gi replicaCount: 3 numPartitions: 100 defaultReplicationFactor: 2
    • Kafka is very memory intensive and, keeping in mind the throughput we wanted to achieve, we initially used a Kubernetes resource request of 4 vCPU and 16 GiB of memory per Kafka pod. We did not set a limit so that the three brokers could grow their resources as necessary. 
    • In order to reduce the amount of data individual brokers store, we are only replicating data twice across brokers. Upon customer request we could increase this for further protection against data loss. 
      • We also utilize both the retention.bytes and settings to remove already-processed data from the brokers. Setting these values low reduces the required storage significantly.
    • The majority of our microservices have the ability to autoscale up to 100 pods. For this reason we chose 100 partitions per topic so that we could support horizontally scaling our services across a consumer group. 
    • We have set the node priority in kubernetes for the brokers to the highest level in our cluster to ensure they will launch successfully. We eventually plan to roll the Kafka pods off onto their own node cluster. 

    Initial Testing

    All of our services connecting to the brokers are written in Go. The source events came from S3 as gzip files that were consumed by 50 pods to load them into Kafka as fast as the brokers could keep up. During this phase, we were only running around 10 million events. Each event is around 2KB in size.

    When we ingest events into our system, we put them on a single topic. This topic is then read and fanned out onto destination topics. The only destination we used during this test was batching events in compressed files back to S3. 

    On the producer side, we set the batch size to 10,000 events with a batch timeout of 5 seconds. We also chose to compress our batches using Snappy. Our consumers were configured to have a fetch a minimum of 5 MB, with a maximum wait time of 10 seconds. 

    Unfortunately, the first run was a little disappointing. 

    Messages seemed to be getting produced quickly; however, the consumers were not processing fast as we’d like, although the rate did exceed our stated goal of 12,000 events per second. This was only around 25,000 events per second or 90 million per hour

    After digging into the consumers, we discovered that we were committing offsets synchronously. 

    Once we switched our consumers to asynchronous commits, we ran the data through again and achieved a throughput of about 65,000 events per second or 234 million per hour, which would give us roughly 5.6 billion events per day. 

    Time to give it a try. 

    Full test run: 200 million events per hour

    The total number of events in our dataset that we used to test throughput within our system is around 400 million. With this many events, we could build confidence that we can easily process over a billion events per day. 

    For this test, we added Amplitude and Braze as HTTP destinations and kept S3. This roughly doubled the amount of messages that were being written to the brokers.

    Ingestion Success

    Ingesting the full 400 million events took roughly 2 hours or 200 million events in an hour. Our S3 forwarders kept up with the streaming load and finished roughly 10 minutes after ingesting all events. Amplitude and Braze are sent single events, instead of a batch, and we were backed up by their rate limiting. Those two destinations slowly made their way through their queues but eventually caught up. 

    Bytes written to topics per second, averaging between 2.5 GB on follower topics and 3 GB on main topic. Close to 10 GB / second of data written to all topics. 

    The message queues seemed to be handling the volume with ease.

    Ongoing Monitoring

    With this much data in the system, making sure the cluster and Kafka brokers are healthy is our main concern. We utilize prometheus to collect broker metrics. These metrics are then used to build dashboards and alerts. 

    These are a few of the metrics we watch closely:

    Consumer group lag - This helps us visualize how far behind a destination is compared to others. If lag on a consumer group continues to climb we will be alerted since this could indicate an issue on the consumers themselves. 

    Broker disk space used - This metric is important for us to keep track of since brokers will stop working when the disk storage is full. We will be alerted any time a broker’s disk utilization is over 80%. 

    Partition size by segment and topic - Understanding how the broker’s storage is being used by each topic and their segment size allows us to change how long and how large a specific topic’s size should be. 

    Percent requested memory and CPU - In Kubernetes, we do not set a resource limit on the brokers for CPU and memory; however, we do want to make sure our nodes are large enough that the brokers will support them. By keeping track of their usage, we can easily give them larger nodes. 

    Conclusion: Kafka can handle the volume 

    With a 5 broker Kafka cluster each with 4 vCPU and 12 GB memory we are able to sustain 200 million events per hour or 55,000 events per second. In our production environments we have increased our brokers' memory to 32 GB of memory for better memory caching of events and less disk access. In the future, we also plan to move our brokers to their own dedicated nodes so we can scale them outside of general service scaling. 

    After moving from Google PubSub to our own Kafka brokers, we have realized that instead of using multiple topics for queues, we could remove the middle layer and give each destination service its own consumer group. This way we reduce the amount of producing and storage on the brokers and only need to use their consumer groups to keep track of where they are in the queue. 

    To ensure the utmost security for our high-volume customers, this is a solution in which we can feel confident.

    Photo by Aperture Vintage on Unsplash

    Jonathan Ohms

    Written by Jonathan Ohms

    Senior Software Engineer at MetaRouter