目次


WebSocket と SSE による HTTP サーバー Push

Web およびモバイル・アプリケーションのためのリアルタイム・データ・ストリーミング

Comments

HTTP ストリーミングとも呼ばれている HTTP サーバー Push は、クライアントとサーバー間の通信パターンの 1 つです。このパターンでは、クライアントからのリクエストがなくても、HTTP サーバーからクライアントに非同期で情報を送信します。サーバー Push のアーキテクチャーが特に効果を発揮するのは、1 つ以上のクライアントがサーバーから継続的に情報を受信することが要件となる、極めてインタラクティブな Web アプリケーションやモバイル・アプリケーションです。この記事では、HTTP サーバー Push を実装するテクノロジーとして、WebSocket と SSE (Server-Sent Events) の 2 つを取り上げます。

記事ではまず、この 2 つのソリューションの技術的な違いと、それぞれのソリューションが Web アーキテクチャーの中でどのように実装されるのかについて概説します。その上で、サンプル・アプリケーションを使用して SSE 実装をセットアップする方法を説明し、続いて WebSocket を使用した場合の同様の実装を説明します。最後にこの 2 つのテクノロジーを比較し、私の結論として、それぞれのテクノロジーを適用するのに適したタイプの Web アプリケーションまたはモバイル・アプリケーションを指摘します。

この記事では、読者が HTTP サーバー Push の言語と概念を理解していることを前提とします。記事で取り上げる 2 つのサンプル・アプリケーションは、どちらも Python で CherryPy を使用して作成されています。

リクエスト/レスポンス・モデルの限界

Web 上でのクライアントとサーバー間の通信は、従来からリクエスト/レスポンス・モデルに従っています。リクエスト/レスポンス・モデルでは、クライアント (Web ブラウザーなど) がサーバーに対してリソースを要求し、サーバーがそのリクエストに対するレスポンスとして、要求されたリソースを送信します。リソースが利用できない場合や、クライアントがそのリソースに対するアクセス権限を持っていなければ、サーバーはエラー・メッセージを返します。リクエスト/レスポンス・アーキテクチャーでは、クライアントからのリクエストがなければ、サーバーがクライアントにメッセージを送信することは決してありません。

Web アプリケーションがより強力で、よりインタラクティブな形に進化するにつれ、リクエスト/レスポンス・モデルに伴ういくつかの限界が見え始めてきました。インタラクティブなアプリケーションでは、クライアントが更新処理を行わなければならない頻度が増えます。したがって、クライアントはそれだけ頻繁に GET リクエストを送信しなければなりません。ポーリングと呼ばれるこの手法では、ピーク時にサーバーが過負荷状態に陥ると、パフォーマンスの問題が発生します。また、クライアントが送信するリクエストの多くは更新データを要求するものではないため、効率的な手法とは言えません。さらに、クライアントは指定された間隔でしかポーリングできないことから、クライアントの応答性が低下する可能性もあります。

HTTP サーバー Push は、このようなパフォーマンスの問題やポーリングに伴うその他の制限を解決するために出現したテクノロジーです。画像共有サービスなどのインタラクティブな Web アプリケーションでは尚更のこと、新しい更新データが利用可能になるたびに Web サーバーがその更新データをクライアントに送信したほうが効率的です。

WebSocket と SSE の比較

WebSocket と SSE はどちらも従来型のリクエスト/レスポンス・モデルの Web アーキテクチャーに代わる手法ですが、この 2 つは競合するテクノロジーというわけではありません。WebSocket アーキテクチャーでは、クライアントとサーバーとの間でソケットを開き、全二重 (双方向) 通信を可能にします。クライアントは GET メッセージを送信してサーバーからのレスポンスを待つのではなく、単にソケットを listen し続けます。サーバーからの更新データを受信すると、クライアントはそのデータを使って各種のやりとりを開始またはサポートします。クライアントがソケットを使用して、サーバーと通信することもできます。例えば、更新データを正常に受信したときに、クライアントはソケットを使ってサーバーに ACK メッセージを送信します。

SSE は、HTML5 の拡張機能として開発された、HTTP サーバー Push 方式を単純化した標準です。SSE の場合、サーバーからクライアントに非同期メッセージを送信することはできますが、クライアントがサーバーにメッセージを送信することはできません。SSE の半二重通信モデルは、クライアントがサーバーからストリーミング配信される更新データを受信すればよいだけのアプリケーションに最適です。SSE が WebSocket に勝る点として、SSE を HTTP で動作させるために追加のコンポーネントが不要である点があります。

一方、さまざまな目的でクライアントとサーバー間の通信が必要となる多目的 Web アプリケーションには、明らかに WebSocket が適しています。SSE が適しているのは、サーバーからクライアントへの方向にだけ非同期データをストリーミング配信し、クライアントからのレスポンスは必要のないアプリケーションです。

ブラウザー・サポート

HTTP プロトコルを比較する場合、ブラウザー・サポートは欠かすことのできない比較基準です。表 1 のデータを参照すると、WebSocket プロトコルは最近のすべてのブラウザーでサポートされていることがわかります。これには、モバイル・ブラウザーも含まれます。SSE は、Microsoft IE および Edge ではサポートされていません。

表 1. 2017 年 1 月の時点でのブラウザー使用状況
2017ChromeIE/EdgeFirefoxSafariOpera
1 月73.7%4.9%15.4%3.6%1.0%

考えられるすべてのブラウザーで動作する必要があるアプリケーションには、現在のところ、WebSocket のほうが有効な選択肢です。

開発の労力

プロトコルを比較する際は開発の労力も比較基準になります。これは、WebSocket や SSE のような新しいプロトコルでは特に言えることです。ここで言う「労力」とは、その特定のプロトコルを使用してアプリをコーディングするために必要なコードの行数、あるいは時間を指します。この比較基準は、時間的に厳しい制約があるプロジェクトや開発予算が限られているプロジェクトには特に重要です。

SSE を実装する際の開発の労力は、WebSocket に比べるとわずかです。HTML5 で作成されているアプリであればどのアプリでも、SSE を有効にするための主な作業は、サーバーからクライアントに送信するメッセージに HTTP ヘッダーを追加することだけです。正しいヘッダーであれば、クライアントは自動的にそのメッセージを SSE として認識します。WebSocket とは異なり、SSE を使用する場合は、サーバーとクライアントの間でソケット接続を確立して維持する必要はありません。

WebSocket プロトコルを使用する場合は、クライアント接続を listen するサーバー・サイドのソケットを構成する必要があります。ただし、ソケットを構成しさえすれば、クライアントはサーバー・サイドのソケットを自動的に開き、非同期で送信される可能性があるメッセージを待機します。また、それぞれのアプリケーションで独自のメッセージ・フォーマット、キープ・アライブ (ハートビート) などを定義することもできます。

表 2 に、SSE および WebSocket プロトコルの利点と欠点を要約します。

表 2. SSE と WebSocket の比較
SSEWebSocket
通信のタイプ半二重 (片方向)全二重 (双方向)
ブラウザー・サポート現在、Microsoft 製ブラウザーでは使用不可主要なすべてのブラウザーで使用可能
開発の労力軽度: 特定のヘッダーを追加した HTTP メッセージを送信するだけです。中度: TCP ソケット通信を確立して維持する必要があります。さらに、サーバー・サイドのリスナー・ソケットも必要になります。

次は、単純なサンプル Web アプリケーションを用いて、この 2 つのテクノロジーのそれぞれがどのように機能するのかを見ていきましょう。

SSE アプリケーションの開発

SSE は、HTTP だけを使用して非同期メッセージを配信する HTML5 標準です。WebSocket とは異なり、SSE ではバックエンドでサーバー・ソケットを作成する必要も、フロントエンドで接続を開く必要もありません。このことが、複雑さの大幅な削減に直結します。

SSE フロントエンド

リスト 1 に、SSE を使用した単純な HTTP サーバー Push アプリケーションの UI コードを記載します。

リスト 1. SSE フロントエンド
var source = new EventSource(“/user-log-stream”);
source.onmessage = function(event) {
    var message = event.data;
    // do stuff based on received message
};

EventSource は、サーバーへの HTTP 接続を作成するインターフェースです。サーバーはイベント・ストリーム (UTF-8 でエンコードされた単純なテキスト・データのストリーム) という形で、クライアントにメッセージを送信します。HTTP 接続が確立されると、サーバーが更新データを送信できるように接続が特定のメッセージ・ストリームに対して開いたままの状態に維持されます。リスト 1 で作成している HTTP 接続は、/#tabs/user-log-stream URI に関連するイベントを受信するためのものです。

SSE バックエンド

バックエンド上には、URL /user-log-stream のディスパッチャーを作成します。このディスパッチャーが、フロントエンドからの接続リクエストを受信し、通信を開始するためにフロントエンドに非同期メッセージを送信します。SSE クライアントがサーバーにメッセージを送信することはできません。

リスト 2 のコード・スタブが、このバックエンドを説明しています。

リスト 2. SSE バックエンド
import cherrypy

