항해99
23.11.08 항해 99 16기 실전 프로젝트 31일차
김용글
2023. 11. 8. 22:38
오늘 공부한 것
* SSE 를 활용한 알림 기능 코드 작성 및 고도화
어제 작성했던 코드는 너무 간단한게 작성한 코드여서 오류도 많고 다양한 에러를 잡을 수 없었기에
코드 고도화에 도전했다
물론 결과부터 이야기하자면 오늘도 실패다... 코드를 바꾼 이후 내가 쓴 게시글에 다른 사람이 댓글을
달아도 알림이 오지 않았다.. 로그를 찍어가면서 어디서 문제인지 찾아봐야 할듯 하다
1. Controller
@RestController
@RequiredArgsConstructor
@RequestMapping("/api")
public class NotifyController {
private final NotifyService notifyService;
// 알림 구독 기능 수행
@Operation(summary = "알림 구독 기능 수행", description = "알림 구독 기능 수행 api 입니다.")
@GetMapping(value ="/subscribe", produces = "text/event-stream")
public SseEmitter subscribe (@AuthenticationPrincipal UserDetailsImpl userDetails,
@RequestHeader(value = "Last-Event-Id", required = false, defaultValue = "") String lastEventid) {
return notifyService.subscribe(userDetails.getUsername(), lastEventid);
}
// 전체 알림 조회
@Operation(summary = "전체 알림 조회", description = "전체 알림 조회 api 입니다.")
@GetMapping("/notify")
public List<NotifyResponseDto> notifyList(@AuthenticationPrincipal UserDetailsImpl userDetails) {
return notifyService.notifyList(userDetails.getUsers());
}
2. Dto
@Getter
public class NotifyResponseDto {
private Long id;
private String content;
private Boolean isRead;
public NotifyResponseDto(Long Id, String content, Boolean isRead) {
this.id = Id;
this.content = content;
this.isRead = isRead;
}
public static NotifyResponseDto create(Notify notify) {
return new NotifyResponseDto(
notify.getId(),
notify.getContent(),
notify.getIsRead()
);
}
public static List<NotifyResponseDto> createList(List<Notify> notifyList) {
return notifyList.stream()
.map(NotifyResponseDto::create)
.toList();
}
}
3. Entity
@Entity
@Getter
@Setter
@NoArgsConstructor
public class Notify extends TimeStamped {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "notify_id")
private Long id;
private String content;
private String sender;
// 읽었는지 여부 확인
@Column(nullable = false)
private Boolean isRead;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "users_id", nullable = false)
@OnDelete(action = OnDeleteAction.CASCADE)
private Users receiver;
@Builder
public Notify(Users receiver, String content, Boolean isRead, Posts posts) {
this.receiver = receiver;
this.content = content;
this.isRead = isRead;
}
public void setIsRead(boolean isRead) {
this.isRead = isRead;
}
}
4. Repository
public interface EmitterRepository {
// Emitter 저장
SseEmitter save (String emitterId, SseEmitter sseEmitter);
// Event 저장
void saveEventCache (String emitterId, Object event);
// 해당 회원과 관련된 모든 Emitter를 찾는다
Map<String, SseEmitter> findAllEmitterStartWithByUsersId(String usersId);
// 해당 회원과 관련된 모든 이벤트를 찾는다
Map<String, Object> findAllEventCacheStartWithByUsersId(String usersId);
// Emitter 삭제
void deleteById (String id);
// 해당 회원과 관련된 모든 Emiitter를 지운다
void deleteAllEmitterStartWithId (String usersId);
// 해당 회원과 관련된 모든 event를 지운다
void deleteAllEventCacheStartWithId (String usersId);
}
@Repository
@NoArgsConstructor
public class EmitterRepositoryImpl implements EmitterRepository{
// ConcurrentHashMap 동시에 여러 스레드가 접근하더라도 안전하게 데이터를 조작할 수 있도록 보장
// 동시성 문제를 해결하고 맵에 데이터 저장 / 조회가능
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
private final Map<String, Object> eventCache = new ConcurrentHashMap<>();
// Emitter 저장
@Override
public SseEmitter save (String emitterId, SseEmitter sseEmitter) {
emitters.put(emitterId, sseEmitter);
return sseEmitter;
}
// Event 저장
@Override
public void saveEventCache(String eventCacheId, Object event) {
eventCache.put(eventCacheId, event);
}
// 구분자로 회원 ID를 사용하기에 StartWith를 사용 - 해당 회원과 관련된 모든 Emitter를 찾는다
@Override
public Map<String, SseEmitter> findAllEmitterStartWithByUsersId (String usersId) {
return emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(usersId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
// // 해당 회원과 관련된 모든 이벤트를 찾는다
@Override
public Map<String, Object> findAllEventCacheStartWithByUsersId (String usersId) {
return emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(usersId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
// Emitter 삭제
@Override
public void deleteById (String id) {
emitters.remove(id);
}
// 해당 회원과 관련된 모든 Emiitter를 지운다
@Override
public void deleteAllEmitterStartWithId (String usersId) {
emitters.forEach(
(key, emitter) -> {
if (key.startsWith(usersId)) {
emitters.remove(key);
}
}
);
}
// // 해당 회원과 관련된 모든 event를 지운다
@Override
public void deleteAllEventCacheStartWithId (String usersId) {
eventCache.forEach(
(key, emitter) ->{
if (key.startsWith(usersId)) {
eventCache.remove(key);
}
}
);
}
}
public interface NotifyRepository extends JpaRepository<Notify, Long> {
List<Notify> findAllByReceiverOrderByCreatedAtDesc(Users users);
}
5. Service
@Service
@RequiredArgsConstructor
public class NotifyService {
// emitterId : SseEmitter 를 구분 / 관리하기 위한 식별자
// emitterRepository 에 저장되어 특정 클라이언트의 연결을 관리하는데 사용됨
// eventId : 개별 알림 이벤트를 식별하기 위한 고유 값
// 각 알림 이벤트는 고유한 eventId 를 가지고 있고 클라이언트에게 전송될 때 이벤트의 식별을 위해 사용됨
// subscribe() : 클라이언트와의 SSE 스트림 통신을 유지하면서 연결을 생성하고 유지
// send() : 알림을 생성하고 해당알림을 수신하는 모든 클라이언트에게 전송
// SSE 연결 지속시간 설정
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
private final EmitterRepository emitterRepository;
private final NotifyRepository notifyRepository;
private static Map<Long, Integer> notifyCounts = new HashMap<>();
// 알림 구독 기능 수행
// Spring 에서 제공하는 SseEmitter 를 생성 후 저장한 다음
// 필요할 때마다 구독자가 생성한 SseEmitter를 불러와서 이벤트에 대한 응답 전송
// Controller 에서 가져온 수신자의 식별정보와 마지막 이벤트 식별자를 받음
public SseEmitter subscribe(String nickName, String lastEventid) {
// emitterID 생성 nickName 을 포함하여 SseEmitter 를 식별하기 위한 고유 아이디생성
String emitterId = makeTimeIncludeId(nickName);
// 새로운 SseEmitter 객체를 생성하고 emitterId 를 키로 사용해 emitterRepository 에 저장
// 이렇게 생성된 SseEmitter 는 클라이언트에게 이벤트를 전송하는 역활 수행
SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter((DEFAULT_TIMEOUT)));
// 완료, 타임아웃, 에러 발생시 SseEmitter를 emitterRepository 에서 삭제하도록 설정
emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));
emitter.onError((e) -> emitterRepository.deleteAllEmitterStartWithId(emitterId));
// 503 에러를 방지하기 위한 더미 이벤트 전송
String eventId = makeTimeIncludeId(nickName);
sendNotify(emitter, eventId, emitterId, "EventStream Created. [userEmail=" + nickName + "]");
// 미수신한 이벤트 전송
// 만약 클라이언트가 마지막으로 수신한 이벤트 ID 인 lastEventId 가 존재하면
// 이전에 발생한 이벤트 중 해당이벤트 이후의 이벤트들을 캐시에서 가져와 클라이언트에게 전송
// 이를 통해 클라이언트가 놓친 이벤트를 보상하여 데이터 유실 예방
if (hasLostData(lastEventid)) {
sendLostData(lastEventid, nickName, emitterId, emitter);
}
// 생성된 SseEmitter를 반환하여 클라이언트에게 전달
// 클라이언트는 이를 통해 서버로부터 알림 이벤트를 수신 / 처리 가능
return emitter;
}
// EmitterId, eventId를 생성
// 시간이 있는 이유 : ID 값만 사용하면 데이터가 언제 보내졌는지 유실되었는지 알 수없다
// 따라서 System.currentTimeMillis() 를 붙여두면 데이터가 유실된 시점을 파악할 수 있음으로
// 저장된 key 값 비교를 통해 유실된 데이터만 재전송 가능
private String makeTimeIncludeId(String email) {
return email + "_" + System.currentTimeMillis();
}
// SsEmitter 객체를 사용해서 SSE 를 클라이언트에게 전송하는 역활
private void sendNotify(SseEmitter emitter, String eventId, String emitterId, Object data) {
try {
// 더미데이터 전송
emitter.send(SseEmitter.event()
.id(eventId) // 이벤트의 고유식별자 설정
.name("see") // 이벤트의 이름 설정
.data(data) // 이벤트로 전송할 데이터 설정
);
// 만약 클라이언트의 연결이 끊어져 해당 예외를 캐치하면 끊긴 SseEmitter 객체를 제거하여 정리
} catch (IOException exception) {
emitterRepository.deleteById(emitterId);
}
}
// lastEventId 가 비어있지 않은지 확인하여 클라이언트가 이전 이벤트 이후에 새로운 이벤트를 놓치지 않았는지 확인
private boolean hasLostData(String lastEventId) {
// lastEvnetId 가 비어있지 않을 때 -> Controller 의 헤더를 통해 lastEvnetId 가 들어옴 -> 손실된 이벤트가 있다 -> true 리턴
// 즉, 클라이언트가 이전 이벤트 이후에 새로운 이벤트를 받지 않았으므로 이후 발생한 이벤트들이 손실됨을 의미
// lastEvnetId 비어있을때 -> Controller 의 헤더를 통해 lastEvnetId 가 들어오지않음 -> 손실된 이벤트가 없다 -> false 리턴
// 즉, 클라이언트가 이전에 받은 이벤트 이후에 새로운 이벤트들을 놓치지 않았다는 의미
return !lastEventId.isEmpty();
}
// 수신자에게 전송되지 못한 이벤트 데이터를 캐시에서 가져와 클라이언트에게 전송하는 과정 수행
private void sendLostData(String lastEventId, String userEmail, String emitterId, SseEmitter emitter) {
// 수신자의 이메일을 기준으로 캐시된 이벤트 데이터를 가져옴
Map<String, Object> eventCaches = emitterRepository.findAllEventCacheStartWithByUsersId(String.valueOf(userEmail));
// eventCaches 맵 객체의 엔트리들을 스트림으로 변환해 순환 entry 는 키와 값의 쌍으로 구성됨
eventCaches.entrySet().stream()
// lastEventId 가 entry 의 키보다 작을 때만 필터링
// lastEventId.compareTo(entry.getKey())는 lastEventId와 entry.getKey()를 비교하여 순서를 나타내는 정수 값을 반환하는데,
// 무엇이 더 크냐에 따라 정수값의 부호가 달라진다.
// 음수 값일 경우 : lastEventId < entry.getKey()
// 0일 경우 : lastEventId == entry.getKey()
// 양수 값일 경우 : lastEventId > entry.getKey()
//⠀여기서는 lastEventId가 엔트리 키보다 작아야 하는 경우이기 때문에 음수값을 필터링하기 위해 < 0 이 있어야 함
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
// 필터리된 각 entry 에 대해 sendNotify() 를 호출하여 SseEmitter 를 통해 해당 알림을 클라이언트에게 전송
.forEach(entry -> sendNotify(emitter, entry.getKey(), emitterId, entry.getValue()));
}
// 알림을 생성하고 지정된 수신자에게 알림을 전송하는 기능 수행
// 알림 수신자, 알림 유형, 내용, URL 등의 정보를 인자로 받음
// 알림을 수신하는 모든 수신자에게 알림을 전송하기 위해 emitterRepository 에서 해당 수신자의
// 모든 SseEmitter 를 가져와 알림을 전송하고, 동시에 emitterRepository 에 이벤트 캐시 저장
public void send(Users receiver, String message) {
// 알림 객체 생성 및 저장 (수신자, 내용 저장)
Notify notify = notifyRepository.save(createNotify(receiver, message));
// 이벤트 ID 생성 (SseEmitter로 전송되는 이벤트의 고유 식별자로 사용됨)
String eventId = makeTimeIncludeId(String.valueOf(receiver.getId()));
// 수신자의 이메일을 SseEmitter 객체에서 관리되는 emitterRepository에서 사용합니다
String receiverId = receiver.getNickName();
// 이 메서드는 해당 수신자에 연결된 모든 SseEmitter 객체를 가져와 알림을 전송합니다
// 알림을 수신하는 모든 수신자에게 알림을 전송하고 동시에 emitterRepository에 이벤트 캐시를 저장합니다
Map<String, SseEmitter> emitters = emitterRepository.findAllEmitterStartWithByUsersId(receiverId);
emitters.forEach((key, emitter) -> {
// 데이터 캐시 저장 (유실된 데이터 처리를 위함)
emitterRepository.saveEventCache(key, notify);
// 데이터 전송
sendNotify(emitter, eventId, key, message);
});
}
// 파라미터로 받은 값들로 알림 객체를 bulder 를 이용해 생성
private Notify createNotify(Users receiver, String content) {
return Notify.builder()
.receiver(receiver)
.content(content)
.isRead(false)
.build();
}
// 조회
public List<NotifyResponseDto> notifyList(Users users) {
List<Notify> notificationList = notifyRepository.findAllByReceiverOrderByCreatedAtDesc(users);
return NotifyResponseDto.createList(notificationList);
}
@Slf4j
@Service
@RequiredArgsConstructor
public class CommentsService {
private final CommentsRepository commentsRepository;
private final PostsRepository postsRepository;
private final NotifyService notifyService;
// 댓글 생성
public MessageResponseDto commentsCreate(Long postId,
CommentsRequestDto requestDto,
Users users) {
Posts posts = postsRepository.findById(postId).orElseThrow(
() -> new CustomException(ErrorCode.POST_NOT_EXIST)); // 존재하지 않는 게시글입니다
Comments comments = new Comments(requestDto, users, posts);
commentsRepository.save(comments);
// 댓글이 자신의 게시물에 작성된 것인지 확인
if (!posts.getUsers().getEmail().equals(comments.getEmail())) {
log.info("posts.getUsers():{}", posts.getUsers().getId());
log.info("당신의 게시물에 새로운 댓글이 작성되었습니다");
notifyService.send(posts.getUsers(), "당신의 게시물에 새로운 댓글이 작성되었습니다: " + comments.getContents());
}
return new MessageResponseDto ("댓글을 작성하였습니다", 200);
}
6. 결과
알림이 확인되지 않음