Day 8: I’ll Let You Know Later

Back when the web was young the only way that you could know whether a resource had changed its state was to manually re-request the page, this wasn’t really too much of a problem when there was only static pages which didn’t change all that often. Then along came server-side applications, the CGI and the like, these could change their state more frequently in ways that you might be interested in, but in effect you were still stuck with some variation of refreshing the page (albeit possibly initiated by the browser under the instruction of some tag in the page), so if, say, you had an application that kicked off a long running background task it might redirect you to another page that checked the status of the job that would periodically refresh itself then redirect to the results when the task was complete, (in fact I know of at least one reasonably well known reporting application that does just this still in 2022.)

Then sometime around the turn of century things started to get a lot more interactive with the introduction of the XMLHttpRequest API which allowed a script in a web page to make requests to the server and, based on the response, update the view appropriately, thus making it possible for a web page to reflect a change in state in the server without any refreshing ( though still with some polling of the server in the background by the client-side script.) Then along came the WebSocket API which provides for bi-directional communication between the client and server, and Server-Sent Events which provides for server push of events (with associated data.) These technologies provide means to reflect changes in an application state in a web page without needing a page refresh.

Here I’m going to describe a way of implementing client side notifications from a Raku web application using Server-sent Events.

Server-sent Events

The Server-sent Events provide a server to client push mechanism implemented using a persistent but otherwise standard HTTP connection with Chunked transfer encoding and typically a Content-Type of text/event-stream. The client-side API is EventSource and is supported by most modern browsers, there are also client libraries ( including EventSource::Client,) allowing non-web applications to consume an event stream (but that will be for another time.)

On the server side I have implemented the EventSource::Server; while the examples here are using Cro it could be used with any HTTP server framework that will accept a Supply as the response data and emit chunked data to the client until Supply is done.

Conceptually the EventSource::Server is very simple: it takes a Supply of events and transforms them into properly formatted EventSource events which can be transmitted to the client in a stream of chunked data.

The Client side part

This is the index.html that will be served as static content from our server, it’s about the simplest I could come up with (using jQuery and Bootstrap for simplicity.) Essentially it’s a button that will make a request to the server, a space to put our “notifications” and the Javascript to consume the events from the server and display the notifications.

I don’t consider client side stuff as one of my core competencies, so forgive me for this.

<!DOCTYPE html>
<html lang="en">
 <head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <title>Bootstrap 101 Template</title>
  <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/css/bootstrap.min.css">
  <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/css/bootstrap-theme.min.css">
 </head>
 <body>
  <main role="main" class="container-fluid">
   <div class="row">
    <div class="col"></div>
    <div class="col-8 text-center">
     <a href="button-pressed" class="btn btn-danger btn-lg active" role="button" aria-pressed="true">Press Me!</a>
    </div>
    <div class="col" id="notification-holder"></div>
   </div>
  </main>
  <script src="https://ajax.googleapis.com/ajax/libs/jquery/3.2.1/jquery.min.js"></script>
  <script src="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/js/bootstrap.min.js"></script>
  <script>
    var sse;
    function createNotification(message, type) {
      var html = '<div class="shadow bg-body rounded alert alert-' + type + ' alert-dismissable page-alert">';
      html += '<button type="button" data-dismiss="alert" class="close"><span aria-hidden="true">×</span><span class="sr-only">Close</span></button>';
      html += message;
      html += '</div>';
      $(html).hide().prependTo('#notification-holder').slideDown();
    };
    function notificationHandler(e) {
      const message = JSON.parse(event.data);
      createNotification(message.message, message.type);
    };
    function setupNotifications() {
      if ( sse ) {
        sse.removeEventListener("notification", notificationHandler);
        sse.close;
      }
 	 
      sse = new EventSource('/notifications');
      sse.addEventListener("notification", notificationHandler );
      $('.page-alert .close').click(function(e) {
        e.preventDefault();
        $(this).closest('.page-alert').slideUp();
      });
      return sse
    };
    setupNotifications();
  </script>
 </body>