class UserLogStream
    messages = []

    @cherrypy.expose
    def stream(self):
        cherrypy.response.headers["Content-Type"] = "text/event-stream"
        while True:
            if len(messages) > 0:
                for msg in messages:
                    data = “data:” + msg + “\n\n”
                    yield data
                 messages = []

routes_dispatcher = cherrypy.dispatch.RoutesDispatcher()
routes_dispatcher.connect(‘user-log-stream’, ‘/’, controller = UserLogStream(), action=’stream’)

/user-log-stream URI に接続するすべての EventSource には、UserLogStream クラスの stream メソッドが割り当てられます。保留中のメッセージはすべて、その宛先に接続されている EventSource に送信されます。注意する点として、メッセージ・フォーマットは任意ではない点があります。SSE プロトコルでは、メッセージが data: で始まり、\n\n で終わることを要件としています。このチュートリアルの例で使用しているメッセージ・フォーマットは HTTP ですが、JSON やその他のフォーマットを使ったメッセージを送信することも可能です。

以上は非常に基本的な SSE の実装例ですが、このプロトコルの単純さがわかります。

WebSocket アプリケーションの開発

SSE の例と同じく、この WebSocket アプリケーションも CherryPy という Python Web フレームワークをベースに作成されています。バックエンド Web サーバーは、このリンク先のプロジェクト WoK です。ソケット接続を処理するために使用する python-websockify ライブラリー・プラグインは、このリンク先のプロジェクト Kimchi の一部となっています。図 1 に、サンプル WebSocket アプリケーションのアーキテクチャーを示します。ご覧のように、SSE よりも遥かに複雑なアーキテクチャーです。

図 1. WebSocket アプリケーションのアーキテクチャー

このアーキテクチャーを構成するコンポーネントは、以下のとおりです。

  • プロジェクト WoK: Push サーバーによってブロードキャストされるメッセージを提供する、バックエンドの Python ロジック。
  • Websockify プロキシー: python-websockify ライブラリーによって実装されるプロキシー。このプロキシーが、Unix ソケットを使用して Push サーバーからメッセージを受信し、UI 上で接続先 WebSocket の代わりを務めます。
  • Push サーバー: 接続されているクライアントにバックエンドのメッセージをブロードキャストする、通常の Unix ソケット・サーバー。
  • フロントエンド: Websockify プロキシーとの WebSocket 接続を確立し、サーバー・メッセージを listen する UI。メッセージによっては、リストを更新する、メッセージをユーザーに表示するなどといった特定のアクションが取られます。

Websockify プロキシー

Websockify プロキシーは、python-websockify ライブラリーによって実装されます。このプロキシーを初期化するには、CherryPy Web サーバー・エンジンを使用します (リスト 3 を参照)。

リスト 3. Websockify プロキシー
    params = {'listen_host': '127.0.0.1',
              'listen_port': config.get('server', 'websockets_port'),
              'ssl_only': False}

    # old websockify: do not use TokenFile
    if not tokenFile:
        params['target_cfg'] = WS_TOKENS_DIR

    # websockify 0.7 and higher: use TokenFile
    else:
        params['token_plugin'] = TokenFile(src=WS_TOKENS_DIR)

    def start_proxy():
        try:
            server = WebSocketProxy(RequestHandlerClass=CustomHandler,
                                    **params)
        except TypeError:
            server = CustomHandler(**params)

        server.start_server()

    proc = Process(target=start_proxy)
    proc.start()
    return proc

Websockify プロキシーは以下に示されているように、登録されているすべてのトークンを自身の WebSocket URI にバインドします。

リスト 4. 登録済みトークンのバインド
def add_proxy_token(name, port, is_unix_socket=False):
    with open(os.path.join(WS_TOKENS_DIR, name), 'w') as f:
        """
        From python documentation base64.urlsafe_b64encode(s)
        substitutes - instead of + and _ instead of / in the
        standard Base64 alphabet, BUT the result can still
        contain = which is not safe in a URL query component.
        So remove it when needed as base64 can work well without it.
        """
        name = base64.urlsafe_b64encode(name).rstrip('=')
        if is_unix_socket:
            f.write('%s: unix_socket:%s' % (name.encode('utf-8'), port))
        else:
            f.write('%s: localhost:%s' % (name.encode('utf-8'), port))

以下のコマンドは、myUnixSocket という Websockify プロキシー・エントリーを追加します。このエントリーが、Unix ソケット /run/my_socket への WebSocket 接続をプロキシーします。

add_proxy_token(‘myUnixSocket’, ‘/run/my_socket’, True)

UI で Unix ソケットとの WebSocket 接続を開くには、以下の URI を使用します。

wss://<server_address>:<port>/websockify?token=<b64encodedtoken>

