참고
Server-Sent Events(SSE)
NOTE
SSE는 데이터를 실시간으로 Streaming 하는 기술이다!
예시이미지 → checkbox클릭시 모든 client에 반영된다! (우리는 알람기능에 사용한다.)
•
기존에는 Server의 변경된 데이터를 가져오기 위해서 새로고침, 혹은 지속적으로 request를 보내는 ajax 폴링, 외부 플러그인을 사용해야 했다.
•
이외에도 websocket을 사용할 수 있지만 HTTP 통신을 이용하는 것이 아닌 websocket만을 위한 별도의 서버와 프로토콜로 통신하기 때문에 비용이 많이든다.
•
SSE는 기존 HTTP 웹 서버에서, HTTP API만으로 동작되므로 개발난이도가 매우 쉽다!
Server-Sent Events 특징
NOTE
•
브라우저는 서버가 생성한 Stream을 계속해서 받는다(Server에서 보내는 Stream으로 Read Only)
•
Connection 유지를 위해 HTTP Protocol을 사용, HTTP/2를 통한 multiplexing 사용가능
•
연결이 끊어지면 EventSource가 오류 이벤트를 발생시키고 자동으로 다시 연결을 시도
•
대부분의 브라우저에서 지원
Server-Sent Events 사용시점
NOTE
•
효율적인 단방향 통신이 필요한 경우
•
실시간 데이터 스트리밍에 HTTP를 사용하려는 경우(RestFul의 Get Method와 유사)
•
사용되는 예
◦
암호 화폐 또는 주가 피드 구독
◦
라이브 스포츠 점수 받기
◦
뉴스 업데이트 또는 알림
Server-Sent Events VS WebSocket
NOTE
Socket | Server-Sent-Event |
브라우저 지원 | 대부분 브라우저에서 지원 |
통신 방향 | 양방향 |
리얼타임 | Yes |
데이터 형태 | Binary, UTF-8 |
자동 재접속 | No |
최대 동시 접속 수 | 브라우저 연결 한도는 없지만 서버 셋업에 따라 다름 |
프로토콜 | websocket |
베터리 소모량 | 큼 |
Firewall 친화적 | Nope |
•
사실 Socket하나만 알고 있어도 SSE방식을 모두 구현할 수 있다.
◦
Socket은 양방향이기 때문에 SSE 역할도 해낼 수 있기 떄문
•
그러나 위에서 보는 것처럼 Websocket과 SSE의 스펙차이 때문에 사용처에 따라 선택적으로 사용된다.
웹소켓 사용처 (리얼타임)
•
카카오톡
•
주식 트레이딩 데이터
SSE(알람)
•
SNS 친구 요청이나 알람
Spring 개발
NOTE
기본적인 흐름은 다음과 같다!
SSE 연결요청
SSE 데이터 전달
1.
클라이언트에서 SSE 연결 요청을 보낸다.
2.
서버에서는 클라이언트와 매핑되는 SSE 통신 객체를 만든다.
3.
서버에서 이벤트가 발생하면 해당 객체를 통해 클라이언트로 데이터를 전달한다.
Controller 구현
NOTE
@RestController
@RequestMapping("/alarms")
@RequiredArgsConstructor
@Slf4j
@Tag(name = "Alarms", description = "알람 관련 API, SSE 방식사용")
public class AlarmController {
private final SseService sseService;
@GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@Operation(summary = "사용자가 SSE에 연결됨")
public SseEmitter subscribe(
@RequestHeader(name = "Authorization") String accessToken) {
log.info("컨트롤러에 접속됨");
return alarmService.subscribe(accessToken);
}
//...
}
Java
복사
•
연결 요청을 처리하기 위해서, MIME Type을 text/event-stream형태로 받아줘야 한다.
•
반환값을 다른 객체로 감싸지말고, SseEmitter로 해야 front에서 제대로 받을 수 있다.
Repository 구현
NOTE
기존의 JpaRepository를 사용하기에는, 어떤 회원이 어떤 Emitter가 연결되었는지, 어떤 이벤트들이 현재까지 발생했는지를 저장하기에 까다로우므로 Repository를 따로 구현
@Repository
public class EmitterRepository {
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
private final Map<String, Object> eventCache = new ConcurrentHashMap<>();
// emitterId 형식 => [유저의 accessToken] + _ + 생성시간
/**
* Emitter를 저장한다
* @param emitterId
* @param sseEmitter
* @return
*/
public SseEmitter save(String emitterId, SseEmitter sseEmitter) {
emitters.put(emitterId, sseEmitter);
return sseEmitter;
}
/**
* 이벤트를 저장한다.
* @param eventCacheId
* @param event
*/
public void saveEventCache(String eventCacheId, Object event) {
eventCache.put(eventCacheId, event);
}
/**
* 해당 회원과 관련된 모든 Emitter를 찾는다.
* @param memberId
* @return
*/
public Map<String, SseEmitter> findAllEmitterStartWithByMemberId(String memberId) {
return emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(memberId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
/**
* 해당 회원과 관련된 모든 이벤트를 찾는다.
* @param memberId
* @return
*/
public Map<String, Object> findAllEventCacheStartWithByMemberId(String memberId) {
return eventCache.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(memberId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
/**
* Emitter를 지운다.
* @param id
*/
public void deleteById(String id) {
emitters.remove(id);
}
/**
* 해당 회원과 관련된 모든 Emitter를 지운다.
* @param memberId
*/
public void deleteAllEmitterStartWithId(String memberId) {
emitters.forEach(
(key, emitter) -> {
if (key.startsWith(memberId)) {
emitters.remove(key);
}
}
);
}
/**
* 해당 회원과 관련된 모든 이벤트를 지운다.
* @param memberId
*/
public void deleteAllEventCacheStartWithId(String memberId) {
eventCache.forEach(
(key, emitter) -> {
if (key.startsWith(memberId)) {
eventCache.remove(key);
}
}
);
}
}
Java
복사
•
Emitter를 생성, 검색, 삭제
•
Emitter의 이벤트를 생성, 검색, 삭제
Service 구현
NOTE
어떻게 SseEmitte에 구독을 진행하는지와, 구독자들에게 어떻게 이벤트를 전달시키는지 알아보자!
전체 코드
String emitterId = makeTimeIncludeId(memberId);
private String makeTimeIncludeId(Long memberId) {
return memberId + "_" + System.currentTimeMillis();
}
SseEmitter emitter = sseRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));
Java
복사
접속한 사용자의 id로 emitter 생성
emitter.onCompletion(() -> sseRepository.deleteById(emitterId));
emitter.onTimeout(() -> sseRepository.deleteById(emitterId));
emitter.onError((e) -> sseRepository.deleteById(emitterId));
Java
복사
시간이 만료된 경우 자동으로 Repository에서 삭제 처리해줄 수 있는 콜백을 등록
String eventId = makeTimeIncludeId(userId);
sendToClient(emitter, eventId, MESSAGE, emitterId, "EventStream Created. [userId=" + userId + "]");
Shell
복사
503 에러를 방지하기 위한 더미데이터 넣어줌
private void sendToClient(SseEmitter emitter, String eventId, String eventName, String emitterId, Object data) {
try {
emitter.send(SseEmitter.event()
.id(eventId) // 이벤트 ID
.name(eventName) // 이벤트 이름(이벤트 유형) message로 넣음
.data(data)); // 이벤트에 전송되는 데이터
} catch (IOException exception) {
sseRepository.deleteById(emitterId);
}
}
Java
복사
클라이언트에게 데이터가 넘어간다.
React
NOTE
let eventSource = useRef < EventSource | null > null;
eventSource = new EventSourcePolyfill(`${BASE_URL}/connect`, {
headers: {
'Authorization': accessToken,
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Access-Control-Allow-Origin': '*',
},
heartbeatTimeout: 86400000,
});
JavaScript
복사
SSE를 수신한다 → EventSource말고 EventSourcePolyfill을 사용하는 이유는 EventSource를 사용하는 경우 헤더값이 제대로 들어가지 않는 이슈가 있다해서 사용.
eventSource.addEventListener('open', (e) => {
console.log("연결됨");
});
JavaScript
복사
연결된 경우 이벤트
eventSource.addEventListener('message', (e) => {
console.log(e.data)
console.log("알람옴!")
});
JavaScript
복사
message 이벤트가 온 경우