1 minute read

Server-sent events(SSE) is a powerful Web technology that allows to push data from the server directly to the browser.

Since Ring 1.6.0 thanks to the support for async handlers is possible to use SSE from your Clojure Ring/Compojure app.

A walking skeleton could be as simple as a background thread producing data pushed into a core.async channel that is pushed to the browser via SSE.

To enable the async handler we have to pass the new :async? flag to the adapter:

(jetty/run-jetty api/app {:port port :join? false :async? true})

Two core building blocks are then needed to use SSE with Ring: the newly introduced StreamableResponseBody protocol and a new middleware handler arity.

We’re going to extend a core.async channel with StreamableResponseBody:

(extend-type clojure.core.async.impl.channels.ManyToManyChannel
StreamableResponseBody
(write-body-to-stream [channel response output-stream]
  (async/go (with-open [writer (io/writer output-stream)]
              (async/loop []
                (when-let [msg (async/<! channel)]
                  (doto writer (.write msg) (.flush))
                  (recur)))))))

Everything that is pushed into the channel will be now pushed down to the browser.

The route handler is also returning the asynchronous handler(with 3 parameters):

(defroutes app-routes
  (GET "/" [] (index))
  (GET "/async" []
       (fn [req res raise]
         (let [ch (async/chan)]
           (res (stream-response ch))
           (async/go (async/>! ch (stream-msg {:val 42}))
                     (async/<! (async/timeout 1000))
                     (async/>! ch (stream-msg {:val 100}))
                     (async/close! ch)))))

)

Where stream-msg and stream-response are helper functions to create the correct SSE response headers.

(def stream-response
  (partial assoc {:status 200, :headers {"Content-Type" "text/event-stream"}} :body))

(def EOL "\n")

(defn stream-msg [payload]
  (str "data:" (json/write-str payload) EOL EOL))

The client side Javascript code is straightforward:

var source = new EventSource("/async");

source.onmessage= (e) => {
    const data = JSON.parse(e.data);
    console.log(data);

    if (data.msg == "end") {
        console.log("Closing the stream.");
        source.close();
    }
    //update time series graph, tabular data etc
};

source.onopen = (e) => {
    console.log("connection opened:" + e);
};

source.onerror = (e) => {
    console.log("error:" + e);
    if (e.readyState == EventSource.CLOSED) {
        console.log("connection closed:" + e);
    }
    source.close();
};

Updated: