Howto: multi-threaded TCP server in Common Lisp
I'm writing this howto because I spent considerable time implementing a TCP server in CL. My lack of Lisp experience contributed to this, but there also seem to be no quick guides or howtos in this area. On top of that, an undocumented argument to a usocket function (:ready-only in usocket:wait-for-input) has a default value that defeats the documented behavior, and it took me a long time to figure out what was broken. Poor documentation and a lack of resources are a pretty bad situation for Common Lisp. :-(
The task is to write a simple TCP server class using a worker thread pool for handling the requests. There is one listener thread that clients connect to. The listener would read data from the socket, then when a full command is available for a certain client, it will send it to a worker thread to be handled. (The idea was that there could be multiple listeners as well, but at this time it's buggy.)
This is not targeted at Lisp experts, but not at absolute beginners either.
We're going to use two libraries that expose a uniform API for threads and sockets across CL implementations: usocket and bordeaux-threads. I'm also using some SBCL-specific stuff (sb-queue and semaphores).
1. Listener thread
We set up a server socket like this:
(setf master-socket (usocket:socket-listen host port :reuse-address t :element-type 'unsigned-byte))
After this call, we can use wait-for-input on that socket to wait for an incoming connection:
(usocket:wait-for-input master-socket)
This call will return when there's data available on the socket. Being a server socket, it means "incoming connection". Next, to create a client socket we need to use socket-accept:
(setf new-client (usocket:socket-accept master-socket))
Following this, the new-client socket can be used to send data to, or receive data from, the client. The sockets are actually objects that include a stream, which you can get using socket-stream. To send "Hello" to the client, you can do:
(format (usocket:socket-stream new-client) "Hello~%")
(although this won't work for the example above because we opened a binary, not character, stream—via :element-type 'unsigned-byte above; socket-accept can take an :element-type argument as well, but if not present the new client socket will use the type of the server socket).
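Since the stream in this example is binary, one option is to convert the text to bytes first and write those. What follows is only a minimal sketch, assuming ASCII-only text and an implementation (such as SBCL) where char-code agrees with ASCII. The names ascii-bytes and send-ascii-line are hypothetical helpers, not part of usocket.

```lisp
(defun ascii-bytes (string)
  "Return STRING as a vector of octets. Assumes every character is ASCII."
  (map '(vector (unsigned-byte 8)) #'char-code string))

(defun send-ascii-line (stream string)
  "Write STRING followed by a newline byte to the binary output STREAM."
  (write-sequence (ascii-bytes string) stream)
  (write-byte 10 stream)    ; 10 is the ASCII code for newline
  (force-output stream))    ; socket streams are usually buffered

;; Usage with the client socket from above (assumes a connected client):
;; (send-ascii-line (usocket:socket-stream new-client) "Hello")
```

The force-output call matters in practice: without it the bytes may sit in the stream buffer and never reach the client.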
[ note: we can call socket-accept without a prior call to wait-for-input; socket-accept will simply block the thread until an incoming connection is available. If we want a single thread to listen to multiple sockets, though, we really need wait-for-input ]
1.1 Waiting for data on multiple sockets
So far so good, but how do we receive data from the new client without blocking the server from accepting new connections? Using a thread dedicated to each client is one idea, but it won't scale to more than a few hundred simultaneous clients. For this reason, wait-for-input can take a list of sockets as well, and will return when any of those sockets has activity:
(setf sockets (list master-socket))
;; see below for the :ready-only description
(loop
  (loop :for s :in (wait-for-input sockets :ready-only t)
        :do (if (eq s master-socket)
                ;; THEN: new connection
                (let ((new (usocket:socket-accept s)))
                  ;; add the new socket to the list of sockets that we're listening to
                  (setf sockets (nconc sockets `(,new)))
                  (handle-client-connect new))
                ;; ELSE: data from a connected client
                (handle-client-input s))))
This way a single listener thread can handle data from multiple clients, and accept new clients as well. The :ready-only t argument to wait-for-input is required because otherwise wait-for-input will return the original list (sockets, in this case) instead of "consing" a new one. The caller would then have to walk the list and check the value of the usocket::state slot on each socket to find out whether data is available. Sadly, none of this is mentioned in the usocket API documentation, and of course the program behaves badly when it tries to read data from a socket which isn't ready. I had to dig through the sources to find the :ready-only argument, and noticed that everything works fine if I pass it. Someone cleared up my confusion on the mailing list.
1.2 Reading data from clients
One other thing we should be careful about is that the data doesn't come all at once. Suppose we have a line-based protocol and should execute a command once the client has sent a newline. We could be tempted to say (read-line (socket-stream s)), but that will block until the client sends the newline. Aside from slow clients, someone mean could block our server by just connecting to it and sending a single character which isn't a newline.
So the handle-client-input function above should read from the stream whatever bytes are available and store them in a buffer; on the next invocation for the same socket, it should append to that buffer, and so on, until it finished reading a full command. At that point the buffer should be cleared and the command executed (by sending it to a worker thread, so we don't block the listener).
Here is a helper function for reading data from a socket. It receives the socket and a buffer (an adjustable unsigned-byte array):
(defun collect-input (socket buffer &optional (end-char 0))
  (loop :with stream = (socket-stream socket)
        :with byte
        :while (listen stream)
        :doing
        (setq byte (read-byte stream))
        (when (= byte end-char)
          (return t))
        (vector-push-extend byte buffer)))
It also takes an optional end-char argument which specifies the byte that ends the command. The end-char byte is discarded. In my stuff, commands are ended with a null byte (default value for end-char). This function returns T when it finished reading a whole command. If it returns T, more data might be available on the stream.
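To make the buffering behaviour concrete, here is a small sketch that mimics collect-input without a socket. It assumes the buffer is an adjustable vector with a fill pointer, which is what vector-push-extend needs. make-command-buffer and collect-bytes are hypothetical names used only for this illustration, and unlike the real function the input here is a plain vector of bytes rather than a stream, so bytes after the terminator are simply ignored.

```lisp
(defun make-command-buffer ()
  "Create an empty, growable byte buffer suitable for VECTOR-PUSH-EXTEND."
  (make-array 64 :element-type '(unsigned-byte 8)
                 :adjustable t
                 :fill-pointer 0))

(defun collect-bytes (bytes buffer &optional (end-char 0))
  "Append BYTES to BUFFER until END-CHAR is seen.
Return T if END-CHAR was seen, NIL if the command is still incomplete."
  (loop :for byte :across bytes
        :do (when (= byte end-char)
              (return t))
            (vector-push-extend byte buffer)))

;; A command arriving in two pieces: "PI", then "NG" plus the null terminator.
(defparameter *buffer* (make-command-buffer))
(collect-bytes #(80 73) *buffer*)      ; incomplete: returns NIL, buffer holds 2 bytes
(collect-bytes #(78 71 0) *buffer*)    ; complete: returns T, buffer holds "PING"
```

The first call leaves the partial command in the buffer, and the second call finishes it, which is the behaviour the listener relies on between calls to wait-for-input.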
To use this function you need to associate a buffer array with each socket. I defined a "client" class which holds the socket and the buffer, and have in the server a "connections" hash table that maps sockets to client objects. In the client class I have this code:
(defun reset-buffer (client)
  (setf (fill-pointer (client-buffer client)) 0))

(defgeneric client-read (client socket)
  (:method ((client client) socket)
    (with-slots (buffer) client
      (when (collect-input socket buffer)
        (prog1 (utf-8-bytes-to-string buffer)
          (reset-buffer client))))))
When a full command has been read, the client-read method creates a UTF-8 string from it (utf-8-bytes-to-string is from trivial-utf-8). So we can finally get to the handle-client-input function that I mentioned in 1.1:
(defgeneric handle-client-input (server socket)
  (:method ((server server) socket)
    (with-slots (connections) server
      (let ((client (gethash socket connections)))
        (awhen (client-read client socket)
          (send-to-workers server (curry #'client-on-command client it)))))))
A bit more to describe here:
- I also have a server class, which holds the connections hash table (mapping socket -> client object).
- awhen is a handy macro from anaphora. It behaves like the standard when, that is, it takes a condition and a block of code that is executed if the condition is not false. The nice thing about it is that within that block of code, the variable it is bound to the value of the condition. Here's a simple definition, so you don't need to load anaphora:
  (defmacro awhen (cond &body body)
    `(let ((it ,cond))
       (when it ,@body)))
- curry is a helper function for "partial application": it takes a function and any number of arguments, and returns a new function that calls the given function with those arguments, followed by any arguments it receives itself:
  (defun curry (fun &rest args1)
    (lambda (&rest args2)
      (apply fun (append args1 args2))))
- send-to-workers receives a function and schedules it to be executed in some worker thread.
- As I mentioned, when collect-input returns T, more data could be available on the stream. In that case the next call to wait-for-input returns immediately, so there's no point in checking for it in handle-client-input.
- client-on-command is a generic function that gets called when a command is available. It is called from a worker thread, so it needs to take care of concurrency when accessing shared variables.
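A short usage sketch may help with the two helpers described above. The definitions are repeated so the snippet stands on its own (the macro parameter is renamed to test here). lookup-doubled is a made-up function for illustration only.

```lisp
(defmacro awhen (test &body body)
  "Like WHEN, but binds IT to the value of TEST inside BODY."
  `(let ((it ,test))
     (when it ,@body)))

(defun curry (fun &rest args1)
  "Return a function that calls FUN with ARGS1 followed by its own arguments."
  (lambda (&rest args2)
    (apply fun (append args1 args2))))

(defun lookup-doubled (key table)
  "Return twice the value stored under KEY, or NIL if KEY is absent."
  (awhen (gethash key table)
    (* 2 it)))

;; CURRY with all arguments supplied up front yields a zero-argument
;; function, which is the shape of job that send-to-workers expects.
(defparameter *job* (curry #'list 'client "PING"))
(funcall *job*)    ; calls (list 'client "PING")
```

In handle-client-input the same trick packages client-on-command together with the client and the command string, so the worker only has to funcall the result.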
2. Worker thread pool
Worker threads should execute commands from the clients (as enqueued by send-to-workers). This wasn't exactly trivial to implement right. My first take was to use condition variables as documented in bordeaux-threads:
A condition variable provides a mechanism for threads to put themselves to sleep while waiting for the state of something to change, then to be subsequently woken by another thread which has changed the state.
A condition variable must be used in conjunction with a lock to protect access to the state of the object of interest. The procedure is as follows:
Suppose two threads A and B, and some kind of notional event channel C. A is consuming events in C, and B is producing them. CV is a condition-variable
- A acquires the lock that safeguards access to C
- A threads and removes all events that are available in C
- When C is empty, A calls CONDITION-WAIT, which atomically releases the lock and puts A to sleep on CV
- Wait to be notified; CONDITION-WAIT will acquire the lock again before returning
- Loop back to step 2, for as long as threading should continue
When B generates an event E, it
- acquires the lock guarding C
- adds E to the channel
- calls CONDITION-NOTIFY on CV to wake any sleeping thread
- releases the lock
The above sounds fine if there is one producer thread (B) and one consumer thread (A), but what if we have multiple consumers? Because A has to acquire a lock while consuming the events, any other thread with the same purpose would simply be blocked at that moment. If the events are also handled while the lock is held, all commands would effectively be executed by a single thread, sequentially. Perhaps condition variables are simply intended for two threads to collaborate.
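For comparison, here is a hedged sketch of the quoted procedure with several consumers. It is not the approach used in the rest of this howto. The serialization concern seems to apply only when an event is handled while the lock is held, so in this sketch each consumer holds the lock just long enough to remove one event. It uses SBCL's sb-thread primitives directly (bordeaux-threads has equivalents in condition-wait and condition-notify). All names (*channel*, post-event, take-event, consume-events) and the :stop marker are assumptions made for the example.

```lisp
(defvar *lock* (sb-thread:make-mutex :name "channel lock"))
(defvar *cv* (sb-thread:make-waitqueue))
(defvar *channel* '())   ; pending events, oldest first

(defun post-event (event)
  "Producer side: add EVENT to the channel and wake one sleeping consumer."
  (sb-thread:with-mutex (*lock*)
    (setf *channel* (append *channel* (list event)))
    (sb-thread:condition-notify *cv*)))

(defun take-event ()
  "Consumer side: sleep until an event is available, then remove and return it."
  (sb-thread:with-mutex (*lock*)
    ;; Re-check the condition after every wakeup; wakeups can be spurious.
    (loop :while (null *channel*)
          :do (sb-thread:condition-wait *cv* *lock*))
    (pop *channel*)))

(defun consume-events ()
  "Run events until the :stop marker arrives.
The lock is not held while an event runs, so consumers can work in parallel."
  (loop :for event = (take-event)
        :until (eq event :stop)
        :do (funcall event)))
```

To try it, start a few threads with (sb-thread:make-thread #'consume-events), post some functions with post-event, and then post one :stop per thread.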
In any case, I solved this problem with semaphores and a thread-safe queue. I found both implemented in SBCL, but I'm not sure about how to get them in other CL implementations, so what follows is SBCL-specific.
2.1 Worker thread function
A worker thread executes the following code:
(defgeneric worker-thread (server)
  (:method ((server server))
    (handler-case
        (with-slots (cmdqueue-sem cmdqueue) server
          (loop
            ;; drain the queue, then sleep until more work is signalled
            (loop :for event = (sb-queue:dequeue cmdqueue)
                  :while event
                  :do (funcall event))
            (sb-thread:wait-on-semaphore cmdqueue-sem)))
      (shutting-down ()
        ;; anything to do here?
        )
      (error (condition)
        (bt:with-lock-held ((server-workers-mutex server))
          ;; DELETE may return a different list, so store the result back
          ;; (this assumes server-workers is a setf-able accessor)
          (setf (server-workers server)
                (delete (bt:current-thread) (server-workers server))))
        (format t "~A" condition)
        ;; XXX: should start another worker here, or we'll run out
        ))))
So it simply takes events (actually, functions) from a common queue and executes them. Because sb-queue is thread-safe, we don't need to explicitly hold any locks. If there are multiple events and multiple worker threads have been awakened, they can consume events simultaneously: while one thread is handling an event, the others can keep pulling from the queue.
When the queue is empty, it calls wait-on-semaphore which will put the thread to sleep until more events are available.
2.2 Enqueue jobs for worker threads
Now we can get to the send-to-workers function mentioned above (which is called from the listener thread):
(defgeneric send-to-workers (server event)
  (:method ((server server) event)
    (with-slots (cmdqueue-sem cmdqueue) server
      (sb-queue:enqueue event cmdqueue)
      (sb-thread:signal-semaphore cmdqueue-sem))))
If no worker thread is immediately available, the queue will end up containing multiple events. However, there will be no “lost wakeup” problem, because once a thread finishes executing (funcall event) it continues the loop and dequeues more events. Also, semaphores are not “binary” (on or off); they hold an integer value. signal-semaphore increments the value, while wait-on-semaphore decrements it (unless it's zero, in which case the calling thread goes to sleep).
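The counting behaviour can be observed from a single thread. This is just a small SBCL-only sketch. semaphore-demo is a hypothetical name, and it uses sb-thread:semaphore-count to read the current value.

```lisp
(defun semaphore-demo ()
  "Return the semaphore counts observed after each step, in order."
  (let ((sem (sb-thread:make-semaphore :count 0))
        (seen '()))
    (sb-thread:signal-semaphore sem)
    (sb-thread:signal-semaphore sem)
    (push (sb-thread:semaphore-count sem) seen)   ; two wakeups are pending
    (sb-thread:wait-on-semaphore sem)             ; count > 0, so this does not block
    (push (sb-thread:semaphore-count sem) seen)
    (sb-thread:wait-on-semaphore sem)             ; still does not block
    (push (sb-thread:semaphore-count sem) seen)
    (nreverse seen)))

(semaphore-demo)   ; the list of the three observed counts
```

A third wait-on-semaphore at the end would block, because the count has reached zero. That is the state an idle worker sleeps in.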
2.3 Server shutdown
The worker thread function handles a shutting-down condition (actually ignores it, at the moment). To shut down the worker threads, one can simply say:
(loop :repeat *number-of-workers*
      :doing (send-to-workers server (curry #'signal 'shutting-down)))
;; need to wait for them too
(loop :for th :in *worker-threads*
      :doing (bt:join-thread th))
When such an event reaches a worker thread, that thread stops, so if we enqueue one such event per worker, all worker threads will eventually stop. All events that were enqueued before them will finish execution first.
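Putting sections 2.1 to 2.3 together, a stripped-down pool might look like the sketch below. It is a simplification under several assumptions: it uses a plain struct instead of the server class, it uses the sb-concurrency contrib (where the queue lives in recent SBCL versions, as far as I can tell) instead of sb-queue, and it does not handle errors raised by jobs. The names pool, start-pool, pool-submit, pool-worker and stop-pool are made up for this example.

```lisp
(eval-when (:compile-toplevel :load-toplevel :execute)
  (require :sb-concurrency))

(define-condition shutting-down (condition) ())

(defstruct pool
  (queue (sb-concurrency:make-queue))   ; thread-safe FIFO of jobs (functions)
  (sem (sb-thread:make-semaphore))      ; counts wakeups for sleeping workers
  (threads '()))

(defun pool-worker (pool)
  "Drain the queue, sleep on the semaphore, repeat until SHUTTING-DOWN is signalled."
  (handler-case
      (loop
        (loop :for job = (sb-concurrency:dequeue (pool-queue pool))
              :while job
              :do (funcall job))
        (sb-thread:wait-on-semaphore (pool-sem pool)))
    (shutting-down () nil)))

(defun pool-submit (pool job)
  "Enqueue JOB first, then signal, so a woken worker always finds it."
  (sb-concurrency:enqueue job (pool-queue pool))
  (sb-thread:signal-semaphore (pool-sem pool)))

(defun start-pool (n)
  "Create a pool with N worker threads."
  (let ((pool (make-pool)))
    (setf (pool-threads pool)
          (loop :repeat n
                :collect (sb-thread:make-thread #'pool-worker
                                                :arguments (list pool))))
    pool))

(defun stop-pool (pool)
  "Enqueue one shutdown job per worker, then wait for all workers to exit."
  (loop :repeat (length (pool-threads pool))
        :do (pool-submit pool (lambda () (signal 'shutting-down))))
  (mapc #'sb-thread:join-thread (pool-threads pool)))
```

Because the queue is FIFO and each worker exits after running exactly one shutdown job, every job submitted before stop-pool is picked up before the workers exit, and join-thread waits for the jobs still running.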
To shut down the listener thread, I preferred a less than ideal solution (because I'm not exactly sure how to stop wait-for-input, other than passing it a :timeout but I don't like that either):
(bt:interrupt-thread *listener-thread*
                     #'(lambda () (signal 'shutting-down)))
(bt:join-thread *listener-thread*)
This effectively stops the thread from whatever it was doing (probably in wait-for-input, but it could do other things too) and signals the shutting-down condition. It's unsafe to interrupt a thread from whatever it was doing :-) but for the server shutdown it will do.
3. Other stuff
Because we have multiple worker threads and commands are executed simultaneously, it is possible that a client will receive the responses in a different order. You need to design your protocol to account for this. When you send a request, you can't be sure that the next answer from the server is for that particular request—so include a request ID, which the server will send back.
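As an illustration of the request ID idea, here is a client-side sketch. The wire format ("ID, space, payload") is purely hypothetical, as are the names make-request, match-response, *pending* and *next-request-id*. The real protocol can carry the ID any way it likes.

```lisp
(defvar *next-request-id* 0)
(defvar *pending* (make-hash-table))   ; request id -> command text

(defun make-request (command)
  "Remember COMMAND as pending and return the line to send: \"ID COMMAND\"."
  (let ((id (incf *next-request-id*)))
    (setf (gethash id *pending*) command)
    (format nil "~D ~A" id command)))

(defun match-response (line)
  "Parse \"ID PAYLOAD\". Return the command that was sent with ID, and PAYLOAD."
  (multiple-value-bind (id end) (parse-integer line :junk-allowed t)
    (let ((command (gethash id *pending*)))
      (remhash id *pending*)
      (values command
              (string-left-trim " " (subseq line end))))))

;; Two requests go out; the answers come back in the opposite order.
(make-request "ping")          ; sends "1 ping"
(make-request "time")          ; sends "2 time"
(match-response "2 12:00")     ; matched to "time"
(match-response "1 pong")      ; matched to "ping"
```

With the ID echoed back, the order in which worker threads finish no longer matters to the client.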
Each client object should lock a mutex before writing data to the socket. That's because data will be sent from worker threads, which execute concurrently; without the mutex, the other side will get garbage.
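A sketch of such a per-client write lock, assuming SBCL's sb-thread mutexes (bt:make-lock and bt:with-lock-held from bordeaux-threads would serve the same purpose). The class locked-client and the function client-send are hypothetical names, not the ones from my code.

```lisp
(defclass locked-client ()
  ((output :initarg :output :reader client-output)
   ;; One mutex per client: writers to different clients never block each other.
   (write-lock :initform (sb-thread:make-mutex :name "client write lock")
               :reader client-write-lock)))

(defun client-send (client data)
  "Write DATA (a sequence matching the stream's element type) as one unit."
  (sb-thread:with-mutex ((client-write-lock client))
    (write-sequence data (client-output client))
    (force-output (client-output client))))

;; Usage with a socket (assumes a connected usocket object named sock):
;; (defparameter *c*
;;   (make-instance 'locked-client :output (usocket:socket-stream sock)))
;; (client-send *c* some-byte-vector)
```

Holding the lock for the whole message, including the flush, is what keeps two responses from interleaving on the wire.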
There is no code above to handle a sudden client disconnect, but it's easy to write. In the code from 1.1 we should include an EOF check for the client socket before calling handle-client-input: if wait-for-input reports the socket as ready but no data is available, the connection was lost and that client should be destroyed. We can do the check with listen. Here is my complete code for the listener thread (called provider-thread because it actually provides commands to worker threads):
(defgeneric provider-thread (server master-socket)
  (:method ((server server) master-socket)
    (let ((sockets (list master-socket))
          (connlock (server-connections-mutex server)))
      (handler-case
          (loop
            (loop :for s :in (wait-for-input sockets :ready-only t)
                  :doing
                  (handler-case
                      (if (eq s master-socket)
                          ;; THEN: we have new connection
                          (progn
                            (bt:with-lock-held (connlock)
                              (unless (null (slot-value s 'usocket::state))
                                (let ((new (socket-accept s)))
                                  (setf sockets (push new sockets))
                                  (handle-client-connect server new)))))
                          ;; ELSE: client socket
                          (if (listen (socket-stream s))
                              ;; THEN: input available
                              (handle-client-input server s)
                              ;; ELSE: EOF, lost connection
                              (progn
                                (bt:with-lock-held (connlock)
                                  (handle-client-disconnect server s))
                                (setf sockets (delete s sockets))
                                (socket-close s))))
                    (end-of-file ()
                      ;; not sure we ever get here
                      ))))
        (shutting-down ()
          ;; anything to do here?
          )))))
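The EOF check relies on listen returning false at end of file. That part can be tried without a network, using string streams as stand-ins for socket streams (an assumption for the sake of the demo; input-state is a hypothetical helper):

```lisp
(defun input-state (stream)
  "Classify a stream that was reported ready: :data if LISTEN sees input, else :eof."
  (if (listen stream) :data :eof))

(defparameter *with-data* (make-string-input-stream "x"))
(defparameter *exhausted* (make-string-input-stream ""))

(input-state *with-data*)   ; one character is waiting
(input-state *exhausted*)   ; nothing left to read
```

On a real socket the answer is only meaningful right after wait-for-input has reported the socket as ready. Otherwise "no data" could simply mean that the client hasn't sent anything yet.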
I'd put the whole code here as well, but it's a bit hairy and should be cleaned up. In any case, I hope that at least these notes will help someone.