Building Centrifuge: Part 2 — Praxis
In part one of Building Centrifuge I introduced readers to some of the assumptions and problems inherent in building a system designed to handle many thousands of HTTP requests per second. In part two I'll describe the architecture I used to implement the system and what I learned along the way.
Centrifuge, like similar systems, has the goal of making many HTTP requests without propagating delays. Of course I could simply do one request after another, but then a delay in one request would be propagated to all subsequent requests. So the intention is to isolate delays by parallelising the execution of requests, so that a slow request never holds up the requests that follow it.
A word of caution: I will assume that the reader is familiar with Docker and Kubernetes, which together are ushering in a kind of containerisation revolution in the IT industry. I will also mention other technologies, but I hope that their roles are clear from the context.
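To make the idea concrete, here is a minimal sketch (not Centrifuge code) of the difference: in the sequential case one slow URL delays everything that follows it, in the parallel case it only delays its own result. The URLs are placeholders.

```python
import concurrent.futures
import urllib.request

urls = ["https://example.com/fast", "https://example.com/slow", "https://example.com/also-fast"]

def fetch(url):
    # A single slow response only delays this call, not the others.
    with urllib.request.urlopen(url, timeout=10) as resp:
        return url, resp.status

# Sequential: a delay in one request is propagated to all subsequent requests.
results_sequential = [fetch(url) for url in urls]

# Parallel: each request is isolated in its own worker thread.
with concurrent.futures.ThreadPoolExecutor(max_workers=len(urls)) as pool:
    results_parallel = list(pool.map(fetch, urls))
```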
Microservices
Quick note: For the rest of the article, I will use component and service interchangeably.
My attempt at building a massively parallelised request engine became a classic Microservices architecture. Specifically, my take on Microservices is that it is nothing more than the transfer of the Unix philosophy, building a single tool to do one job and one job only, to an entire architecture. Ergo, for me, Unix was really the original Microservices architecture.
Debugging is arguably easier since problems can be localised faster, but the price is many more moving parts, any of which may fail. As far as possible, the system should be self-healing. Self-healing represents, for me, the codification of fixes to known failures, but since no component is perfect and not all use cases can be covered, I also need monitoring and traceability.
It is important to have a good view into what is happening. A Microservices architecture can't be treated as a black box: I need to know what is going on within the entire system and within each component. Bottlenecks need to be identified quickly. Thus relevant telemetrics are essential.
Another benefit of a Microservices architecture is being able to use and test different technologies for components. Since components are encapsulated pieces of software with clear boundaries, i.e., what comes in, what goes out and what has to be done, components can be replaced or reimplemented without modifying other components. This is a major benefit when building something where the unknowns aren't known and most knowns change as architectural insight grows. Multiple implementations of components can be maintained; Docker/Kubernetes decide which components are deployed and used.
My starting point became Docker and Datadog. Docker Compose is great for testing locally and is an ideal stepping stone towards Kubernetes. Datadog provides a solid monitoring and telemetrics platform with support for containerisation.
Supporting both Docker and Kubernetes is relatively straightforward since the underlying Docker images are used for both; just the configurations need to be kept in sync. Managing the configurations can become tiresome, but Makefiles, or Ruby's equivalent Rakefiles, can help to encapsulate repetitive tasks.
Brief clarification: I will use the term dynamic components for services that are spun up by other components on the fly. These components are only partly managed by Docker or Kubernetes; their lifetimes are managed by the system directly.
Implementation
So how to implement all this? I could probably have done all this very easily using Akka or Erlang, since both are designed for massively threaded applications and this seems to be a perfect example of such a system. However, that would be too simple! On the other hand, since we are building a Microservices system, we can always replace components as needed.
For me it came down to the question of whether the learning curve of a new technology is more valuable than the experience gained from building a new system. As described in the previous article, a lot of assumptions were made to focus on a solution. These assumptions are largely independent of the technology used to build the system. So clarifying and testing the assumptions first and then optimising the system seems fair to me. In addition, having developed the first solution, I can iterate and make new assumptions to improve and extend it.
Data Streams and Data Stores
The initial technology choices were Kafka and Redis; both are old friends of mine. Kafka provides the data stream required for sharing data between components. Redis is the data-store glue that helps buffer data flows and ensures that data isn't lost on component failures.
Next is the development of a configuration component. The intention is to have a configuration and analytics website for end users. This won't be ingesting any requests for the system to handle; it will just store the configuration details.
Brief clarification: I will be talking about two different kinds of endpoints, the endpoints to which users send their requests (request endpoints) and the endpoints provided by users to which responses are sent (response endpoints). Response endpoints are optional and defined by the user.
Architecturally I decided that the ingestion of requests and the user configuration of those requests should be two different components, since these are completely different responsibilities that only share the request configuration.
Configuration of requests includes an expires-at value (i.e., how long a request should be retried until it's marked as expired), a timeout (how long we should wait for a response from an external request server) and a response endpoint to which final responses are sent. This configuration is common to a group of requests, so it makes no sense to send all this information with individual requests. Also, the interface for submitting requests should be as simple as possible; ideally only the URL of the request should be required.
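As an illustration only, such a per-endpoint configuration could be modelled roughly like this in Python (the field names are my own, not Centrifuge's actual schema):

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class RequestConfig:
    # How long a request may be retried before it is marked as expired.
    expires_after_seconds: int = 3600
    # How long to wait for a response from the external request server.
    timeout_seconds: int = 10
    # Optional endpoints to which final responses are sent.
    response_endpoints: List[str] = field(default_factory=list)

# Shared by a whole group of requests; an individual request only needs its URL.
config = RequestConfig(expires_after_seconds=7200, timeout_seconds=5,
                       response_endpoints=["https://client.example.com/hook"])
```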
For requests to flow through the system, they need to be pushed into Kafka for other services to handle. But instead of pushing requests directly into Kafka, requests are first stored in Redis. This also allows the ingestion service to do one thing and one thing only: accept requests and push them into Redis.
The Kafka storage (kstore) component is responsible for taking those requests from Redis, annotating them with request configuration details, adding some IP-geo lookup, deciding into which Kafka topic queue the request should be passed and, finally, passing batches of requests off to Kafka. Now the storage of requests into Kafka and the ingestion of requests are completely decoupled.
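A minimal sketch of that single responsibility, assuming the redis-py client and a hypothetical per-endpoint queue name:

```python
import json
import redis

r = redis.Redis(host="redis", port=6379)

def ingest(endpoint_id: str, url: str) -> None:
    # The ingestion service does one thing only: accept a request
    # and push it onto the Redis buffer for that request endpoint.
    r.lpush(f"ingest:{endpoint_id}", json.dumps({"url": url}))
```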
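Sketched with redis-py and kafka-python, and with the annotation and topic-routing logic heavily simplified (queue and topic names are placeholders):

```python
import json
import redis
from kafka import KafkaProducer

r = redis.Redis(host="redis", port=6379)
producer = KafkaProducer(bootstrap_servers="kafka:9092")

def kstore_batch(endpoint_id: str, config: dict, batch_size: int = 100) -> None:
    for _ in range(batch_size):
        raw = r.rpop(f"ingest:{endpoint_id}")
        if raw is None:
            break
        request = json.loads(raw)
        # Annotate the bare request with the endpoint's configuration details.
        request.update(config)
        # Decide which Kafka topic queue the request belongs to.
        topic = f"requests.{endpoint_id}"
        producer.send(topic, json.dumps(request).encode("utf-8"))
    producer.flush()
```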
Being a dynamic component, the kstore component is managed by the configuration service. A user creates a request endpoint to which they can send their requests and for each request endpoint, a kstore component is spun up.
Request Formatting
Requests are formatted as space-separated strings. Values are delimited by spaces and there is no limit to the number of values. Values are positional, so each component needs to know the position and meaning of each value in the string. The requests become packets that are routed through the system, each with their own meta-data and payload.
Payload and response information are stored in a combination of CGI, JSON and Base64 encodings. Basically all programming languages have support for these three and this format is easily debuggable by non-machines. This structure is also used to store requests in Redis, so there isn’t a major transformation of data structures between components.
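A rough sketch of how such a packet could be built and taken apart again in Python; the field order here is invented purely for illustration:

```python
import base64
import json
from urllib.parse import quote, unquote

def pack(request_id: str, topic: str, payload: dict) -> str:
    # Positional, space-separated values: each component must know the order.
    encoded_payload = base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii")
    return " ".join([request_id, quote(topic), encoded_payload])

def unpack(packet: str):
    request_id, topic, encoded_payload = packet.split(" ")
    payload = json.loads(base64.b64decode(encoded_payload))
    return request_id, unquote(topic), payload

packet = pack("req-42", "requests.example", {"url": "https://example.com"})
```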
Requestor Component
The next step was building a dynamic requestor component that handles the requests and sends them to external servers. The only job this component has is to take a request, construct an HTTP request, perform the request against the external server and attach the response to the request's representation in the system. Then it determines whether the request was successful, has expired or should be retried.
A requestor component obtains its requests from an incoming Redis queue that is filled from the Kafka topic queues. Again I'm buffering the data flow, partly to know when the requestor components need scaling, i.e., when the queue is backing up. Attempted requests are stored in Redis, either in a completed queue or a retry queue. The completed queue contains all the requests that have either expired or were successful.
Retry requests are stored in Redis until their back-off time frame has expired. Retries are done with exponential back-off until they expire or succeed. Once requests are ready to be retried, they are pushed back to the request Kafka cluster. From there, the whole flow is rinsed and repeated.
Completed requests are pushed into a second Kafka cluster. This response Kafka cluster is consumed by responders that transmit the responses back to the response endpoints. Responder components are also dynamic and are spun up when the user activates request endpoints. One nice feature of this is that a user can define multiple response endpoints to which responses should be sent: a kind of demultiplexing of responses from requests.
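In outline, and leaving the queue handling aside, that logic might look something like this (using the requests library; the field names and state labels are my own):

```python
import time
import requests

def attempt(request: dict) -> dict:
    # Construct and perform the HTTP request against the external server.
    try:
        resp = requests.get(request["url"], timeout=request["timeout_seconds"])
        request["response"] = {"status": resp.status_code, "body": resp.text}
        ok = resp.status_code < 500
    except requests.RequestException as exc:
        request["response"] = {"error": str(exc)}
        ok = False

    # Decide: successful, expired, or to be retried.
    if ok:
        request["state"] = "success"
    elif time.time() > request["expires_at"]:
        request["state"] = "expired"
    else:
        request["state"] = "retry"
    return request
```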
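One way to hold retries back until their back-off has elapsed is a Redis sorted set scored by the time at which the request becomes due. This is a sketch of that idea, not necessarily how Centrifuge stores them:

```python
import json
import time
import redis

r = redis.Redis(host="redis", port=6379)
RETRY_QUEUE = "retries"

def schedule_retry(request: dict) -> None:
    attempt_no = request.get("attempts", 0) + 1
    request["attempts"] = attempt_no
    # Exponential back-off: 2, 4, 8, ... seconds, capped at five minutes.
    delay = min(2 ** attempt_no, 300)
    r.zadd(RETRY_QUEUE, {json.dumps(request): time.time() + delay})

def due_retries() -> list:
    # Requests whose back-off time frame has expired and that can be
    # pushed back to the request Kafka cluster.
    now = time.time()
    members = r.zrangebyscore(RETRY_QUEUE, 0, now)
    if members:
        r.zremrangebyscore(RETRY_QUEUE, 0, now)
    return [json.loads(m) for m in members]
```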
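A simplified sketch of a responder consuming the response Kafka cluster with kafka-python and fanning each completed request out to every configured response endpoint (topic and server names are illustrative):

```python
import json
import requests
from kafka import KafkaConsumer

consumer = KafkaConsumer("responses.example",
                         bootstrap_servers="kafka-responses:9092",
                         group_id="responder")

for message in consumer:
    completed = json.loads(message.value)
    # Demultiplex: one completed request may fan out to several response endpoints.
    for endpoint in completed.get("response_endpoints", []):
        requests.post(endpoint, json=completed["response"], timeout=10)
```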
Scheduler
A scheduler component is responsible for scaling dynamic components according to traffic requirements. I won't go into too much detail, but the basic principle is that if a queue is filling up, the corresponding component is scaled up, as sketched below. For example, if the incoming queue is getting long, more requestor components are spun up; if the incoming queue is empty and there are too many requestors, some are shut down.
The scheduler is a good example of a component that, thanks to good monitoring, was redesigned and rewritten. Basically I realised that it was taking far too many resources (it was originally written in Ruby), so I decided to rewrite it in Python. Then I was looking into the Datadog agent Docker image and realised that Datadog uses S6 to spin up and supervise multiple processes within a single Docker container (best practice assumes only one process runs in a Docker container). So I had the idea of using S6 to spin up separate Unix processes for managing each request Kafka topic.
This has the advantage that parallelisation is handled by Unix and not Python. Supervision of the processes is done by S6, so there is no need to do that programmatically. So the scheduler became a small script that checks the topics in the request Kafka cluster. For each new topic, it spins up a new S6 service for that topic. Each topic service is responsible for scaling the infrastructure for the corresponding topic.
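The rule itself fits in a few lines. This sketch uses redis-py for the queue length and shells out to kubectl, which is only one of several ways the scaling decision could be applied; the queue key, deployment name and thresholds are all invented for illustration:

```python
import subprocess
import redis

r = redis.Redis(host="redis", port=6379)

def scale_requestors(topic: str, namespace: str) -> None:
    backlog = r.llen(f"incoming:{topic}")
    # Crude rule of thumb: one requestor per 100 queued requests, at least one, at most 20.
    replicas = max(1, min(backlog // 100 + 1, 20))
    subprocess.run(["kubectl", "scale", f"deployment/requestor-{topic}",
                    f"--replicas={replicas}", "-n", namespace], check=True)
```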
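Roughly, creating a new S6 service for a topic amounts to writing a service directory containing an executable run script into the directory that s6-svscan is watching. The scan directory path and the topic_scaler.py script below are assumptions for the sake of the sketch:

```python
import os
import stat

SCAN_DIR = "/var/run/s6/services"  # assumed scan directory watched by s6-svscan

RUN_TEMPLATE = """#!/bin/sh
exec python /app/topic_scaler.py {topic}
"""

def create_topic_service(topic: str) -> None:
    # Each Kafka topic gets its own supervised Unix process.
    service_dir = os.path.join(SCAN_DIR, f"scaler-{topic}")
    os.makedirs(service_dir, exist_ok=True)
    run_path = os.path.join(service_dir, "run")
    with open(run_path, "w") as f:
        f.write(RUN_TEMPLATE.format(topic=topic))
    # The run script must be executable for S6 to supervise it.
    os.chmod(run_path, os.stat(run_path).st_mode | stat.S_IEXEC)
```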
That is basically the core of my implementation. It’s definitely not complete nor finalised but I’ve already learned quite a lot along the way. I’ll now describe some of my learnings.
Learnings
The key to successful programming is to never stop learning. It’s important to start something new and get into the dirty details of the problem.
Personally, monitoring saw the biggest learning curve. How important monitoring is has been clear to me for a while, but how important it is right at the beginning of a project wasn't. Even if you're simply prototyping, think about monitoring. In my case, it allowed me to immediately take measurements of components and gather insights into the various queues and where data flow was backing up. This made end-to-end tests far more insightful.
Using the Datadog agent Docker image meant there was a statsd server running on each node of the cluster. Each service uses that statsd server to send its telemetrics to Datadog directly from its node.
In addition, I wanted to gather telemetrics on the Redis instances and wanted an external service to do that. So I extended the original Datadog Docker image with my own Python script that queries the Redis instances. To avoid collecting data twice, the script first queries Docker on the Kubernetes node, obtains a list of Redis instances running on the host node and only queries those instances.
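In spirit, the script does something like the following, using the Docker SDK for Python, redis-py and the DogStatsD client from datadogpy; the container filter and metric names are illustrative, not the ones I actually use:

```python
import docker
import redis
from datadog import statsd

def report_redis_metrics() -> None:
    client = docker.from_env()
    # Only look at Redis containers running on this host node,
    # so each instance is reported exactly once cluster-wide.
    for container in client.containers.list(filters={"ancestor": "redis"}):
        ports = container.attrs["NetworkSettings"]["Ports"].get("6379/tcp") or []
        if not ports:
            continue
        host_port = int(ports[0]["HostPort"])
        info = redis.Redis(host="localhost", port=host_port).info()
        tags = [f"container:{container.name}"]
        statsd.gauge("centrifuge.redis.used_memory", info["used_memory"], tags=tags)
        statsd.gauge("centrifuge.redis.connected_clients", info["connected_clients"], tags=tags)
```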
Having used Kubernetes on a previous project, I wanted to try out Docker Swarm. In the end, though, I gave up. Swarm just doesn't have the same holistic feel as Kubernetes. With Docker Swarm, I was constantly switching between docker stack and docker swarm commands. It wasn't intuitive in the same way that Kubernetes is. However, Kubernetes isn't perfect either and also has its shortcomings.
Isolation of services provides walled gardens of services. In Kubernetes this is done using namespaces and in Docker, it's networks. This is a two-way street: not only are services isolated from one another, they are also grouped together. This is especially useful in Kubernetes since components that spin up dynamic services can only do so within specific namespaces. This means that components aren't able to interfere with services in other namespaces.
Maintain statelessness and instead retrieve state. This was important when dealing with scaling: the state of the cluster is always queried from Docker or Kubernetes, both of which act as databases for the cluster's state. This means, for example, that the scheduler can be shut down, replaced and spun up again in a live system without affecting the cluster.
There are many more tacit learnings which will remain unspoken, which in itself is a learning: make tacit knowledge known. And this is my small attempt at that!
So the system built here works, but of course, it’s not perfect. It won’t magically handle all the traffic loads imaginable however it will scale, partly automagically and partly humagically. I will continue to work on it and make it as automagical as possible.