服务器发送事件(Server-sent Events)是一种服务器向客户端发送事件和数据的单向通讯。Server-Sent 事件指的是网页自动获取来自服务器的更新。以前也可能做到这一点,前提是网页不得不询问是否有可用的更新。通过服务器发送事件,更新能够自动到达。

优缺点对比

WebSocket

  • 优点:双工通信
  • 缺点:需专门定义数据协议,解析数据流,且部分服务器支持不完善,后台例如java spring boot 2.1.2 仅支持websocket 1.0(最高已达1.3)

SSE

  • 优点:开发简单,和传统的http开发几乎无任何差别,客户端开发简单,有标准支持(EventSource)
  • 缺点:和websocket相比,只能单工通信,建立连接后,只能由服务端发往客户端,且占用一个连接,如需客户端向服务端通信,需额外打开一个连接

Springboot的Server实现

/**
 * @author blog.unclezs.com
 * @date 2021/07/17
 */
@RestController
@SpringBootApplication
public class SseServer implements ApplicationContextAware {
  private ApplicationContext context;


  public static void main(String[] args) {
    SpringApplication.run(SseServer.class, args);
  }

  @GetMapping(value = "/sse/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  public ResponseEntity<SseEmitter> sseServer(@PathVariable(value = "id") String id) {
    // 防止nginx缓存请求
    HttpHeaders httpHeaders = new HttpHeaders();
    httpHeaders.set("X-Accel-Buffering", "no");
    httpHeaders.setCacheControl(CacheControl.noCache());
    SseService manager = context.getBean(SseService.class);
    SseEmitter emitter = manager.registerSseEmitter(id);
    return ResponseEntity.ok().contentType(MediaType.TEXT_EVENT_STREAM).headers(httpHeaders).body(emitter);
  }

  @Override
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    this.context = applicationContext;
    new Thread(() -> {
      while (true) {
        try {
          // 模拟推送消息
          context.getBean(SseService.class).sendMessage("123", String.format("sse消息:【%s】", DateUtil.now()));
          Thread.sleep(5000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }).start();
  }
}
/**
 * @author blog.unclezs.com
 * @date 2021/07/17
 */
@Service
public class SseServiceImpl implements SseService {
  private static final Map<String, SseEmitter> SSE_EMITTERS = new HashMap<>();

  public SseEmitter registerSseEmitter(String id) {
    SseEmitter emitter = new SseEmitter(60L * 1000L);
    emitter.onCompletion(() -> System.out.println("SseEmitter is completed"));
    emitter.onTimeout(() -> System.out.println("SseEmitter is timed out"));
    emitter.onError((ex) -> System.out.println("SseEmitter got error:" + ex.getMessage()));
    SSE_EMITTERS.put(id, emitter);
    return emitter;
  }

  @Override
  public void sendMessage(String id, Object data) {
    SseEmitter emitter = SSE_EMITTERS.get(id);
    if (emitter == null) {
      return;
    }
    System.out.println("发送消息:" + id + " data: " + data);
    try {
      emitter.send(data);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}

客户端实现

Okhttp版本

/**
 * @author blog.unclezs.com
 * @date 2021/07/17
 */
public class OkSseClient {
  public static void main(String[] args) {
    // 定义see接口
    Request request = new Request.Builder().url("http://127.0.0.1:8080/sse/123").build();
    OkHttpClient okHttpClient = new OkHttpClient.Builder()
            .connectTimeout(1, TimeUnit.DAYS)
            .readTimeout(1, TimeUnit.DAYS)//这边需要将超时显示设置长一点,不然刚连上就断开,之前以为调用方式错误被坑了半天
            .build();

    // 实例化EventSource,注册EventSource监听器
    RealEventSource realEventSource = new RealEventSource(request, new EventSourceListener() {
      private long callStartNanos;

      private void printEvent(String name) {
        long nowNanos = System.nanoTime();
        if (name.equals("callStart")) {
          callStartNanos = nowNanos;
        }
        long elapsedNanos = nowNanos - callStartNanos;
        System.out.printf("=====> %.3f %s%n", elapsedNanos / 1000000000d, name);
      }

      @Override
      public void onOpen(EventSource eventSource, Response response) {
        printEvent("onOpen");
      }

      @Override
      public void onEvent(EventSource eventSource, String id, String type, String data) {
        printEvent("onEvent");
        //请求到的数据
        System.out.println(data);
      }

      @Override
      public void onClosed(EventSource eventSource) {
        printEvent("onClosed");
      }

      @Override
      public void onFailure(EventSource eventSource, Throwable t, Response response) {
        t.printStackTrace();
        //这边可以监听并重新打开
        printEvent("onFailure");
      }
    });
    //真正开始请求的一步
    realEventSource.connect(okHttpClient);
  }
}

Java原生实现

/**
 * @author blog.unclezs.com
 * @date 2021/07/17
 */
public class SseClient {

  /**
   * 获取SSE输入流。
   *
   * @param urlPath /
   * @return /
   * @throws IOException /
   */
  public static InputStream getSseInputStream(String urlPath) throws IOException {
    URL url = new URL(urlPath);
    HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
    // 这儿根据自己的情况选择get或post
    urlConnection.setRequestMethod("GET");
    urlConnection.setDoOutput(true);
    urlConnection.setDoInput(true);
    urlConnection.setUseCaches(false);
    urlConnection.setRequestProperty("Connection", "Keep-Alive");
    urlConnection.setRequestProperty("Charset", "UTF-8");
    //读取过期时间(很重要,建议加上)
    urlConnection.setReadTimeout(20 * 1000);
    // text/plain模式
    urlConnection.setRequestProperty("Content-Type", "text/plain; charset=UTF-8");
    InputStream inputStream = urlConnection.getInputStream();
    return new BufferedInputStream(inputStream);
  }

  /**
   * 读取数据。
   *
   * @param is                /
   * @param sseMessageHandler /
   * @throws IOException /
   */
  public static void readStream(InputStream is, SseMessageHandler sseMessageHandler) throws IOException {
    try {
      BufferedReader reader = new BufferedReader(new InputStreamReader(is));
      String line;
      while ((line = reader.readLine()) != null) {
        // 处理数据接口
        sseMessageHandler.actMsg(is, line);
      }
      // 当服务器端主动关闭的时候,客户端无法获取到信号。现在还不清楚原因。所以无法执行的此处。
      reader.close();
    } catch (IOException e) {
      e.printStackTrace();
      throw new IOException("关闭数据流!");
    }
  }

  public static void main(String[] args) throws IOException {
    String urlPath = "http://localhost:8080/sse/123";
    InputStream inputStream = getSseInputStream(urlPath);
    readStream(inputStream, (is, line) -> System.out.println(line));
  }

}

最后

本文中的代码源码

博客
分类
标签
归档
关于