JAX-RSをやってみる (11) - Server Sent Events -

2014-06-21T00:00:00+00:00 Java JAX-RS

公式ドキュメント: https://jersey.java.net/documentation/latest/user-guide.html#sse

Server Sent Eventsにも対応している模様。んまぁドキュメントに書いてる通りにjersey-media-sseなライブラリを参照をすればAutoDiscoverableとかそれによりSseFeatureが作用してServer Sent Eventsなのを利用出来るようになる感じかと

Home.java

package sample.controller;

import java.io.IOException;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;

import org.glassfish.jersey.media.sse.EventOutput;
import org.glassfish.jersey.media.sse.OutboundEvent;
import org.glassfish.jersey.media.sse.SseFeature;

@Path("/sample")
public class Home {

    @Path("events")
    @GET
    @Produces(SseFeature.SERVER_SENT_EVENTS)
    public EventOutput getServerSentEvents() {
        final EventOutput output = new EventOutput();

        new Thread() {
            @Override
            public void run() {
                try {
                    OutboundEvent.Builder builder = new OutboundEvent.Builder();
                    builder.name("sample_event");
                    builder.data(String.class, "hoge");

                    final OutboundEvent event = builder.build();
                    output.write(event);
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    try {
                        output.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();

        return output;
    }
}

っていう感じでEventOutputをリソースメソッドから返すなりで、@ProducesでSERVER_SENT_EVENTSなMediaTypeを指定すれば良い模様。

んまぁjersey-media-sseを使ったテストに関しては

package sample.controller;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.core.Application;

import org.glassfish.jersey.media.sse.EventListener;
import org.glassfish.jersey.media.sse.EventSource;
import org.glassfish.jersey.media.sse.InboundEvent;
import org.glassfish.jersey.test.JerseyTest;
import org.junit.Test;

import sample.SampleApplication;
import static org.junit.Assert.*;
import static org.hamcrest.Matchers.*;

public class HomeTest extends JerseyTest {

    @Override
    protected Application configure() {
        return new SampleApplication();
    }

    @Test
    public void test_index() throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        final List<String> data = new ArrayList<String>(1);

        EventSource es = EventSource.target(target("/sample/events")).build();
        assertThat(es, notNullValue());

        es.register(
            new EventListener() {
                @Override
                public void onEvent(InboundEvent inboundEvent) {
                    try {
                        data.add(inboundEvent.readData());
                    } finally {
                        latch.countDown();
                    }
                }
            },
            // ここで起きないイベント名を指定するとテストはタイムアウトによりずっこける
            "sample_event"
        );
        es.open();
        latch.await(3, TimeUnit.SECONDS);
        es.close();

        assertThat(data, hasSize(1));
        assertThat(data, hasItem("hoge"));
    }
}

てな感じでEventSourceクラスなりEventListenerなりを使ってテストする事が出来る模様で。https://github.com/jersey/jersey/blob/master/examples/sse-item-store-webapp/src/test/java/org/glassfish/jersey/examples/sseitemstore/ItemStoreResourceTest.javaとかにもあるのでそれ参考にすれば良いのではと

SseBroadcasterを使う

上記だけだと面白くないので、SseBroadcasterっていうのを使うとServer Sent Events側にイベントをブロードキャスト出来る仕組みがある模様。例えば

<html>
  <head>
    <script src="jquery.js"></script>
  </head>
  <body>
    <input type="text" id="message" />
    <button onclick="send(document.querySelector("#message").value)">send</button>
    <hr />
    <div id="messages"></div>
    <script>
var messages = document.querySelector("#messages");

var es = new EventSource("/jaxrs/resources/sample/events");
es.addEventListener("sample_event", function(e) {
  console.log(e);

  var elm = document.createElement("div");
  elm.innerText = e.data;

  messages.appendChild(elm);
}, false);

function send(message) {
  $.post("/jaxrs/resources/sample/push", { message: message });
}
    </script>
  </body>
</html>

みたいにテキストで入力した値をPOSTで/sample/pushに送る。で/sample/push側でServer Sent Eventsにブロードキャストする事で指定した値がServer Sent Eventsに通知されてイベントでキャッチ出来るような感じを利用するには

package sample.controller;

import javax.inject.Singleton;
import javax.ws.rs.Consumes;
import javax.ws.rs.FormParam;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import org.glassfish.jersey.media.sse.EventOutput;
import org.glassfish.jersey.media.sse.OutboundEvent;
import org.glassfish.jersey.media.sse.SseBroadcaster;
import org.glassfish.jersey.media.sse.SseFeature;

@Singleton
@Path("/sample")
public class Home {

    private final SseBroadcaster broadcaster = new SseBroadcaster();

    @Path("events")
    @GET
    @Produces(SseFeature.SERVER_SENT_EVENTS)
    public EventOutput getServerSentEvents() {
        final EventOutput output = new EventOutput();
        broadcaster.add(output);

        return output;
    }

    @Path("push")
    @POST
    @Consumes(MediaType.APPLICATION_FORM_URLENCODED)
    public Response push(@FormParam("message") String message) {
        OutboundEvent event = new OutboundEvent.Builder()
            .name("sample_event")
            .data(String.class, message)
            .build();

        broadcaster.broadcast(event);

        return Response.ok().build();
    }
}

Server Sent Eventsを利用する際に生成したEventOutputをSseBroadcasterに突っ込んでおいて、POSTなりのリクエストでデータを受け取る側のリソースメソッドでbroadcastメソッドを使ってOutboundEventを送る。するとSseBroadcaster側に突っ込んでおいたEventOutputに書き込み処理が行われる。要はPOSTしたりしてデータを処理した後にブロードキャストしたいような要件であればSseBroadcasterを使う事で容易に実現出来る的なAPIかと

ちなみにこの場合のテストは

package sample.controller;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.ws.rs.client.Entity;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.Form;
import javax.ws.rs.core.Response;

import org.glassfish.jersey.media.sse.EventListener;
import org.glassfish.jersey.media.sse.EventSource;
import org.glassfish.jersey.media.sse.InboundEvent;
import org.glassfish.jersey.test.JerseyTest;
import org.junit.Test;

import sample.SampleApplication;
import static org.junit.Assert.*;
import static org.hamcrest.Matchers.*;

public class HomeTest extends JerseyTest {

    @Override
    protected Application configure() {
        return new SampleApplication();
    }

    @Test
    public void test_index() throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        final List<String> data = new ArrayList<String>(1);

        EventSource es = EventSource.target(target("/sample/events")).build();
        assertThat(es, notNullValue());

        es.register(
            new EventListener() {
                @Override
                public void onEvent(InboundEvent inboundEvent) {
                    try {
                        data.add(inboundEvent.readData());
                    } finally {
                        latch.countDown();
                    }
                }
            },
            "sample_event"
        );
        es.open();

        // 送るデータを準備
        Form form = new Form();
        form.param("message", "hoge");

        // データを送信。んでステータスが正常に返ってくるかをテスト
        Response response = target("/sample/push").request().post(Entity.form(form));
        assertThat(response, notNullValue());
        assertThat(response.getStatus(), is(Response.Status.OK.getStatusCode()));

        // Server Sent Eventsにブロードキャストするまでウェイト
        latch.await(3, TimeUnit.SECONDS);
        es.close();


        // その後にデータを検証
        assertThat(data, hasSize(1));
        assertThat(data, hasItem("hoge"));
    }
}

的な感じかと。んまぁ大体はドキュメント通りなので(ry

JAX-RSをやってみる (12) - MVC - JAX-RSをやってみる (10) - AsyncResponse -