When someone told me about beanstalkd, the seed of an idea was planted in my mind. I had some free time on my hands and decided that I wanted to implement it in Clojure to learn a thing or two. The simplicity of the beanstalkd protocol appealed to me, and it seemed like something I could implement. In addition, a preview build of Project Loom had been made available, and I wanted an opportunity to try out virtual threads. Over two months, this germ of an idea sprouted into the 2400-line sapling it is today. I referred only to the beanstalkd protocol, and didn’t read any of the beanstalkd code. Here’s how it works:
Clients, Jobs and Commands
A job is a description of some task to be done. In beanstalkd, jobs are binary blobs. For simplicity, jobs are strings in small-stalk.
Here's how jobs are processed in a system using small-stalk:
There can be two types of clients of small-stalk: producers and workers. Producers submit jobs to be executed, and workers consume and execute these jobs. Note that small-stalk itself makes no distinction between clients, and any client can run any command.
small-stalk implements a subset of beanstalkd’s commands:
- put <priority> <time to run in seconds>: Puts a job into small-stalk’s queue. The job string should be sent on the next line.
- peek-ready: Returns the next available job without removing it from the queue.
- reserve: Pops the next ready job and assigns it to the client to be run. If no job is available, it blocks until a job becomes available.
- reserve-with-timeout <timeout in seconds>: Like reserve, but blocking times out after the given number of seconds.
- delete: Removes a job from small-stalk. Typically used on a reserved job when a worker has finished running it. Reserved jobs can only be deleted by the client that reserved them.
- release: Un-reserves a job and puts it back into small-stalk’s queue. Typically used when a worker fails to complete a job and wants it to be retried.
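As a rough illustration of how a command line might be turned into data, here is a minimal sketch. It is not small-stalk's actual parser: the function names and the map keys (`:command`, `:priority`, `:time-to-run-secs`, `:timeout-secs`) are assumptions made for this example, and it covers only the commands whose arguments are spelled out in the list above.

```clojure
(require '[clojure.string :as str])

;; Hypothetical helper: parses a non-negative integer, or returns nil.
(defn- parse-nat [s]
  (when (and s (re-matches #"\d{1,18}" s))
    (Long/parseLong s)))

;; Hypothetical parser: one command line in, one map out.
(defn parse-command [line]
  (let [[cmd & args] (str/split (str/trim line) #"\s+")
        nums         (map parse-nat args)]
    (cond
      ;; put <priority> <time to run in seconds>
      (and (= cmd "put") (= 2 (count args)) (every? some? nums))
      {:command :put
       :priority (first nums)
       :time-to-run-secs (second nums)}

      ;; reserve-with-timeout <timeout in seconds>
      (and (= cmd "reserve-with-timeout") (= 1 (count args)) (every? some? nums))
      {:command :reserve-with-timeout
       :timeout-secs (first nums)}

      ;; commands that take no arguments
      (and (contains? #{"peek-ready" "reserve"} cmd) (empty? args))
      {:command (keyword cmd)}

      ;; anything else is reported back rather than thrown
      :else
      {:command :unknown :line line})))
```

Returning an `:unknown` map instead of throwing keeps the connection thread's read loop simple: it can always answer the client and move on to the next line.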
small-stalk’s acceptor thread listens on a port for incoming connections. When a connection is made, a connection thread is spawned to process commands sent by that client. The cheapness of virtual threads makes it possible to spawn a thread per connection, even if there are thousands of clients. Each connection thread reads commands, parses them and hands them off to the Queue Service for execution. The Queue Service uses the Persistence Service to persist the queue state to disk. The compactor thread regularly tells the Persistence Service to clean up data files on disk to save space.
Virtual threads are the JVM’s answer to goroutines and Erlang processes. They are scheduled by the JVM rather than the operating system, so they can be created much more cheaply than conventional OS threads, making it possible to create a thread per connection even with many thousands of clients. At the time of writing, virtual threads were not available in a stable JDK, so I used a preview build of the JDK to try them out in small-stalk.
Connection Registry and Connection Threads
When a client connects to small-stalk, it is assigned an ID and its socket is stored in a registry. Identifying clients is necessary for some commands like reserve and delete, since if a job is reserved, only the client that reserved a job is allowed to delete it.
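A registry like this can be sketched with a single atom. The shape of the state (`:next-id`, `:connections`), the function names and the `:reserved-by` key on a job are all assumptions for illustration, not small-stalk's real data model.

```clojure
;; Hypothetical registry: next ID to hand out, plus a map of ID -> socket.
(defn new-registry []
  (atom {:next-id 0 :connections {}}))

(defn register!
  "Stores the socket under a fresh ID and returns that ID."
  [registry socket]
  (let [state (swap! registry
                     (fn [{:keys [next-id] :as r}]
                       (-> r
                           (assoc-in [:connections next-id] socket)
                           (update :next-id inc))))]
    ;; The state returned by swap! is the one this call produced,
    ;; so the ID assigned here is always one less than its :next-id.
    (dec (:next-id state))))

(defn unregister! [registry id]
  (swap! registry update :connections dissoc id)
  nil)

(defn lookup [registry id]
  (get-in @registry [:connections id]))

;; Assumed rule: an unreserved job may be deleted by anyone,
;; a reserved job only by the client that holds it.
(defn may-delete? [job client-id]
  (or (nil? (:reserved-by job))
      (= client-id (:reserved-by job))))
```

Doing the ID allocation and the insertion in one `swap!` means two clients connecting at the same moment can never be handed the same ID.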
A virtual thread is spawned for every incoming connection. These connection threads take care of reading from and writing to the connection socket. They parse commands from text and hand them over to the Queue Service for execution. small-stalk does not support pipelining, which means that clients are expected to read the response to the previous command before sending a new one. Connection threads block until the Queue Service is done executing a command, because the Queue Service exposes a synchronous API.
The Queue Service is a combination of:
- Data structures that maintain the state of jobs,
- A mutation queue into which incoming mutations are queued, and
- A mutation thread which reads mutations off of the queue and executes them.
I knew that I wanted to use some kind of append-only log for persistence, since I thought this would be easier to implement than complex data structures on disk. This led me to the idea of serializing all data changes in a queue.
A mutation is a data representation of a change that must be made to the queue data structure. All mutating commands (i.e. commands that are not simple reads like peek-ready) map to a mutation. There are also other mutations (such as ::time-to-run-expired) that are triggered by timers rather than by a connection thread.
All mutating commands made by connection threads are converted into mutations and enqueued into the mutation queue, to be later executed by a single mutation thread. This means that all state changes are serialised. While this may seem like an unnecessary performance bottleneck, it affords several benefits:
- The state manipulation code is greatly simplified, because synchronisation isn’t a concern.
- Having a log of mutations means that we can serialise mutations, send them elsewhere and then replay them to reproduce the server’s exact state. This is how append-only file (AOF) persistence is implemented, and we could also ship them over the network to a replica server to maintain as a hot standby.
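The single-writer arrangement described above can be sketched in a few lines. This is an illustration under assumptions, not small-stalk's code: `start-mutation-thread!`, `execute!` and the `{:mutation ... :result ...}` envelope are hypothetical names, and `apply-mutation` is assumed to be a pure function from a state and a mutation to a `[new-state return-value]` pair.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

(defn start-mutation-thread!
  "Starts a daemon thread that applies mutations one at a time.
   Returns the queue that callers enqueue into."
  [initial-state apply-mutation]
  (let [queue (LinkedBlockingQueue.)
        run   (fn []
                (loop [state initial-state]
                  (let [{:keys [mutation result]} (.take queue) ; blocks until work arrives
                        [new-state return-value]  (apply-mutation state mutation)]
                    (deliver result return-value)  ; wake up the waiting caller
                    (recur new-state))))]
    (doto (Thread. ^Runnable run)
      (.setDaemon true)
      (.start))
    queue))

(defn execute!
  "Synchronous API: enqueue a mutation, then block until it has been applied."
  [^LinkedBlockingQueue queue mutation]
  (let [result (promise)]
    (.put queue {:mutation mutation :result result})
    @result))
```

Because only the loop ever touches `state`, no locks are needed around it. A real mutation thread would also have to catch exceptions thrown by `apply-mutation`; in this sketch an exception would end the loop and leave callers blocked on their promises.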
Timers and Blocking
Since the Queue Service needs to expose a synchronous API, mutations are enqueued along with promises to which the mutation thread can deliver return values. reserve, in particular, needs to block until a ready job is available. To facilitate this, if a reserve mutation is processed and no job is available, the reserve mutation along with its return promise is enqueued into a separate queue for waiting reserves. When a new job is added, the mutation thread checks the waiting reserves queue and fulfils a waiting reserve, if any.
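The hand-off between puts and waiting reserves can be sketched as two state transitions. The state layout and function names here are assumptions for illustration, and priorities are deliberately ignored to keep the example short.

```clojure
;; Hypothetical state for this sketch.
(def empty-state
  {:ready    clojure.lang.PersistentQueue/EMPTY  ; jobs waiting to be reserved
   :waiting  clojure.lang.PersistentQueue/EMPTY  ; blocked reserves: {:client-id .. :result promise}
   :reserved {}})                                ; job id -> client id

(defn- hand-over
  "Delivers the job to the waiting client and records the reservation."
  [state job {:keys [client-id result]}]
  (deliver result job)
  (assoc-in state [:reserved (:id job)] client-id))

(defn apply-reserve
  "If a job is ready, reserve it right away; otherwise park the reserve."
  [state waiter]
  (if-let [job (peek (:ready state))]
    (-> state (update :ready pop) (hand-over job waiter))
    (update state :waiting conj waiter)))

(defn apply-put
  "If a reserve is parked, fulfil it with the new job; otherwise queue the job."
  [state job]
  (if-let [waiter (peek (:waiting state))]
    (-> state (update :waiting pop) (hand-over job waiter))
    (update state :ready conj job)))
```

The connection thread that issued the reserve simply derefs its promise. It stays blocked until some later put runs `apply-put` on the mutation thread and delivers a job to it.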
The reserve-with-timeout command offers a timeout on the reserve blocking. To implement this, a separate timer thread to cancel the reserve is launched. This thread does not mutate the state directly, but instead enqueues a special mutation indicating that the client’s timeout has expired. All state modification, including cleaning up and cancelling the reserve, is done by the mutation thread.
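A sketch of that division of labour follows: the timer only enqueues, and the clean-up happens in a function meant to run on the mutation thread. The names `start-timer!` and `apply-reserve-timed-out`, the `:timed-out` return value and the use of a plain vector of waiters are all assumptions for illustration.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

(defn start-timer!
  "Starts a daemon thread that sleeps, then enqueues the given mutation.
   It never touches queue state itself."
  [^LinkedBlockingQueue mutation-queue delay-ms mutation]
  (let [task (fn []
               (Thread/sleep (long delay-ms))
               (.put mutation-queue mutation))]
    (doto (Thread. ^Runnable task)
      (.setDaemon true)
      (.start))))

(defn apply-reserve-timed-out
  "Meant to run on the mutation thread. `waiting` is a vector of
   {:client-id .. :result promise}. Cancels any reserve still parked for
   the client and returns the remaining waiters."
  [waiting client-id]
  (let [expired? #(= client-id (:client-id %))]
    (doseq [w (filter expired? waiting)]
      (deliver (:result w) :timed-out))
    (vec (remove expired? waiting))))
```

If the reserve was already fulfilled before the timer fired, the client is no longer in the waiting list, so the mutation is a harmless no-op. That is one reason routing the timeout through the mutation thread avoids races.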
The time-to-run timeout works in a similar way. When enqueueing a job, the client can specify how long a worker should be allowed to keep the job reserved. After this timeout expires, small-stalk will automatically release the job. Again, a separate timer thread is launched when a client reserves a job. This thread enqueues a ::time-to-run-expired mutation, which is then handled by the mutation thread.
The Persistence Service is a collection of functions and data used to persist mutations processed by the Queue Service. Mutations are persisted before they are processed. The file format is a simple newline-separated list of EDN maps. Each map is a mutation. Mutations are appended to the file, one after another. This file is called an AOF (append-only file).
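The file format is simple enough to sketch with the standard library alone. The function names and the shape of the example mutations are assumptions; the sketch also reopens the file on every append and never forces data to disk, which a real persistence layer would want to reconsider.

```clojure
(require '[clojure.edn :as edn]
         '[clojure.java.io :as io])

(defn append-mutation!
  "Appends one mutation to the AOF as a single line of EDN.
   pr-str escapes newlines inside strings, so each map stays on one line."
  [aof-path mutation]
  (spit aof-path (str (pr-str mutation) "\n") :append true))

(defn read-mutations
  "Reads every mutation back, in the order it was written."
  [aof-path]
  (with-open [rdr (io/reader aof-path)]
    (mapv edn/read-string (line-seq rdr))))
```

For example, appending `{:type :put :job {:id 1 :body "hello"}}` and then `{:type :delete :job-id 1}` produces a two-line file, and `read-mutations` returns those two maps in a vector. Reading uses `clojure.edn/read-string` rather than `clojure.core/read-string` because the EDN reader does not evaluate code found in the file.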
To prevent disk usage from getting out of hand, every so often, mutations in the AOF file are processed and replaced with a snapshot of the Queue Service’s state. This is done by the compactor thread. To facilitate this process (called checkpointing), mutations are actually split over multiple AOF files. One file can hold at most a configurable number of mutations. Once this limit is reached, a new file is created and new mutations are appended there. The compactor thread reads mutations from all but the newest file (to which mutations are currently being written), replays the mutations to build a state snapshot, then persists that snapshot to disk. Once this is done, the older AOF files are deleted.
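The checkpointing logic can be illustrated in memory, with vectors standing in for files. Everything here is a simplified assumption: the state is just a map of job ID to job, only `:put` and `:delete` mutations exist, and `segment` and `compact` are hypothetical names.

```clojure
;; Toy replay function: state is a map of job id -> job.
(defn apply-mutation [state {:keys [type job job-id]}]
  (case type
    :put    (assoc state (:id job) job)
    :delete (dissoc state job-id)
    state))

(defn segment
  "Splits a mutation log into 'files' of at most max-per-file mutations."
  [mutations max-per-file]
  (mapv vec (partition-all max-per-file mutations)))

(defn compact
  "Replays all but the newest segment on top of the previous snapshot.
   Returns the new snapshot and the segments that must be kept."
  [snapshot segments]
  (let [older (butlast segments)]
    {:snapshot (reduce apply-mutation snapshot (apply concat older))
     :segments (vec (take-last 1 segments))}))
```

After compaction, recovery replays only the kept segment on top of the snapshot and should arrive at the same state as replaying the full log from scratch. Leaving the newest segment untouched means the compactor never reads a file that is still being appended to.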
To facilitate testing, the Persistence Service is stubbed out with a service that simply writes mutations to a string instead of the disk, and the compactor thread is not started.
Persistent Priority Queue
In order to work with Clojure’s reference types, the queue data structure is immutable, implemented using a combination of Clojure’s immutable data structures.
Clojure offers an obscure but useful PersistentQueue data structure. However, this doesn’t support priorities. small-stalk’s Persistent Priority Queue is implemented as a map of priority numbers to PersistentQueues. Popping an element from the queue involves finding the smallest key, and popping an item from the corresponding PersistentQueue. Unless the number of different priorities grows to be very large (which is unlikely for most use cases), this is a fairly simple and efficient implementation. The map could be replaced by a sorted-map to make this even more efficient.
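A minimal sketch of such a structure is shown below, using the sorted-map variant mentioned above so that the smallest priority is always the first entry. The function names are assumptions, not small-stalk's API.

```clojure
;; Priority -> PersistentQueue of items, kept sorted by priority.
(def empty-pq (sorted-map))

(defn pq-push
  "Adds an item at the back of the queue for its priority."
  [pq priority item]
  (update pq priority (fnil conj clojure.lang.PersistentQueue/EMPTY) item))

(defn pq-peek
  "Returns the oldest item with the smallest priority number, or nil."
  [pq]
  (some-> (first pq) val peek))

(defn pq-pop
  "Removes that item. Empty per-priority queues are dropped so the
   first entry of the map is never an empty queue."
  [pq]
  (if-let [[priority q] (first pq)]
    (let [remaining (pop q)]
      (if (empty? remaining)
        (dissoc pq priority)
        (assoc pq priority remaining)))
    pq))
```

Pushing "low" at priority 5 and then "urgent-a" and "urgent-b" at priority 1 yields "urgent-a", "urgent-b", "low" on successive pops. Since every operation returns a new value, the structure can sit inside an atom or be threaded through a reduce without any copying.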
Separating the networking layer from the queueing system allowed me to easily unit test the Queue Service without having to send CRLF-terminated strings over a socket. Using a dependency injection framework like integrant also allowed me to construct and stub out stateful components as necessary in tests. Although I didn’t take the time to write end-to-end integration tests, those would be very helpful as well.
Possible Fixes and Improvements
small-stalk has a few known issues that I haven’t had time to address yet:
- What happens if a client reserves a job and then disconnects? If the job has no time-to-run, it will live in memory forever. When a client disconnects, its reserved jobs should be automatically released.
- What happens if the server crashes during compaction, after the new snapshot has been written but before the old AOF files have been deleted? small-stalk would replay the old AOF files on top of the new snapshot. Each mutation should have a monotonically increasing ID and the snapshot should have the last ID of the mutations used to build it, so that older mutations can be skipped if necessary.
- What happens if the server crashes during compaction while the snapshot file is being overwritten? The snapshot file would probably be corrupted and there would be no way to recover. small-stalk should write a new snapshot file and delete the old one later, to allow for a recovery path in case of corruption.
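The mutation ID idea from the second point can be sketched as follows. This illustrates the proposed fix, not anything small-stalk currently does: the `:id` key on mutations, the `:last-mutation-id` key on the snapshot and the toy state (a vector of job bodies) are all assumptions.

```clojure
;; Toy replay function: state is a vector of ready job bodies.
(defn apply-mutation [ready {:keys [type body]}]
  (case type
    :put     (conj ready body)
    :reserve (vec (rest ready))
    ready))

(defn take-snapshot
  "Records the state together with the ID of the last mutation folded into it."
  [state last-id]
  {:state state :last-mutation-id last-id})

(defn recover
  "Replays mutations on top of a snapshot, skipping any the snapshot already covers."
  [{:keys [state last-mutation-id]} mutations]
  (->> mutations
       (remove #(<= (:id %) last-mutation-id))
       (reduce apply-mutation state)))

;; Mutations 1-3 were compacted into the snapshot, but the crash left
;; their AOF on disk next to the newer mutation 4.
(def leftover-log
  [{:id 1 :type :put :body "a"}
   {:id 2 :type :put :body "b"}
   {:id 3 :type :reserve}
   {:id 4 :type :put :body "c"}])

(def snapshot (take-snapshot ["b"] 3))

(recover snapshot leftover-log)
;; => ["b" "c"]
```

Without the ID check, replaying the same leftover log on top of `["b"]` would give `["a" "b" "c"]`, resurrecting a job that had already been reserved.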