Back when the web was young, the only way to know whether a resource had changed its state was to manually re-request the page. This wasn’t too much of a problem when there were only static pages which didn’t change all that often. Then along came server-side applications, CGI and the like, which could change their state more frequently in ways that you might be interested in, but in effect you were still stuck with some variation of refreshing the page (albeit possibly initiated by the browser under the instruction of some tag in the page). So if, say, you had an application that kicked off a long-running background task, it might redirect you to a status page that would periodically refresh itself and then redirect to the results when the task was complete. (In fact I know of at least one reasonably well-known reporting application that still does just this in 2022.)
Then sometime around the turn of the century things started to get a lot more interactive with the introduction of the XMLHttpRequest API, which allowed a script in a web page to make requests to the server and, based on the response, update the view appropriately. This made it possible for a web page to reflect a change in state on the server without any refreshing (though still with some polling of the server in the background by the client-side script). Then along came the WebSocket API, which provides for bi-directional communication between the client and server, and Server-Sent Events, which provide for server push of events (with associated data). These technologies provide a means to reflect changes in an application’s state in a web page without needing a page refresh.
Here I’m going to describe a way of implementing client side notifications from a Raku web application using Server-sent Events.
Server-sent Events
Server-sent Events provide a server-to-client push mechanism implemented using a persistent but otherwise standard HTTP connection with chunked transfer encoding and typically a Content-Type of text/event-stream. The client-side API is EventSource and is supported by most modern browsers; there are also client libraries (including EventSource::Client) allowing non-web applications to consume an event stream (but that will be for another time).
On the server side I have implemented EventSource::Server; while the examples here use Cro, it could be used with any HTTP server framework that will accept a Supply as the response data and emit chunked data to the client until the Supply is done.
Conceptually the EventSource::Server is very simple: it takes a Supply of events and transforms them into properly formatted EventSource events which can be transmitted to the client in a stream of chunked data.
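To make the “properly formatted” part a little more concrete, here is a minimal sketch of the text/event-stream wire format using only core Raku. The format-sse-event helper is hypothetical and is not how EventSource::Server is implemented internally; it only shows the shape of what ends up on the wire: an optional event: line, one data: line per line of payload, and a blank line to terminate the event.

```raku
# Hypothetical helper (not part of EventSource::Server): renders a single
# event in the text/event-stream wire format.
sub format-sse-event(Str $data, Str :$type --> Str) {
    my @lines;
    # The event type is optional; without it the browser treats the
    # event as a plain "message" event.
    @lines.push: "event: $type" if $type.defined;
    # Every line of the payload gets its own "data:" field.
    @lines.push: "data: $_" for $data.lines;
    # A blank line marks the end of the event.
    @lines.join("\n") ~ "\n\n";
}

# Map a Supply of JSON strings into a Supply of wire-format chunks.
my Supplier $supplier = Supplier.new;
my Supply   $stream   = $supplier.Supply.map: -> $json {
    format-sse-event($json, type => 'notification')
};

my @wire;
$stream.tap: -> $chunk { @wire.push: $chunk };
$supplier.emit: '{"type":"info","message":"hello"}';
print @wire.join;
```

In the real application the equivalent Supply of chunks is what gets handed to the HTTP framework as the response body.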
The Client side part
This is the index.html that will be served as static content from our server. It’s about the simplest I could come up with (using jQuery and Bootstrap for simplicity). Essentially it’s a button that will make a request to the server, a space to put our “notifications” and the Javascript to consume the events from the server and display the notifications.
I don’t consider client side stuff as one of my core competencies, so forgive me for this.
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Bootstrap 101 Template</title>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/css/bootstrap.min.css">
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/css/bootstrap-theme.min.css">
</head>
<body>
<main role="main" class="container-fluid">
<div class="row">
<div class="col"></div>
<div class="col-8 text-center">
<a href="button-pressed" class="btn btn-danger btn-lg active" role="button" aria-pressed="true">Press Me!</a>
</div>
<div class="col" id="notification-holder"></div>
</div>
</main>
<script src="https://ajax.googleapis.com/ajax/libs/jquery/3.2.1/jquery.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/js/bootstrap.min.js"></script>
<script>
var sse;
function createNotification(message, type) {
var html = '<div class="shadow bg-body rounded alert alert-' + type + ' alert-dismissable page-alert">';
html += '<button type="button" data-dismiss="alert" class="close"><span aria-hidden="true">×</span><span class="sr-only">Close</span></button>';
html += message;
html += '</div>';
$(html).hide().prependTo('#notification-holder').slideDown();
};
function notificationHandler(e) {
const message = JSON.parse(e.data);
createNotification(message.message, message.type);
};
function setupNotifications() {
if ( sse ) {
sse.removeEventListener("notification", notificationHandler);
sse.close();
}
sse = new EventSource('/notifications');
sse.addEventListener("notification", notificationHandler );
$('.page-alert .close').click(function(e) {
e.preventDefault();
$(this).closest('.page-alert').slideUp();
});
return sse
};
setupNotifications();
</script>
</body>
</html>
Essentially the Javascript sets up the EventSource client to consume the events we will publish on /notifications and adds a listener which parses the JSON data in the event (it doesn’t have to be JSON, but I find this most convenient) and then inserts the “notification” in the DOM. The rest is mostly Bootstrap stuff for dismissing the notification.
You could of course implement this in any other client-side framework (Angular, React or whatever the new New Hotness is,) but we’re here for the Raku not the Javascript.
Anyway this isn’t going to change at all, so if you actually want to run the examples, you can save it and forget about it.
The Server Side
The server part of our application is, largely, a simple Cro::HTTP application with three routes: one to serve up our index.html from above, another to handle the button push request and, obviously, a route to serve up the event stream on /notifications.
This is all bundled up in a single script for convenience of exposition; in a real-world application you’d almost certainly want to split it up into several files.
class NotificationTest {
use Cro::HTTP::Server;
has Cro::Service $.http;
class Notifier {
use EventSource::Server;
use JSON::Class;
has Supplier::Preserving $!supplier = Supplier::Preserving.new;
enum AlertType is export (
Info => "info",
Success => "success",
Warning => "warning",
Danger => "danger"
);
class Message does JSON::Class {
has AlertType $.type is required is marshalled-by('Str');
has Str $.message is required;
has Str $.event-type = 'notification';
}
method notify(
AlertType $type,
Str() $message,
Str :$event-type = 'notification'
--> Nil ) {
$!supplier.emit:
Message.new(:$type, :$message :$event-type );
}
multi method event-stream( --> Supply) {
my $supply = $!supplier.Supply.map: -> $m {
EventSource::Server::Event.new(
type => $m.event-type,
data => $m.to-json(:!pretty)
)
}
EventSource::Server.new(
:$supply,
:keepalive,
keepalive-interval => 10
).out-supply;
}
}
class Routes {
use Cro::HTTP::Router;
has Notifier $.notifier
handles <notify event-stream> = Notifier.new;
method routes() {
route {
get -> {
static $*PROGRAM.parent, 'index.html';
}
get -> 'notifications' {
header 'X-Accel-Buffering', 'no';
content 'text/event-stream', $.event-stream();
}
get -> 'button-pressed' {
$.notify(Notifier::Info, 'Someone pressed the button');
}
}
}
}
has $.routes-object;
method routes-object( --> Routes ) handles <routes> {
$!routes-object //= Routes.new();
}
method http( --> Cro::Service ) handles <start stop> {
$!http //= Cro::HTTP::Server.new(
http => <1.1>,
host => '0.0.0.0',
port => 9999,
application => $.routes,
);
}
}
multi sub MAIN() {
my NotificationTest $http = NotificationTest.new;
$http.start;
say "Listening at http://127.0.0.1:9999";
react {
whenever signal(SIGINT) {
$http.stop;
done;
}
}
}
There’s nothing particularly unusual about this, but you’ll probably see that nearly everything is happening in the Notifier class. The routes are defined within a method of a Routes class so that the key methods of Notifier can be delegated from an instance of that class, which is nicer than having a global object, and also makes it easier to refactor or even replace the Notifier at run time (perhaps to localise the messages, for example).
The Notifier class itself can be thought of as a wrapper for the EventSource::Server. There is a Supplier (here a Supplier::Preserving, which works better for this scenario) onto which objects of Message are emitted by the notify method. The Message class consumes JSON::Class so that it can easily be serialized as JSON when creating the final event that will be output onto the event stream. The AlertType enumeration here maps to the CSS classes in the resulting notification HTML that influence the colour of the notification as displayed.
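The difference between a plain Supplier and a Supplier::Preserving can be seen in isolation with core Raku alone. This is only a sketch with made-up message strings: a plain Supplier drops anything emitted while nothing is tapping it, whereas the preserving variant keeps those values for the first tap.

```raku
my Supplier             $live       = Supplier.new;
my Supplier::Preserving $preserving = Supplier::Preserving.new;

# Emit before anything is listening.
$live.emit:       'sent before the tap';
$preserving.emit: 'sent before the tap';

my @from-live;
my @from-preserving;
$live.Supply.tap:       -> $v { @from-live.push: $v };
$preserving.Supply.tap: -> $v { @from-preserving.push: $v };

# Emit again now that both have a tap.
$live.emit:       'sent after the tap';
$preserving.emit: 'sent after the tap';

say "live:       { @from-live.elems } message(s)";
say "preserving: { @from-preserving.elems } message(s)";
```

Whether the early values matter depends on the application; here it would cover a button press that arrives before the browser has opened its event stream.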
Most of the action here is actually going on in the event-stream method, which constructs the stream that is output to the client:
multi method event-stream( --> Supply) {
my $supply = $!supplier.Supply.map: -> $m {
EventSource::Server::Event.new(
type => $m.event-type,
data => $m.to-json(:!pretty)
)
}
EventSource::Server.new(
:$supply,
:keepalive,
keepalive-interval => 10
).out-supply;
}
This maps the Supply derived from our Supplier such that the Message objects are serialized and wrapped in an EventSource::Server::Event object; the resulting new Supply is then passed to the EventSource::Server. The out-supply method returns a further Supply which emits the encoded event stream data suitable for being passed as content in the Cro route. The wrapping of the Message in the Event isn’t strictly necessary here as EventSource::Server will do it internally if necessary, but doing so allows control of the type, which is the event type that will be specified when adding the event listener in your Javascript. So, for instance, you could emit events of different types on your stream and have different listeners for each in your Javascript, each having a different effect on your page.
The route for /notifications probably warrants closer inspection:
get -> 'notifications' {
header 'X-Accel-Buffering', 'no';
content 'text/event-stream', $.event-stream();
}
Firstly, unless you have a particular reason, the Content-Type should always be text/event-stream, otherwise the client won’t recognise the stream and, in all the implementations I have tried at least, will just sit there annoyingly doing nothing. The X-Accel-Buffering header here isn’t strictly necessary for this example; however, if your clients will be accessing your application via a reverse proxy such as nginx then you may need to supply this (or one specific to your proxy) in order to prevent the proxy buffering your stream, which may lead to the events never being delivered to the client.
But what if you don’t want everyone to get the same notifications?
This is all very well, but for the majority of applications you probably want to send notifications to specific users (or sessions); it’s unlikely that all the users of our application are interested that someone pressed the button. So we’ll introduce the notion of a session using Cro::HTTP::Session::InMemory, which has the advantage of being very simple to implement (and built-in).
The changes to our original example are really quite small (I’ve omitted any authentication to keep it simple):
class NotificationTest {
use Cro::HTTP::Server;
use Cro::HTTP::Auth;
has Cro::Service $.http;
class Session does Cro::HTTP::Auth {
has Supplier $!supplier handles <emit Supply> = Supplier.new;
}
class Notifier {
use EventSource::Server;
use JSON::Class;
enum AlertType is export (
Info => "info",
Success => "success",
Warning => "warning",
Danger => "danger"
);
class Message does JSON::Class {
has AlertType $.type is required is marshalled-by('Str');
has Str $.message is required;
has Str $.event-type = 'notification';
}
method notify(
Session $session,
AlertType $type,
Str() $message,
Str :$event-type = 'notification'
--> Nil) {
$session.emit: Message.new(:$type, :$message :$event-type );
}
multi method event-stream(Session $session, --> Supply) {
my $supply = $session.Supply.map: -> $m {
EventSource::Server::Event.new(
type => $m.event-type,
data => $m.to-json(:!pretty)
)
}
EventSource::Server.new(
:$supply,
:keepalive,
keepalive-interval => 10
).out-supply;
}
}
class Routes {
use Cro::HTTP::Router;
use Cro::HTTP::Session::InMemory;
has Notifier $.notifier handles <notify event-stream> = Notifier.new;
method routes() {
route {
before Cro::HTTP::Session::InMemory[Session].new;
get -> Session $session {
static $*PROGRAM.parent, 'index.html';
}
get -> Session $session, 'notifications' {
header 'X-Accel-Buffering', 'no';
content 'text/event-stream', $.event-stream($session);
}
get -> Session $session, 'button-pressed' {
$.notify($session, Notifier::Info, 'You pressed the button');
}
}
}
}
has $.routes-object;
method routes-object( --> Routes ) handles <routes> {
$!routes-object //= Routes.new();
}
method http( --> Cro::Service ) handles <start stop> {
$!http //= Cro::HTTP::Server.new(
http => <1.1>,
host => '0.0.0.0',
port => 9999,
application => $.routes,
);
}
}
multi sub MAIN() {
my NotificationTest $http = NotificationTest.new;
$http.start;
say "Listening at http://127.0.0.1:9999";
react {
whenever signal(SIGINT) {
$http.stop;
done;
}
}
}
As you can see, much of the code remains unchanged: we’ve introduced a new Session class and made some changes to the Notifier methods and the routes.
The Session class is instantiated at the start of a new session and will be kept in memory until the session expires:
class Session does Cro::HTTP::Auth {
has Supplier $!supplier
handles <emit Supply> = Supplier.new;
}
Because the same object stays in memory we can replace the single Supplier of the Notifier object with a per-session one, the same Session object being passed to the routes during the lifetime of the session:
method routes() {
route {
before Cro::HTTP::Session::InMemory[Session].new;
get -> Session $session {
static $*PROGRAM.parent, 'index.html';
}
get -> Session $session, 'notifications' {
header 'X-Accel-Buffering', 'no';
content 'text/event-stream', $.event-stream($session);
}
get -> Session $session, 'button-pressed' {
$.notify($session, Notifier::Info, 'You pressed the button');
}
}
}
The Cro::HTTP::Session::InMemory is introduced as a Middleware that handles the creation or retrieval of a session, setting the session cookie and so forth, before the request is passed to the appropriate route. Where the first argument to a route block has a type that does Cro::HTTP::Auth, the session object will be passed. You can do interesting things with authentication and authorization by using more specific subsets of your Session class, but we won’t need that here and we’ll just pass the session object to the modified Notifier methods:
method notify(
Session $session,
AlertType $type,
Str() $message,
Str :$event-type = 'notification'
--> Nil) {
$session.emit: Message.new(:$type, :$message :$event-type );
}
multi method event-stream( Session $session, --> Supply) {
my $supply = $session.Supply.map: -> $m {
EventSource::Server::Event.new(
type => $m.event-type,
data => $m.to-json(:!pretty)
)
}
EventSource::Server.new(
:$supply,
:keepalive,
keepalive-interval => 10
).out-supply;
}
Both notify and event-stream are simply amended to take the Session object as the first argument and to use the (delegated) methods on the Session’s own Supplier rather than the shared one from Notifier.
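The effect of giving each session its own Supplier can be sketched without Cro at all. The Session class below is a cut-down stand-in for the one in the example (it leaves out the Cro::HTTP::Auth role so the snippet is self-contained), and the %received hash is purely illustrative.

```raku
# Stand-in for the example's Session class, minus Cro::HTTP::Auth.
class Session {
    has Supplier $!supplier handles <emit Supply> = Supplier.new;
}

my Session $alice = Session.new;
my Session $bob   = Session.new;

# Each "browser" taps only the stream belonging to its own session.
my %received;
$alice.Supply.tap: -> $m { %received<alice>.push: $m };
$bob.Supply.tap:   -> $m { %received<bob>.push: $m };

# Only Alice presses the button.
$alice.emit: 'You pressed the button';

say "alice got: { (%received<alice> // []).elems }";
say "bob got:   { (%received<bob>   // []).elems }";
```

Because the delegated emit and Supply methods go to a Supplier private to each object, nothing emitted for one session can leak into another.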
And now each ‘user’ can get their own notifications: the button could be starting a long-running job and they could be notified when it’s done. You could extend this to do “broadcast” notifications by putting back the shared Supplier in Notifier, making a second multi candidate of notify which doesn’t take the Session and which would emit to that Supplier, then merging the shared and instance-specific Supplies in the event-stream method.
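The merging step of that broadcast idea might look something like the following sketch, which uses only the core Supply.merge method. The variable names are illustrative and the two Suppliers simply stand in for the shared one in Notifier and the one owned by a Session.

```raku
my Supplier $broadcast = Supplier.new;   # stands in for a shared Supplier in Notifier
my Supplier $session   = Supplier.new;   # stands in for the Session's own Supplier

# One Supply carrying everything this particular client should see.
my Supply $combined = Supply.merge($broadcast.Supply, $session.Supply);

my @seen;
$combined.tap: -> $m { @seen.push: $m };

$session.emit:   'You pressed the button';
$broadcast.emit: 'Someone pressed the button';

.say for @seen;
```

In event-stream the merged Supply would then be mapped into events exactly as the per-session Supply is now.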
But what if I have more than one instance of my application?
You’ve probably worked out by now that using the “in-memory” session won’t work if you have more than one instance of your application. You might be able to get away with setting up “sticky sessions” on a load balancer at a push, but that’s probably not something you’d want to rely on.
What we need is a shared source of notifications to which all the new notifications can be added and from which each instance will retrieve the notifications to be sent.
For this we can use a PostgreSQL database, which handily has NOTIFY, allowing the server to send a notification to all the connected clients that have requested to receive them.
In the amended application we will use Red to access the database (plus a feature of DB::Pg to consume notifications from the server).
For our simple application we only need a table to hold the notifications, and a table in which to persist the sessions (using Cro::HTTP::Session::Red), so let’s make them upfront:
CREATE FUNCTION public.new_notification() RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
PERFORM pg_notify('notifications', '' || NEW.id || '');
RETURN NEW;
END;
$$;
CREATE TABLE public.notification (
id uuid NOT NULL,
session_id character varying(255),
type character varying(255) NOT NULL,
message character varying(255) NOT NULL,
event_type character varying(255) NOT NULL
);
CREATE TABLE public.session (
id character varying(255) NOT NULL
);
ALTER TABLE ONLY public.notification
ADD CONSTRAINT notification_pkey PRIMARY KEY (id);
ALTER TABLE ONLY public.session
ADD CONSTRAINT session_pkey PRIMARY KEY (id);
CREATE TRIGGER notification_trigger AFTER INSERT ON public.notification FOR EACH ROW EXECUTE PROCEDURE public.new_notification();
ALTER TABLE ONLY public.notification
ADD CONSTRAINT notification_session_id_fkey FOREIGN KEY (session_id) REFERENCES public.session(id);
I’ve used a database called notification_test in the example. The notification table has similar columns to the attributes of the Message class with the addition of the id and session_id; there is a trigger on insert that sends the Pg notification with the id of the new row, which will be consumed by the application.
The session table only has the required id column that will be populated by the session middleware when the new session is created.
The code has a few more changes than between the first examples, but the majority of the changes are to introduce the Red models for the two DB tables and to rework the way that the Notifier works:
class NotificationTest {
use Cro::HTTP::Server;
use Cro::HTTP::Auth;
use UUID;
use Red;
use Red::DB;
need Red::Driver;
use JSON::Class;
use JSON::OptIn;
has Cro::Service $.http;
model Message {
…
}
model Session is table('session') does Cro::HTTP::Auth {
has Str $.id is id;
has @.messages
is relationship({ .session-id }, model => Message )
is json-skip;
}
enum AlertType is export (
Info => "info",
Success => "success",
Warning => "warning",
Danger => "danger"
);
model Message is table('notification') does JSON::Class {
has Str $.id is id is marshalled-by('Str') = UUID.new.Str;
has Str $.session-id is referencing(model => Session, column => 'id' ) is json-skip;
has AlertType $.type is column is required is marshalled-by('Str');
has Str $.message is column is required is json;
has Str $.event-type is column is json = 'notification';
}
has Red::Driver $.database = database 'Pg', dbname => 'notification_test';
class Notifier {
use EventSource::Server;
has Red::Driver $.database;
method database(--> Red::Driver) handles <dbh> {
$!database //= get-RED-DB();
}
has Supply $.message-supply;
method message-supply( --> Supply ) {
$!message-supply //= supply {
whenever $.dbh.listen('notifications') -> $id {
if Message.^rs.grep(-> $v { $v.id eq $id }).head -> $message {
emit $message;
}
}
}
}
method notify(
Session $session,
AlertType $type,
Str() $message,
Str :$event-type = 'notification'
--> Nil ) {
Message.^create(
session-id => $session.id, :$type, :$message :$event-type
);
}
multi method event-stream( Session $session, --> Supply) {
my $supply = $.message-supply.grep( -> $m {
$m.session-id eq $session.id
}).map( -> $m {
EventSource::Server::Event.new(
type => $m.event-type,
data => $m.to-json(:!pretty)
)
});
EventSource::Server.new(
:$supply,
:keepalive,
keepalive-interval => 10
).out-supply;
}
}
class Routes {
use Cro::HTTP::Router;
use Cro::HTTP::Session::Red;
has Notifier $.notifier
handles <notify event-stream> = Notifier.new;
method routes() {
route {
before Cro::HTTP::Session::Red[Session].new: cookie-name => 'NTEST_SESSION';
get -> Session $session {
static $*PROGRAM.parent, 'index.html';
}
get -> Session $session, 'notifications' {
header 'X-Accel-Buffering', 'no';
content 'text/event-stream', $.event-stream($session);
}
get -> Session $session, 'button-pressed' {
$.notify($session, Info, 'You pressed the button');
}
}
}
}
has $.routes-object;
method routes-object( --> Routes ) handles <routes> {
$!routes-object //= Routes.new();
}
method http( --> Cro::Service ) handles <start stop> {
$!http //= Cro::HTTP::Server.new(
http => <1.1>,
host => '0.0.0.0',
port => 9999,
application => $.routes,
);
}
}
multi sub MAIN() {
my NotificationTest $http = NotificationTest.new;
$GLOBAL::RED-DB = $http.database;
$http.start;
say "Listening at http://127.0.0.1:9999";
react {
whenever signal(SIGINT) {
$http.stop;
done;
}
}
}
I’ll gloss over the definition of the Red models as that should be mostly obvious, except to note that the Message model also does JSON::Class, which allows the instances to be serialized as JSON (just like the original example), so no extra code is required to create the events that are sent to the client.
The major changes are to the Notifier class, which introduces message-supply (which creates an on-demand supply), replacing the shared Supplier of the first example and the per-session Supplier of the second:
has Supply $.message-supply;
method message-supply( --> Supply ) {
$!message-supply //= supply {
whenever $.dbh.listen('notifications') -> $id {
if Message.^rs.grep(-> $v { $v.id eq $id }).head -> $message {
emit $message;
}
}
}
}
This taps the Supply of Pg notifications provided by the underlying DB::Pg, which (referring back to the SQL trigger described above) emits the id of the newly created notification rows in the database. The notification row is then retrieved and emitted onto the message-supply.
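The shape of that on-demand supply can be tried out without a database. In this sketch a plain Supplier stands in for the Supply returned by listen and a Hash stands in for the notification table; both, along with the ids and message texts, are assumptions made purely for illustration.

```raku
# Stand-in for the notification table: id => message text.
my %notification-rows =
    'id-1' => 'You pressed the button',
    'id-2' => 'Job finished';

# Stand-in for the Supply of ids delivered by the Pg NOTIFY mechanism.
my Supplier $pg-notifications = Supplier.new;

# On-demand supply: for each id, look up the row and emit it if found.
my Supply $message-supply = supply {
    whenever $pg-notifications.Supply -> $id {
        with %notification-rows{$id} -> $row {
            emit $row;
        }
    }
}

my @emitted;
$message-supply.tap: -> $m { @emitted.push: $m };

$pg-notifications.emit: 'id-2';
$pg-notifications.emit: 'no-such-id';   # silently skipped, as in the original
$pg-notifications.emit: 'id-1';

.say for @emitted;
```

As in the real method, an id that cannot be found is simply ignored rather than treated as an error.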
The notify method is altered to insert the Message into the notification table:
method notify(
Session $session,
AlertType $type,
Str() $message,
Str :$event-type = 'notification'
--> Nil ) {
Message.^create(session-id => $session.id, :$type, :$message :$event-type );
}
The signature of the method is unchanged and the session-id from the supplied Session is inserted into the Message.
The event-stream method needs to be altered to process the Message objects from the message-supply and select only those for the requested Session:
multi method event-stream( Session $session, --> Supply) {
my $supply = $.message-supply.grep( -> $m {
$m.session-id eq $session.id
}).map( -> $m {
EventSource::Server::Event.new(
type => $m.event-type,
data => $m.to-json(:!pretty)
)
});
EventSource::Server.new(
:$supply,
:keepalive,
keepalive-interval => 10
).out-supply;
}
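The grep-then-map step is the part that turns one shared stream into per-session streams, and it can be sketched on its own with core Raku. The Message class here is a plain stand-in for the Red model, and session-stream is a hypothetical helper that plays the role of event-stream without the EventSource::Server wrapping.

```raku
# Plain stand-in for the Red model, with only the attributes needed here.
class Message {
    has Str $.session-id is required;
    has Str $.message    is required;
}

# Stand-in for the shared message-supply.
my Supplier $all-messages = Supplier.new;

# Hypothetical helper: select the messages for one session, then transform them.
sub session-stream(Str $session-id --> Supply) {
    $all-messages.Supply.grep( -> $m {
        $m.session-id eq $session-id
    }).map( -> $m {
        $m.message
    });
}

my @for-a;
my @for-b;
session-stream('session-a').tap: -> $text { @for-a.push: $text };
session-stream('session-b').tap: -> $text { @for-b.push: $text };

$all-messages.emit: Message.new(session-id => 'session-a', message => 'You pressed the button');
$all-messages.emit: Message.new(session-id => 'session-b', message => 'Job finished');
$all-messages.emit: Message.new(session-id => 'session-a', message => 'Job started');

say "session-a: @for-a.join(', ')";
say "session-b: @for-b.join(', ')";
```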
And that’s basically it: there’s a little extra scaffolding to deal with the database, but it’s not a particularly large change.
What else?
I’ve omitted any authentication from these examples for brevity, but if you wanted to have per-user notifications then, if you have authenticated users, you could add the user id to the Message and filter where the user matches that of the Session.
Instead of using the Pg notifications, if you want to still use a database, you could repeatedly query the notifications table for new notifications as a background task. Or you could use some message queue to convey the notifications (ActiveMQ topics or a RabbitMQ fanout exchange for example).
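The polling alternative could be sketched along these lines, using Supply.interval from core Raku. The @table array stands in for the notifications table and the $cursor for “the last row already seen”; in a real application both would be a database query, and the intervals here are only chosen so that the sketch finishes quickly.

```raku
# Stand-in for the notifications table, with two rows already present.
my @table = 'first', 'second';
my @delivered;

react {
    # Position of the first row that has not been delivered yet.
    my Int $cursor = 0;

    # The background poll: pick up whatever has been added since last time.
    whenever Supply.interval(0.1) {
        @delivered.append: @table[$cursor ..^ @table.elems];
        $cursor = @table.elems;
        done if @delivered.elems >= 3;
    }

    # Simulate another part of the application adding a notification later.
    whenever Promise.in(0.25) {
        @table.push: 'third';
    }
}

.say for @delivered;
```

The trade-off against NOTIFY is latency versus simplicity: a new row is only noticed on the next tick of the interval.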
But now you can tell your users what is going on in the application without them having to do anything.