</html>

Essentially the Javascript sets up the EventSource client to consume the events we will publish on /notifications and adds a Listener which parses the JSON data in the event (it doesn’t have to be JSON, but I find this most convenient,) and then insert the “notification” in the DOM. The rest is mostly Bootstrap stuff for dismissing the notification.

You could of course implement this in any other client-side framework (Angular, React or whatever the new New Hotness is,) but we’re here for the Raku not the Javascript.

Anyway this isn’t going to change at all, so if you actually want to run the examples, you can save it and forget about it.

The Server Side

The server part of our application is, largely, a simple Cro::HTTP application with three routes : one to serve up our index.html from above, another to handle the button push request and obviously a route to serve up the event stream on /notifications.

This is all bundled up in a single script for convenience of exposition, in a real world application you’d almost certainly want to split it up into several files.

class NotificationTest {
    use Cro::HTTP::Server;
    has Cro::Service $.http;

    class Notifier {
        use EventSource::Server;
        use JSON::Class;

        has Supplier::Preserving $!supplier = Supplier::Preserving.new;

        enum AlertType is export (
          Info    => "info",
          Success => "success",
          Warning => "warning",
          Danger  => "danger"
        );

        class Message does JSON::Class {
            has AlertType $.type is required is marshalled-by('Str');
            has Str $.message is required;
            has Str $.event-type = 'notification';
        }

        method notify(
          AlertType  $type,
              Str()  $message,
              Str  :$event-type = 'notification'
        --> Nil ) {
            $!supplier.emit:
              Message.new(:$type, :$message :$event-type );
        }

        multi method event-stream( --> Supply) {
            my $supply = $!supplier.Supply.map: -> $m {
                EventSource::Server::Event.new(
                  type => $m.event-type,
                  data => $m.to-json(:!pretty)
                )
            }
            EventSource::Server.new(
              :$supply,
              :keepalive,
              keepalive-interval => 10
            ).out-supply;
        }
    }

    class Routes {
        use Cro::HTTP::Router;

        has Notifier $.notifier
          handles <notify event-stream> = Notifier.new;

        method routes() {
            route {
                get -> {
                    static $*PROGRAM.parent, 'index.html';
                }
                get -> 'notifications' {
                    header 'X-Accel-Buffering', 'no';
                    content 'text/event-stream', $.event-stream();
                }
                get -> 'button-pressed' {
                    $.notify(Notifier::Info, 'Someone pressed the button');
                }
            }
        }
    }

    has $.routes-object;

    method routes-object( --> Routes ) handles <routes> {
        $!routes-object //= Routes.new();
    }

    method http( --> Cro::Service ) handles <start stop> {
        $!http //= Cro::HTTP::Server.new(
          http => <1.1>,
          host => '0.0.0.0',
          port => 9999,
          application => $.routes,
        );
    }
}

multi sub MAIN() {
    my NotificationTest $http = NotificationTest.new;
    $http.start;
    say "Listening at https://127.0.0.1:9999";
    react {
        whenever signal(SIGINT) {
            $http.stop;
            done;
        }
    }
} 

There’s nothing particularly unusual about this, but you’ll probably see that nearly everything is happening in the Notifier class. The routes are defined within a method within a Routes class so that the key methods of Notifier can be delegated from an instance of that class, which makes it nicer than having a global object, but also makes it easier to refactor or even replace the Notifier at run time (perhaps to localise the messages for example.)

The Notifier class itself can be thought of as a wrapper for the EventSource::Server, there is a Supplier (here a Supplier::Preserving which works better for this scenario,) onto which objects of Message or emitted by notify method, the Message class consumes JSON::Class so that it can easily be serialized as JSON when creating the final event that will be output onto the event stream. The EventType enumeration here maps to the CSS classes in the resulting notification HTML that influence the colour of the notification as displayed.