上記の URI に含まれる b64encodedtoken は、myUnixSocket 文字列から「=」文字を除いた base64 値です。この構成の詳細については、このリンク先のページに掲載されている WebSocket プロキシー・モジュールを参照してください。

Push サーバー

Push サーバーには 2 つの要件があります。

  1. 複数の接続を同時に処理できること
  2. 接続されているすべてのクライアントに同じメッセージをブロードキャストできること

この例では、Push サーバーがブロードキャスト・エージェントの役割を果たします。ブロードキャスト機能は設けているものの、クライアント側からメッセージを受信することは想定していません。

リスト 5 に、Push サーバーの初期バージョンに該当する Python コードを記載します (pushserver モジュールの最終バージョンについては、このリンク先の GitHub 上にあるプロジェクト WoK を参照してください)。

リスト 5. WebSocket Push サーバー
class PushServer(object):

    def __init__(self):
        self.set_socket_file()

        websocket.add_proxy_token(TOKEN_NAME, self.server_addr, True)

        self.connections = []

        self.server_running = True
        self.server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET,
                                      socket.SO_REUSEADDR, 1)
        self.server_socket.bind(self.server_addr)
        self.server_socket.listen(10)
        wok_log.info('Push server created on address %s' % self.server_addr)

        self.connections.append(self.server_socket)
        cherrypy.engine.subscribe('stop', self.close_server, 1)

        server_loop = threading.Thread(target=self.listen)
        server_loop.setDaemon(True)
        server_loop.start()


    def listen(self):
        try:
            while self.server_running:
                read_ready, _, _ = select.select(self.connections,
                                                 [], [], 1)
                for sock in read_ready:
                    if not self.server_running:
                        break

                    if sock == self.server_socket:

                        new_socket, addr = self.server_socket.accept()
                        self.connections.append(new_socket)
                    else:
                        try:
                            data = sock.recv(4096)
                        except:
                            try:
                                self.connections.remove(sock)
                            except ValueError:
                                pass

        except Exception as e:
            raise RuntimeError('Exception occurred in listen() of pushserver '
                               'module: %s' % e.message)


    def send_notification(self, message):
        for sock in self.connections:
            if sock != self.server_socket:
                    sock.send(message)

Unix ソケットは、PushServer() クラスの __init__() によって初期化されて接続を listen します。このコードでは、WebSocket プロキシー内にトークンも追加して、フロントエンドが WebSocket 接続の URI にトークンを含めるようにしています。

サーバー Push アーキテクチャーを実装するにはいくつもの方法がありますが、ブロードキャストに対応するサーバー Push を実装するには、単一のリスナー・スレッド (select.select()) を使用して既知の接続を制御する方法が最も直感的だと思います。この場合、listen() メソッドがデーモン・スレッド内で実行されて、既存の接続を管理します。select.select メソッドはノンブロッキング関数として実行されます。この関数が終了するのは、self.connections 配列に含まれるソケットが読み取り可能な状態になった時点、または 1 秒経過してタイムアウトになった時点です。新しい接続が作成されると、その接続は self.connections 配列に挿入されます。接続が閉じられると、その接続が配列から削除されます。

listen() メソッドがソケットを配列から削除するのは、recv() 呼び出しの except ブロック内でソケットが閉じられていることを検出した場合、あるいは CLOSE メッセージを受信した場合です。send_notification() メソッドは、メッセージを、そのメッセージを受信するためにサブスクライブしているすべてのクライアントにブロードキャストします。上記のコードでは、クライアントがサブスクライブ解除した時点でソケット接続を閉じるために、except ブロックを使用しています。listen() を単独で使用してクライアントとサーバー間のソケット接続を閉じることはできません。そのようにしてソケット接続を閉じると、send_notification() メソッド内でパイプ切断エラーが発生することになるためです。

これについては、次のセクションで詳しく説明します。

WebSocket フロントエンドのトラブルシューティング

バックエンドをセットアップして構成するのは比較的簡単なことですが、フロントエンドのコーディングとなると、それほど簡単にはいきません。ブラウザーとの接続を開いておくか閉じておくかの決定から、特定の API の扱い方の違いまで、WebSocket フロントエンドには対処しなければならない課題がいくつかあります。このセクションでこれらの課題に対処する中で、バックエンド・コードの特定の要素を振り返ることになります。

最初に決めなければならないのは、UI 全体に対して単一の WebSocket 接続を持続させるのか、複数の WebSocket 接続をセットアップしてオンデマンドで開くのかです。まずは、複数のオンデマンド WebSocket 接続をセットアップする場合について考えましょう。考え方としては、UI がバックエンドからの特定のメッセージ (例えば、特定のタスクが完了したことなど) を待機する場合にだけ接続を開きます。非同期メッセージが期待されていなければ、接続する必要はありません。この方法では、UI とバックエンドの両方でリソースの使用が抑えられることになりますが、その代わり、WebSocket を開くごとに、そのライフサイクルを管理する必要が生じます。

