Search
Duplicate
📒

[Spring WebSocket] 02-1. Server-Sent-Events(SSE) 구현하기

상태
미진행
수업
Spring Netty
주제
WebSocket
4 more properties
참고

Server-Sent Events(SSE)

NOTE
SSE 데이터를 실시간으로 Streaming 하는 기술이다!
예시이미지 → checkbox클릭시 모든 client에 반영된다! (우리는 알람기능에 사용한다.)
기존에는 Server의 변경된 데이터를 가져오기 위해서 새로고침, 혹은 지속적으로 request를 보내는 ajax 폴링, 외부 플러그인을 사용해야 했다.
이외에도 websocket을 사용할 수 있지만 HTTP 통신을 이용하는 것이 아닌 websocket만을 위한 별도의 서버와 프로토콜로 통신하기 때문에 비용이 많이든다.
SSE는 기존 HTTP 웹 서버에서, HTTP API만으로 동작되므로 개발난이도가 매우 쉽다!

Server-Sent Events 특징

NOTE
브라우저는 서버가 생성한 Stream을 계속해서 받는다(Server에서 보내는 Stream으로 Read Only)
Connection 유지를 위해 HTTP Protocol을 사용, HTTP/2를 통한 multiplexing 사용가능
연결이 끊어지면 EventSource가 오류 이벤트를 발생시키고 자동으로 다시 연결을 시도
대부분의 브라우저에서 지원

Server-Sent Events 사용시점

NOTE
효율적인 단방향 통신이 필요한 경우
실시간 데이터 스트리밍에 HTTP를 사용하려는 경우(RestFul의 Get Method와 유사)
사용되는 예
암호 화폐 또는 주가 피드 구독
라이브 스포츠 점수 받기
뉴스 업데이트 또는 알림

Server-Sent Events VS WebSocket

NOTE
Socket
Server-Sent-Event
브라우저 지원
대부분 브라우저에서 지원
통신 방향
양방향
리얼타임
Yes
데이터 형태
Binary, UTF-8
자동 재접속
No
최대 동시 접속 수
브라우저 연결 한도는 없지만 서버 셋업에 따라 다름
프로토콜
websocket
베터리 소모량
Firewall 친화적
Nope
사실 Socket하나만 알고 있어도 SSE방식을 모두 구현할 수 있다.
Socket은 양방향이기 때문에 SSE 역할도 해낼 수 있기 떄문
그러나 위에서 보는 것처럼 WebsocketSSE스펙차이 때문에 사용처에 따라 선택적으로 사용된다.

웹소켓 사용처 (리얼타임)

카카오톡
주식 트레이딩 데이터

SSE(알람)

SNS 친구 요청이나 알람

Spring 개발

NOTE
기본적인 흐름은 다음과 같다!
SSE 연결요청
SSE 데이터 전달
1.
클라이언트에서 SSE 연결 요청을 보낸다.
2.
서버에서는 클라이언트와 매핑되는 SSE 통신 객체를 만든다.
3.
서버에서 이벤트가 발생하면 해당 객체를 통해 클라이언트로 데이터를 전달한다.

Controller 구현

NOTE
@RestController @RequestMapping("/alarms") @RequiredArgsConstructor @Slf4j @Tag(name = "Alarms", description = "알람 관련 API, SSE 방식사용") public class AlarmController { private final SseService sseService; @GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE) @Operation(summary = "사용자가 SSE에 연결됨") public SseEmitter subscribe( @RequestHeader(name = "Authorization") String accessToken) { log.info("컨트롤러에 접속됨"); return alarmService.subscribe(accessToken); } //... }
Java
복사
연결 요청을 처리하기 위해서, MIME Typetext/event-stream형태로 받아줘야 한다.
반환값을 다른 객체로 감싸지말고, SseEmitter로 해야 front에서 제대로 받을 수 있다.

Repository 구현

NOTE
기존의 JpaRepository를 사용하기에는, 어떤 회원이 어떤 Emitter가 연결되었는지, 어떤 이벤트들이 현재까지 발생했는지를 저장하기에 까다로우므로 Repository를 따로 구현
@Repository public class EmitterRepository { private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>(); private final Map<String, Object> eventCache = new ConcurrentHashMap<>(); // emitterId 형식 => [유저의 accessToken] + _ + 생성시간 /** * Emitter를 저장한다 * @param emitterId * @param sseEmitter * @return */ public SseEmitter save(String emitterId, SseEmitter sseEmitter) { emitters.put(emitterId, sseEmitter); return sseEmitter; } /** * 이벤트를 저장한다. * @param eventCacheId * @param event */ public void saveEventCache(String eventCacheId, Object event) { eventCache.put(eventCacheId, event); } /** * 해당 회원과 관련된 모든 Emitter를 찾는다. * @param memberId * @return */ public Map<String, SseEmitter> findAllEmitterStartWithByMemberId(String memberId) { return emitters.entrySet().stream() .filter(entry -> entry.getKey().startsWith(memberId)) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } /** * 해당 회원과 관련된 모든 이벤트를 찾는다. * @param memberId * @return */ public Map<String, Object> findAllEventCacheStartWithByMemberId(String memberId) { return eventCache.entrySet().stream() .filter(entry -> entry.getKey().startsWith(memberId)) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } /** * Emitter를 지운다. * @param id */ public void deleteById(String id) { emitters.remove(id); } /** * 해당 회원과 관련된 모든 Emitter를 지운다. * @param memberId */ public void deleteAllEmitterStartWithId(String memberId) { emitters.forEach( (key, emitter) -> { if (key.startsWith(memberId)) { emitters.remove(key); } } ); } /** * 해당 회원과 관련된 모든 이벤트를 지운다. * @param memberId */ public void deleteAllEventCacheStartWithId(String memberId) { eventCache.forEach( (key, emitter) -> { if (key.startsWith(memberId)) { eventCache.remove(key); } } ); } }
Java
복사
Emitter를 생성, 검색, 삭제
Emitter의 이벤트를 생성, 검색, 삭제

Service 구현

NOTE
어떻게 SseEmitte에 구독을 진행하는지와, 구독자들에게 어떻게 이벤트를 전달시키는지 알아보자!
전체 코드
String emitterId = makeTimeIncludeId(memberId); private String makeTimeIncludeId(Long memberId) { return memberId + "_" + System.currentTimeMillis(); } SseEmitter emitter = sseRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));
Java
복사
접속한 사용자의 id로 emitter 생성
emitter.onCompletion(() -> sseRepository.deleteById(emitterId)); emitter.onTimeout(() -> sseRepository.deleteById(emitterId)); emitter.onError((e) -> sseRepository.deleteById(emitterId));
Java
복사
시간이 만료된 경우 자동으로 Repository에서 삭제 처리해줄 수 있는 콜백을 등록
String eventId = makeTimeIncludeId(userId); sendToClient(emitter, eventId, MESSAGE, emitterId, "EventStream Created. [userId=" + userId + "]");
Shell
복사
503 에러를 방지하기 위한 더미데이터 넣어줌
private void sendToClient(SseEmitter emitter, String eventId, String eventName, String emitterId, Object data) { try { emitter.send(SseEmitter.event() .id(eventId) // 이벤트 ID .name(eventName) // 이벤트 이름(이벤트 유형) message로 넣음 .data(data)); // 이벤트에 전송되는 데이터 } catch (IOException exception) { sseRepository.deleteById(emitterId); } }
Java
복사
클라이언트에게 데이터가 넘어간다.

React

NOTE
let eventSource = useRef < EventSource | null > null; eventSource = new EventSourcePolyfill(`${BASE_URL}/connect`, { headers: { 'Authorization': accessToken, 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Access-Control-Allow-Origin': '*', }, heartbeatTimeout: 86400000, });
JavaScript
복사
SSE를 수신한다 → EventSource말고 EventSourcePolyfill을 사용하는 이유는 EventSource를 사용하는 경우 헤더값이 제대로 들어가지 않는 이슈가 있다해서 사용.
eventSource.addEventListener('open', (e) => { console.log("연결됨"); });
JavaScript
복사
연결된 경우 이벤트
eventSource.addEventListener('message', (e) => { console.log(e.data) console.log("알람옴!") });
JavaScript
복사
message 이벤트가 온 경우