Most of the action here is actually going on in the event-stream method, which constructs the stream that is output to the client:

multi method event-stream( –> Supply) {
    my $supply = $!supplier.Supply.map: -> $m {
        EventSource::Server::Event.new(
          type => $m.event-type,
          data => $m.to-json(:!pretty)
        )
    }
    EventSource::Server.new(
      :$supply,
      :keepalive,
      keepalive-interval => 10
    ).out-supply;
} 

This maps the Supply derived from our Supplier such that the Message objects are serialized and wrapped in an EventSource::Server::Event object, the resulting new Supply is then passed to the EventSource::Server. The out-supply returns a further Supply which emits the encode event stream data suitable for being passed as content in the Cro route. The wrapping of the Message in the Event isn’t strictly necessary here as EventSource::Server will do it internally if necessary, but doing so allows control of the type which is the event type that will be specified when adding the event listener in your Javascript, so, for instance, you could emit events of different types on your stream and have different listeners for each in your Javascript, each having a different effect on your page.

The route for /notifications probably warrants closer inspection:

get -> 'notifications' {
    header 'X-Accel-Buffering', 'no';
    content 'text/event-stream', $.event-stream();
} 

Firstly, unless you have a particular reason, the Content Type should always be text/event-stream otherwise the client won’t recognise the stream, and, in all the implementations I have tried at least, will just sit there annoyingly doing nothing. The header here isn’t strictly necessary for this example, however if your clients will be accessing your application via a reverse proxy such as nginx then you may need to supply this (or one specific to your proxy,) in order to prevent the proxy buffering your stream which may lead to the events never being delivered to the client.

But what if don’t want everyone to get the same notifications?

This is all very well but for the majority of applications you probably want to send notifications to specific users (or sessions,) it’s unlikely that all the users of our application are interested that someone pressed the button, so we’ll introduce the notion of a session using Cro:HTTP::Session::InMemory, this has the advantage of being very simple to implement (and built-in.)

The changes to our original example are really quite small (I’ve omitted any authentication to keep it simple:)

class NotificationTest {
    use Cro::HTTP::Server;
    use Cro::HTTP::Auth;

    has Cro::Service $.http;

    class Session does Cro::HTTP::Auth {
        has Supplier $!supplier handles <emit Supply> = Supplier.new;
    }

    class Notifier {
        use EventSource::Server;
        use JSON::Class;

        enum AlertType is export (
          Info    => "info",
          Success => "success",
          Warning => "warning",
          Danger  => "danger"
        );

        class Message does JSON::Class {
            has AlertType $.type is required is marshalled-by('Str');
            has Str $.message is required;
            has Str $.event-type = ‘notification‘;
        }

        method notify(
          Session   $session,
          AlertType $type,
              Str() $message,
              Str  :$event-type = 'notification'
        –> Nil) {
            $session.emit: Message.new(:$type, :$message :$event-type );
        }

        multi method event-stream(Session $session, –> Supply) {
            my $supply = $session.Supply.map: -> $m {
                EventSource::Server::Event.new(
                  type => $m.event-type,
                  data => $m.to-json(:!pretty)
                )
            }
            EventSource::Server.new(
              :$supply,
              :keepalive,
              keepalive-interval => 10
            ).out-supply;
        }
    }

    class Routes {
        use Cro::HTTP::Router;
        use Cro::HTTP::Session::InMemory;

        has Notifier $.notifier handles <notify event-stream> = Notifier.new;

        method routes() {
            route {
                before Cro::HTTP::Session::InMemory[Session].new;
                get -> Session $session {
                    static $*PROGRAM.parent, ‘index.html‘;
                }
                get -> Session $session, ‘notifications‘ {
                    header ‘X-Accel-Buffering‘, ‘no‘;
                    content ‘text/event-stream‘, $.event-stream($session);
                }
                get -> Session $session, ‘button-pressed‘ {
                    $.notify($session, Notifier::Info, ‘You pressed the button‘);
                }
            }
        }
    }