UI でサーバーからの大量の非同期メッセージを受信することになるとしたら、単一の WebSocket 接続を持続させるほうが賢明な方法です。この持続接続を通知ストリームとして使用すれば、関連する UI コードで着信メッセージを listen し、受信したメッセージに対してアクションをとることができます。

どちらのソリューションにもそれぞれにふさわしい役割があるので、これから、2 つのソリューションを順に試してみます。重要な点は、開発する特定のアプリケーションにとって最適な手法を用いることです。

複数の WebSocket 接続

WebSocket を開く、閉じるということは、コンストラクターを呼び出して WebSocket 接続を開くか、close() メソッドを呼び出すというだけのことです。けれどもページ更新などに対するブラウザーの動作の違いによって、事態が複雑化する場合もあります。WebSocket を開くたびに、WebSocket プロキシーとの接続を確立し、続いてバックエンド上の実際の Push サーバーとの接続を確立します。この接続は閉じられるまで、開いたままの状態に維持します。以下のコードを見てください。

リスト 6. 受信したメッセージごとに呼び出される refreshInterface()
notificationsWebSocket = new WebSocket(url, ['base64']);
notificationsWebSocket.onmessage = function(event) {
    refreshInterface();
};

リスト 6 の onmessage は、ソケット内で新しいメッセージを受信するたびに呼び出されるイベント・ハンドラーです。refreshInterface() 関数は、メッセージが到着すると呼び出されます。複数の notificationsWebSocket が開いたままになっていると、refreshInterface() も繰り返し呼び出されることになります。

必要以上の WebSocket 接続を開くと、明らかに、フロントエンドとバックエンドの両方のリソースを無駄にすることになります。さらに、意図されない動作の要因にもなりかねません。このような問題を回避するには、必要以上の WebSocket 接続を開かないようにして、既存の WebSocket 接続を不要になった時点で閉じるようにします。WebSocket は、手作業で閉じることも、ブラウザーによって閉じることもできます。私がテスト用のブラウザーとして使った Chrome と Firefox ではいずれも、ブラウザー・ウィンドウを更新する、閉じる、あるいはドメインを変更するといった操作を行うと、開いている WebSocket 接続がブラウザーによって閉じられます。

WebSocket が閉じられる時点で、2 つの処理が行われます。

  1. onclose という名前のイベント・ハンドラーが呼び出されて、追加のクリーンアップ処理が行われます。
  2. 接続を閉じる側 (この場合は、UI WebSocket) がもう一方の側に接続終了を警告するために、終了ハンドシェークを開始します。このプロセスの一環として、値が 0x8 に設定された終了フレームが送信されます。バックエンドはこのフレームを受信した時点で、データの送信を停止し、自身の側のソケット接続を即時に閉じることになります。このプロセスは、onclose ハンドラー内に構成されているクリーンアップのタイプに関わらず行われます。

理想的には、onclose ハンドラーが呼び出されて、Push サーバーが終了フレームを受信するはずですが、この段階でテストした実装では、あいにくそのようにはなりませんでした。

Websockify プロキシーと終了フレーム

このチュートリアルを作成している時点で、Websockify プロキシー・プラグイン (v0.8.0-2) は Unix ソケットにも Push サーバーにも終了フレームを転送しません。そのため、UI ソケット内での接続を閉じることはできても、Push サーバーはそれを認識せずに、サーバー側の接続を開いたままの状態に維持します。

Push サーバーが新しいメッセージを、接続が閉じられたソケットを含むすべての接続にブロードキャストしようとすると、パイプ切断エラーを受け取ります。適切に処理しなければ、これが原因で Push サーバーが終了する可能性があります。ソケットが発行する send_notification メッセージは以下のとおりです。

リスト 7. パイプ切断エラーを引き起こす send_notification
    def send_notification(self, message):
        for sock in self.connections:
            if sock != self.server_socket:
                    sock.send(message)

この状況に対処する 1 つの方法は、sock.send() 呼び出しを try/except でラップしてから、パイプ切断の例外を処理することです。あるいは、メッセージをバックエンド・サーバーに送信する onclose イベント・ハンドラーを使用して、固有の終了ハンドシェークを作成するという方法もあります。この場合、Push サーバー側がソケット接続を終了するので、send_notification メソッドに変更を加える必要はありません。

このソリューションを試してみましょう。

onclose を使用して終了メッセージを送信する

onclose イベント・ハンドラーが呼び出されるのは WebSocket 接続が終了される直前なので、その時点では接続はまだ存続しています。そこで私たちが考えたのは、終了メッセージを Push サーバーに送信し、フロントエンド上でソケットが閉じられることを知らせることで、サーバーが正常に接続を終了できるようにするという方法です。つまり基本的には、終了フレームの役割を終了ハンドシェーク内でエミュレーションすることになります。

リスト 8 では WebSocket が、接続を閉じる直前に Push サーバーに CLOSE メッセージを送信します。

リスト 8. onclose イベント内での CLOSE メッセージの送信
notificationsWebSocket = new WebSocket(url, ['base64']);
(...)
notificationsWebSocket.onclose = function() {
    notificationsWebSocket.send(window.btoa(‘CLOSE’));
};

Push サーバー内では、UI からのすべての着信メッセージを listen し、CLOSE メッセージを受信したかどうかを確認します。受信している場合は、直ちにソケットを閉じます。

リスト 9. CLOSE に対する Push サーバーの応答
    def listen(self):
        try:
            while self.server_running:
                read_ready, _, _ = select.select(self.connections,
                                                 [], [], 1)
                for sock in read_ready:
                    if not self.server_running:
                        break

                    if sock == self.server_socket:

                        new_socket, addr = self.server_socket.accept()
                        self.connections.append(new_socket)
                    else:
                        try:
                            data = sock.recv(4096)
                        except:
                            try:
                                self.connections.remove(sock)
                            except ValueError:
                                pass

                            continue
                        if data and data == 'CLOSE':
                            try:
                                self.connections.remove(sock)
                            except ValueError:
                                pass
                            sock.close()

WebSocket が閉じられて CLOSE メッセージが送信される前に onclose を呼び出せば、send_notification ルーチン内でのパイプ切断エラーが防止されるはずです。onclose は Firefox 内ではそのように機能しますが、テストの結果、Chrome 内でアプリケーションを開くと、send_notification メソッド内で相変わらずパイプ切断エラーが発生することがわかりました。その原因は、Google Chrome 内での未解決の問題にあります。

単一の持続される WebSocket 接続

onclose と終了フレームに問題があることから、私たちは単一の WebSocket 接続を持続させるというソリューションに目を向けることになりました。単一の接続を確立し、ブラウザーが (ウィンドウを閉じる、ページを再読み込みする、別の URL を表示するなどして) ページを離れる場合だけその接続を閉じるようにするには、パイプ切断が発生した時点で Push サーバーが接続を閉じればよいだけです。接続を一本化したほうが、プロジェクトWoK の保守が容易になるだけでなく、WoK プラグインによって、WebSocket 戦略をゼロから実装する作業から解放されます。

リスト 10 に、変更後の send_notification メッセージを記載します。パイプ切断エラーに対処するために、有効な接続のリストからソケットを削除してから閉じている点に注意してください。

リスト 10. 単一の接続を持続させる場合の send_notification
        def send_notification(self, message):
        for sock in self.connections:
            if sock != self.server_socket:
                try:
                    sock.send(message)
                except IOError as e:
                    if 'Broken pipe' in str(e):
                        sock.close()
                        try:
                            self.connections.remove(sock)
                        except ValueError:
                            pass

ハートビートを構成する

複数の接続を使用する戦略では、接続を開いて使用した後、すぐに接続を閉じます。単一の持続接続を使用する場合、何も送信されないとしても、接続が開いたままの状態を無期限に維持する必要があります。けれども Chrome と Firefox 内でテストした結果、WebSocket 接続は一定のアイドル状態が続くとタイムアウトになって、接続が終了されます。

ブラウザーのタイムアウトを回避するために、定期的に Push サーバーに送信される単純なハートビート・メッセージを実装することにしました。WebSocket を存続させるこのメッセージに対して、Push サーバーが応答する必要はありません。デフォルトのタイムアウトは 300 秒となっているようですが、あらゆるブラウザーが同じタイムアウトを実装していると想定することはできません。無難な策として、ハートビート・メッセージの送信間隔を 30 秒に設定しました (リスト 11 を参照)。

リスト 11. タイムアウト間隔
    notificationsWebSocket = new WebSocket(url, ['base64']);
    var heartbeat = setInterval(function() {
        notificationsWebSocket.send(window.btoa('heartbeat'));
    }, 30000);

この単純なタイムアウトは WebSocket 接続を存続させながらも、Push サーバーに大量のハートビート・メッセージを送りつけることはありません。

リスナーを追加する

単一の持続接続が確立されていれば、単純なリスナー戦略を実装することで、サンプル・アプリケーション内の他のあらゆる UI コードでメッセージを listen できます。そのための設計は以下のとおりです。

  1. リスナーを特定のメッセージ・フォーマットにバインドします。このサンプル・アプリケーションは多数の通知メッセージを送信します。メッセージにバインドすることで、特定のメッセージだけを対象としたリスナーを登録できます。
  2. Push サーバーからメッセージを受信すると、メインの WebSocket チャネルがその内容を検証し、そのメッセージにバインドされているすべてのリスナーを呼び出します。
  3. WebSocket チャネルがリスナーのクリーンアップを開始します。URI が変更された場合は、リスナーの使用を中止する必要があります。例えば、URI /#tabs/user-log「Activity Log (アクティビティー・ログ)」タブに表示されるリスナーは、ユーザーが URI /#tabs/settings「Settings (設定)」タブを参照した時点で使用を中止しなければなりません。クリーンアップのバインド先は、URL (location.hash) が変更されるたびにトリガーされる hashchange です。このプロセスが必要となるのは 1 回だけなので、このプロセスを実行するための $.one()’ jQuery 呼び出しも 1 回設定するだけで済みます。

サンプル・アプリケーション内でリスナーを処理するためのコードは以下のとおりです。

リスト 12. プロジェクト WoK のリスナー構成
wok.notificationListeners = {};
wok.addNotificationListener = function(msg, func) {
    var listenerArray = wok.notificationListeners[msg];
    if (listenerArray == undefined) {
        listenerArray = [];
    }
    listenerArray.push(func);
    wok.notificationListeners[msg] = listenerArray;
    $(window).one("hashchange", function() {
        var listenerArray = wok.notificationListeners[msg];
        var del_index = listenerArray.indexOf(func);
        listenerArray.splice(del_index, 1);
        wok.notificationListeners[msg] = listenerArray;
    });
};

メッセージを該当するリスナーに送信する方法は以下のとおりです。

リスト 13. リスナー通知
wok.startNotificationWebSocket = function () {
    wok.notificationsWebSocket = new WebSocket(url, ['base64']);
    wok.notificationsWebSocket.onmessage = function(event) {
        var message  = window.atob(event.data);
         if (message === "") {
                continue;
        }
            
        var listenerArray = wok.notificationListeners[message];
        if (listenerArray == undefined) {
            continue;
        }
        for (var i = 0; i < listenerArray.length; i++) {
            listenerArray[i](message);
        }
    }
 };

以上のコードを実装したら、ソリューションの完成まであとわずかです。けれどもその前に、対処しなければならない問題がもう 1 つ残っています。

マージされたメッセージ

単一の接続が有効に機能するようになりましたが、この実装は、フロントエンド上でメッセージがどのように受信されるかについて根拠のない前提で成り立っています。Push サーバーがたて続けに 2 つのメッセージ (message1 の後に message2) を送信したとしたらどうなるでしょうか?このコードでは、WebSocket 内でメッセージの内容が読み取られて onmessage イベントが 2 回トリガーされることを前提としていますが、必ずしもそうなるとは限りません。それどころか、受信したメッセージがマージされて message1message2 となることが考えられます。

メッセージをマージするのは、一般的な TCP ソケットの動作です。つまり、2 つ以上のメッセージが短時間で連続して送信されると、受信側ソケットはそれらのメッセージをキューに入れるため、onmessage がそれらすべてのメッセージに対して 1 回だけトリガーされることになります。このような動作が想定外だとすると、このコードは破たんしてしまいます。この問題に対するソリューションは、あるメッセージが終了して別のメッセージが開始する場所をアプリケーションが判別できるようにするメッセージング・フォーマットを定義することです。

そのためによく使われている方法の 1 つは、TLV (Type-Length-Value の略) フォーマットです。このフォーマットでは、メッセージに type、length、value という 3 つのフィールドがこの順で含まれます。type と length は固定長のフィールドです。value は可変長のフィールドで、その長さは length フィールドによって宣言されます。type フィールドは、メッセージのタイプ (文字列、ブール値、バイナリー、整数など) を区別するために使用できます。また、アプリケーション固有の目的 (警告メッセージ、通知メッセージ、エラー・メッセージ) に使用することもできます。

ただし、このサンプル・アプリケーションに使用するには、TLV は過剰です。サンプル・アプリケーションでのメッセージは常に文字列であるため、type フィールドを使用する必要はありません。メッセージが連結されるという問題を解決するためには、固定長のフィールドを追加して、メッセージに属する chars の数を宣言するという方法があります。さらに単純な方法としては、送信するすべてのメッセージにメッセージ終了マーカーを追加することもできます。フロントエンドがメッセージ・バッファーを受信した後、このメッセージ・マーカーを使ってそのバッファーを解析し、個々のフロントエンド・メッセージに分割するという仕組みです。

リスト 14 に、メッセージ終了マーカーを追加して変更した send_notification メソッドを記載します。

リスト 14. send_notification に追加されたメッセージ終了マーカー
END_OF_MESSAGE_MARKER = '//EOM//'

    def send_notification(self, message):
        message += END_OF_MESSAGE_MARKER
        for sock in self.connections:
            if sock != self.server_socket:
                try:
                    sock.send(message)
                except IOError as e:
                    if 'Broken pipe' in str(e):
                        sock.close()
                        try:
                            self.connections.remove(sock)
                        except ValueError:
                            pass

リスト 14 では //EOM// という文字列を使用していますが、メッセージの終了を示すには、有効なメッセージに含めることができるものであれば、どの文字列にしても構いません (例えば、end など)。

リスト 15 に、メッセージ終了の解析を追加して完成させたフロントエンド・コードを記載します。完全なソースについては、このリンク先のプロジェクト WoK を参照してください。

リスト 15. WebSocket フロントエンド
wok.notificationListeners = {};
wok.addNotificationListener = function(msg, func) {
    var listenerArray = wok.notificationListeners[msg];
    if (listenerArray == undefined) {
        listenerArray = [];
    }
    listenerArray.push(func);
    wok.notificationListeners[msg] = listenerArray;
    $(window).one("hashchange", function() {
        var listenerArray = wok.notificationListeners[msg];
        var del_index = listenerArray.indexOf(func);
        listenerArray.splice(del_index, 1);
        wok.notificationListeners[msg] = listenerArray;
    });
};

wok.notificationsWebSocket = undefined;
wok.startNotificationWebSocket = function () {
    var addr = window.location.hostname + ':' + window.location.port;
    var token = wok.urlSafeB64Encode('woknotifications').replace(/=*$/g, "");
    var url = 'wss://' + addr + '/websockify?token=' + token;
    wok.notificationsWebSocket = new WebSocket(url, ['base64']);

    wok.notificationsWebSocket.onmessage = function(event) {
        var buffer_rcv = window.atob(event.data);
        var messages = buffer_rcv.split("//EOM//");
        for (var i = 0; i < messages.length; i++) {
            if (messages[i] === "") {
                continue;
            }
            var listenerArray = wok.notificationListeners[messages[i]];
            if (listenerArray == undefined) {
                continue;
            }
            for (var j = 0; j < listenerArray.length; j++) {
                listenerArray[j](messages[i]);
            }
        }
    };

    var heartbeat = setInterval(function() {
        wok.notificationsWebSocket.send(window.btoa('heartbeat'));
    }, 30000);

};

まとめ

サーバーからクライアントへの方向でのみ非同期メッセージをストリーミング配信できればよい Web アプリケーションには、SSE が簡潔で単純なソリューションになります。ただし、SSE は半二重の HTTP ソリューションであるため、クライアントからサーバーにメッセージをストリーミング配信することはできません。さらに、このチュートリアルを作成している時点では、SSE をサポートしている Microsoft 製ブラウザーは 1 つもありません。このことが SSE を選択できない決定的要因となるかどうかは、アプリケーションの対象ユーザーによって決まります。

WebSocket は SSE よりも複雑で要件が厳しいものの、全二重 TCP 接続であるため、より幅広いアプリケーション・シナリオに有効です。WebSocket は、最近の Web フレームワークのほとんどでサポートされており、主要な Web ブラウザーとモバイル・ブラウザーのすべてと互換性を持ちます。このチュートリアルでは取り上げませんでしたが、ゼロからコードを作成するのではなく、Tornado のようなサーバー・フレームワークを使用して迅速に Push サーバーを構成することもできます。

SSE はより単純で時間のかからないソリューションですが、拡張性に欠けています。Web アプリケーションの要件が変わり、例えばフロントエンドがバックエンドとやりとりしなければならなくなった場合には、WebSocket を使用してアプリケーションをリファクタリングすることになるでしょう。準備作業は多くなりますが、WebSocket は用途が広く、拡張性の高いフレームワークです。新しい機能を徐々に追加していく複雑なアプリケーションには、WebSocket のほうが適しています。


ダウンロード可能なリソース


関連トピック


コメント

コメントを登録するにはサインインあるいは登録してください。

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=60
Zone=Web development, Information Management, Mobile development
ArticleID=1050371
ArticleTitle=WebSocket と SSE による HTTP サーバー Push
publish-date=09282017