    has $.routes-object;

    method routes-object( –> Routes ) handles <routes> {
        $!routes-object //= Routes.new();
    }

    method http( –> Cro::Service ) handles <start stop> {
        $!http //= Cro::HTTP::Server.new(
          http => <1.1>,
          host => ‘0.0.0.0‘,
          port => 9999,
          application => $.routes,
        );
    }
}

multi sub MAIN() {
    my NotificationTest $http = NotificationTest.new;
    $http.start;
    say “Listening at https://127.0.0.1:9999“;
    react {
        whenever signal(SIGINT) {
            $http.stop;
            done;
        }
    }
} 

As you can see much of the code remains unchanged, we’ve introduced a new Session class and made some changes to the Notifier methods and the routes.

The Session class is instantiated on the start of a new session and will be kept in memory until the session expires:

class Session does Cro::HTTP::Auth {
    has Supplier $!supplier
      handles <emit Supply> = Supplier.new;
} 

Because the same object stays in memory we can replace the single Supplier of the Notifier object with a per-session one, the same Session object being passed to the routes during the lifetime of the session:

method routes() {
    route {
        before Cro::HTTP::Session::InMemory[Session].new;
        get -> Session $session {
            static $*PROGRAM.parent, 'index.html';
        }
        get -> Session $session, 'notifications' {
            header 'X-Accel-Buffering', 'no';
            content 'text/event-stream', $.event-stream($session);
        }
        get -> Session $session, 'button-pressed' {
            $.notify($session, Notifier::Info, 'You pressed the button');
        }
    }
} 

The Cro::HTTP::Session::InMemory is introduced as a Middleware that handles the creation or retrieval of a session, setting the session cookie and so forth before the request is passed to the appropriate route. Where the first argument to a route block has a type that does Cro::HTTP::Auth then the session object will be passed, you can do interesting things with authentication and authorization by using more specific subsets of your Session class but we won’t need that here and we’ll just pass the session object to the modified Notifier methods:

method notify(
    Session $session,
  AlertType $type,
      Str() $message,
       Str :$event-type = 'notification'
–> Nil) {
        $session.emit: Message.new(:$type, :$message :$event-type );
}
     
multi method event-stream( Session $session, –> Supply) {
    my $supply = $session.Supply.map: -> $m {
        EventSource::Server::Event.new(
          type => $m.event-type,
          data => $m.to-json(:!pretty)
        )
    }
    EventSource::Server.new(
      :$supply,
      :keepalive,
      keepalive-interval => 10
    ).out-supply;
} 

Both the notify and event-stream are simply amended to take the Session object as the first argument and to use the (delegated) methods on the Session’s own Supplier rather than the shared one from Notifier.

And now each ‘user’ can get their own notifications, the button could be starting a long running job and they could be notified when it’s done. You could extend to do “broadcast” notifications by putting back the shared Supplier in Notifier, make a second multi candidate of notify which doesn’t take the Session and which would emit to that Supplier, then merge the shared and instance specific Supplies in the event-stream method.

But what if I have more than instance of my application?

You’ve probably worked out by now that using the “in-memory” session won’t work if you have more than one instance of your application, you might be able to get away with setting up “sticky sessions” on a load balancer at a push, but probably not something you’d want to rely on.

What we need is a shared source of notifications to which all the new notifications can be added and from which each instance will retrieve the notifications to be sent.

For this we can use a PostgreSQL database, which handily has a NOTIFY which allows the server to send a notification to all the connected clients that have requested to receive them.

In the amended application we will use Red to access the database ( plus a feature of DB::Pg to consume notifications from the server.)

For our simple application we only a table to hold the notifications, and a table in which to persist the sessions ( using Cro::HTTP::Session::Red ,) so let’s make them upfront:

CREATE FUNCTION public.new_notification() RETURNS trigger LANGUAGE plpgsql AS $$
    BEGIN
    PERFORM pg_notify(‘notifications‘, ‘‘ || NEW.id || ‘‘);
    RETURN NEW;
    END;
    $$;
     
    CREATE TABLE public.notification (
      id uuid NOT NULL,
      session_id character varying(255),
      type character varying(255) NOT NULL,
      message character varying(255) NOT NULL,
      event_type character varying(255) NOT NULL
    );
     
    CREATE TABLE public.session (
      id character varying(255) NOT NULL
    );
     
    ALTER TABLE ONLY public.notification
    ADD CONSTRAINT notification_pkey PRIMARY KEY (id);
     
    ALTER TABLE ONLY public.session
    ADD CONSTRAINT session_pkey PRIMARY KEY (id);
     
    CREATE TRIGGER notification_trigger AFTER INSERT ON public.notification FOR EACH ROW EXECUTE PROCEDURE public.new_notification();
     
    ALTER TABLE ONLY public.notification
    ADD CONSTRAINT notification_session_id_fkey FOREIGN KEY (session_id) REFERENCES public.session(id); 

I’ve used a database called notification_test in the example. The notification table has similar columns to the attributes of the Message class with the addition of the id and session_id, there is a trigger on insert that sends the Pg notification with the id of the new row, which will be consumed by the application.

The session table only has the required id column that will be populated by the session middleware when the new session is created.

The code has a few more changes than between the first examples, but the majority of the changes are to introduce the Red models for the two DB tables and to rework the way that the Notifier works:

class NotificationTest {
    use Cro::HTTP::Server;
    use Cro::HTTP::Auth;
    use UUID;
    use Red;
    use Red::DB;
    need Red::Driver;
    use JSON::Class;
    use JSON::OptIn;
     
    has Cro::Service $.http;
     
    model Message {
        …
    }
     
    model Session is table('session') does Cro::HTTP::Auth {
        has Str $.id is id;
        has @.messages
          is relationship({ .session-id }, model => Message )
          is json-skip;
    }
     
    enum AlertType is export (
      Info    => "info",
      Success => "success",
      Warning => "warning",
      Danger  => "danger"
    );
     
    model Message is table('notification') does JSON::Class {
        has Str $.id is id is marshalled-by('Str') = UUID.new.Str;
        has Str $.session-id is referencing(model => Session, column => 'id' ) is json-skip;
        has AlertType $.type is column is required is marshalled-by('Str');
        has Str $.message is column is required is json;
        has Str $.event-type is column is json = 'notification';
    }
     
    has Red::Driver $.database = database 'Pg', dbname => 'notification_test';
     
    class Notifier {
        use EventSource::Server;

        has Red::Driver $.database;
     
        method database(–> Red::Driver) handles <dbh> {
            $!database //= get-RED-DB();
        }
     
        has Supply $.message-supply;
     
        method message-supply( –> Supply ) {
            $!message-supply //= supply {
                whenever $.dbh.listen('notifications') -> $id {
                    if Message.^rs.grep(-> $v { $v.id eq $id }).head -> $message {
                        emit $message;
                    }
                }
            }
        }
     
        method notify(
          Session   $session,
          AlertType $type,
              Str() $message,
               Str :$event-type = 'notification'
        –> Nil ) {
            Message.^create(
              session-id => $session.id, :$type, :$message :$event-type
            );
        }
     
        multi method event-stream( Session $session, –> Supply) {
            my $supply = $.message-supply.grep( -> $m {
                $m.session-id eq $session.id
            }).map( -> $m {
                EventSource::Server::Event.new(
                  type => $m.event-type,
                  data => $m.to-json(:!pretty)
                )
            });
            EventSource::Server.new(
              :$supply,
              :keepalive,
              keepalive-interval => 10
            ).out-supply;
        }
    }
         
    class Routes {
        use Cro::HTTP::Router;
        use Cro::HTTP::Session::Red;
     
        has Notifier $.notifier
          handles <notify event-stream> = Notifier.new;
     
        method routes() {
            route {
                before Cro::HTTP::Session::Red[Session].new: cookie-name => 'NTEST_SESSION';
                get -> Session $session {
                    static $*PROGRAM.parent, 'index.html';
                }
                get -> Session $session, 'notifications' {
                    header 'X-Accel-Buffering', 'no';
                    content 'text/event-stream', $.event-stream($session);
                }
                get -> Session $session, 'button-pressed' {
                    $.notify($session, Info, 'You pressed the button');
                }
            }
        }
    }
     
    has $.routes-object;
     
    method routes-object( –> Routes ) handles <routes> {
        $!routes-object //= Routes.new();
    }
     
    method http( –> Cro::Service ) handles <start stop> {
        $!http //= Cro::HTTP::Server.new(
          http => <1.1>,
          host => '0.0.0.0',
          port => 9999,
          application => $.routes,
        );
    }
}
     
multi sub MAIN() {
    my NotificationTest $http = NotificationTest.new;
    $GLOBAL::RED-DB = $http.database;
    $http.start;
    say "Listening at https://127.0.0.1:9999";
    react {
        whenever signal(SIGINT) {
            $http.stop;
            done;
        }
    }
} 

I’ll gloss over the definition of the Red models as that should be mostly obvious, except to note that the Message model also does JSON::Class which allows the instances to be serialized as JSON (just like the original example,) so no extra code is required to create the events that are sent to the client.

The major changes are to the Notifier class which introduces message-supply which creates an on-demand supply) replacing the shared Supplier of the first example, and the per-session Supplier of the second:

has Supply $.message-supply;
 	 
method message-supply( –> Supply ) {
    $!message-supply //= supply {
        whenever $.dbh.listen(‘notifications‘) -> $id {
            if Message.^rs.grep(-> $v { $v.id eq $id }).head -> $message {
                emit $message;
            }
        }
    }
} 

This taps the Supply of Pg notifications provided by the underlying DB::Pg, which ( referring back to the SQL trigger described above,) emits the id of the newly created notification rows in the database, the notification row is then retrieved and then emitted onto the message-supply.

The notify method is altered to insert the Message to the notification table:

method notify(
  Session $session,
  AlertType $type,
  Str() $message,
  Str :$event-type = 'notification'
–> Nil ) {
        Message.^create(session-id => $session.id, :$type, :$message :$event-type );
} 

The signature of the method is unchanged and the session-id from the supplied Session is inserted into the Message.

The event-stream method needs to be altered to process the Message objects from the message-supply and select only those for the requested Session:

multi method event-stream( Session $session, –> Supply) {
    my $supply = $.message-supply.grep( -> $m {
        $m.session-id eq $session.id
    }).map( -> $m {
        EventSource::Server::Event.new(
          type => $m.event-type,
          data => $m.to-json(:!pretty)
        )
    });
    EventSource::Server.new(
      :$supply,
      :keepalive,
      keepalive-interval => 10
    ).out-supply;
} 

And that’s basically it, there’s a little extra scaffolding to deal with the database but not a particulary large change.

What else?

I’ve omitted any authentication from these example for brevity, but if you wanted to have per-user notifications then, if you have authenticated users, you could add the user id to the Message and filter where the user matches that of the Session.

Instead of using the Pg notifications, if you want to still use a database, you could repeatedly query the notifications table for new notifications as a background task. Or you could use some message queue to convey the notifications (ActiveMQ topics or a RabbitMQ fanout exchange for example).

But now you can tell your users what is going on in the application without them having to do anything.

Published by Jonathan Stowe

I'm a computer programmer and a musician.

One thought on “Day 8: I’ll Let You Know Later

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.

%d bloggers like this: