항해99

23.11.08 항해 99 16기 실전 프로젝트 31일차

김용글 2023. 11. 8. 22:38

오늘 공부한 것

* SSE 를 활용한 알림 기능 코드 작성 및 고도화

 

어제 작성했던 코드는 너무 간단한게 작성한 코드여서 오류도 많고 다양한 에러를 잡을 수 없었기에

코드 고도화에 도전했다

 

물론 결과부터 이야기하자면 오늘도 실패다... 코드를 바꾼 이후 내가 쓴 게시글에 다른 사람이 댓글을

달아도 알림이 오지 않았다.. 로그를 찍어가면서 어디서 문제인지 찾아봐야 할듯 하다

 

1. Controller

@RestController
@RequiredArgsConstructor
@RequestMapping("/api")
public class NotifyController {
    private final NotifyService notifyService;

    // 알림 구독 기능 수행
    @Operation(summary = "알림 구독 기능 수행", description = "알림 구독 기능 수행 api 입니다.")
    @GetMapping(value ="/subscribe", produces = "text/event-stream")
    public SseEmitter subscribe (@AuthenticationPrincipal UserDetailsImpl userDetails,
                                 @RequestHeader(value = "Last-Event-Id", required = false, defaultValue = "") String lastEventid) {
        return notifyService.subscribe(userDetails.getUsername(), lastEventid);
    }

    // 전체 알림 조회
    @Operation(summary = "전체 알림 조회", description = "전체 알림 조회 api 입니다.")
    @GetMapping("/notify")
    public List<NotifyResponseDto> notifyList(@AuthenticationPrincipal UserDetailsImpl userDetails) {
        return notifyService.notifyList(userDetails.getUsers());
    }

 

2. Dto

@Getter
public class NotifyResponseDto {
    private Long id;
    private String content;
    private Boolean isRead;

    public NotifyResponseDto(Long Id, String content, Boolean isRead) {
        this.id = Id;
        this.content = content;
        this.isRead = isRead;
    }

    public static NotifyResponseDto create(Notify notify) {
        return new NotifyResponseDto(
                notify.getId(),
                notify.getContent(),
                notify.getIsRead()
        );
    }

    public static List<NotifyResponseDto> createList(List<Notify> notifyList) {
        return notifyList.stream()
                .map(NotifyResponseDto::create)
                .toList();
    }
}

 

3. Entity

@Entity
@Getter
@Setter
@NoArgsConstructor
public class Notify extends TimeStamped {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "notify_id")
    private Long id;

    private String content;

    private String sender;

    // 읽었는지 여부 확인
    @Column(nullable = false)
    private Boolean isRead;

    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumn(name = "users_id", nullable = false)
    @OnDelete(action = OnDeleteAction.CASCADE)
    private Users receiver;

    @Builder
    public Notify(Users receiver, String content, Boolean isRead, Posts posts) {
        this.receiver = receiver;
        this.content = content;
        this.isRead = isRead;
    }

    public void setIsRead(boolean isRead) {
        this.isRead = isRead;
    }
}

 

4. Repository

public interface EmitterRepository {

    // Emitter 저장
    SseEmitter save (String emitterId, SseEmitter sseEmitter);

    // Event 저장
    void saveEventCache (String emitterId, Object event);

    // 해당 회원과 관련된 모든 Emitter를 찾는다
    Map<String, SseEmitter> findAllEmitterStartWithByUsersId(String usersId);

    // 해당 회원과 관련된 모든 이벤트를 찾는다
    Map<String, Object> findAllEventCacheStartWithByUsersId(String usersId);

    // Emitter 삭제
    void deleteById (String id);

    // 해당 회원과 관련된 모든 Emiitter를 지운다
    void deleteAllEmitterStartWithId (String usersId);

    // 해당 회원과 관련된 모든 event를 지운다
    void deleteAllEventCacheStartWithId (String usersId);
}
@Repository
@NoArgsConstructor
public class EmitterRepositoryImpl implements EmitterRepository{
    // ConcurrentHashMap 동시에 여러 스레드가 접근하더라도 안전하게 데이터를 조작할 수 있도록 보장
    // 동시성 문제를 해결하고 맵에 데이터 저장 / 조회가능
    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();

    private final Map<String, Object> eventCache = new ConcurrentHashMap<>();

    // Emitter 저장
    @Override
    public SseEmitter save (String emitterId, SseEmitter sseEmitter) {
        emitters.put(emitterId, sseEmitter);
        return sseEmitter;
    }

    // Event 저장
    @Override
    public void saveEventCache(String eventCacheId, Object event) {
        eventCache.put(eventCacheId, event);
    }

    // 구분자로 회원 ID를 사용하기에 StartWith를 사용 - 해당 회원과 관련된 모든 Emitter를 찾는다
    @Override
    public Map<String, SseEmitter> findAllEmitterStartWithByUsersId (String usersId) {
        return emitters.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(usersId))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    //  // 해당 회원과 관련된 모든 이벤트를 찾는다
    @Override
    public Map<String, Object> findAllEventCacheStartWithByUsersId (String usersId) {
        return emitters.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(usersId))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    // Emitter 삭제
    @Override
    public void deleteById (String id) {
        emitters.remove(id);
    }

    // 해당 회원과 관련된 모든 Emiitter를 지운다
    @Override
    public void deleteAllEmitterStartWithId (String usersId) {
        emitters.forEach(
                (key, emitter) -> {
                    if (key.startsWith(usersId)) {
                        emitters.remove(key);
                    }
                }
        );
    }

    // // 해당 회원과 관련된 모든 event를 지운다
    @Override
    public void deleteAllEventCacheStartWithId (String usersId) {
        eventCache.forEach(
                (key, emitter) ->{
                    if (key.startsWith(usersId)) {
                        eventCache.remove(key);
                    }
                }
        );
    }
}
public interface NotifyRepository extends JpaRepository<Notify, Long> {

    List<Notify> findAllByReceiverOrderByCreatedAtDesc(Users users);
}

 

5. Service

@Service
@RequiredArgsConstructor
public class NotifyService {

    // emitterId : SseEmitter 를 구분 / 관리하기 위한 식별자
    //             emitterRepository 에 저장되어 특정 클라이언트의 연결을 관리하는데 사용됨
    // eventId : 개별 알림 이벤트를 식별하기 위한 고유 값
    //           각 알림 이벤트는 고유한 eventId 를 가지고 있고 클라이언트에게 전송될 때 이벤트의 식별을 위해 사용됨
    // subscribe() : 클라이언트와의 SSE 스트림 통신을 유지하면서 연결을 생성하고 유지
    // send() : 알림을 생성하고 해당알림을 수신하는 모든 클라이언트에게 전송

    // SSE 연결 지속시간 설정
    private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;

    private final EmitterRepository emitterRepository;
    private final NotifyRepository notifyRepository;

    private static Map<Long, Integer> notifyCounts = new HashMap<>();

    // 알림 구독 기능 수행
    // Spring 에서 제공하는 SseEmitter 를 생성 후 저장한 다음
    // 필요할 때마다 구독자가 생성한 SseEmitter를 불러와서 이벤트에 대한 응답 전송
    // Controller 에서 가져온 수신자의 식별정보와 마지막 이벤트 식별자를 받음
    public SseEmitter subscribe(String nickName, String lastEventid) {

        // emitterID 생성 nickName 을 포함하여 SseEmitter 를 식별하기 위한 고유 아이디생성
        String emitterId = makeTimeIncludeId(nickName);

        // 새로운 SseEmitter 객체를 생성하고 emitterId 를 키로 사용해 emitterRepository 에 저장
        // 이렇게 생성된 SseEmitter 는 클라이언트에게 이벤트를 전송하는 역활 수행
        SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter((DEFAULT_TIMEOUT)));

        // 완료, 타임아웃, 에러 발생시 SseEmitter를 emitterRepository 에서 삭제하도록 설정
        emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
        emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));
        emitter.onError((e) -> emitterRepository.deleteAllEmitterStartWithId(emitterId));

        // 503 에러를 방지하기 위한 더미 이벤트 전송
        String eventId = makeTimeIncludeId(nickName);
        sendNotify(emitter, eventId, emitterId, "EventStream Created. [userEmail=" + nickName + "]");

        // 미수신한 이벤트 전송
        // 만약 클라이언트가 마지막으로 수신한 이벤트 ID 인 lastEventId 가 존재하면
        // 이전에 발생한 이벤트 중 해당이벤트 이후의 이벤트들을 캐시에서 가져와 클라이언트에게 전송
        // 이를 통해 클라이언트가 놓친 이벤트를 보상하여 데이터 유실 예방
        if (hasLostData(lastEventid)) {
            sendLostData(lastEventid, nickName, emitterId, emitter);
        }

        // 생성된 SseEmitter를 반환하여 클라이언트에게 전달
        // 클라이언트는 이를 통해 서버로부터 알림 이벤트를 수신 / 처리 가능
        return emitter;
    }

    // EmitterId, eventId를 생성
    // 시간이 있는 이유 : ID 값만 사용하면 데이터가 언제 보내졌는지 유실되었는지 알 수없다
    //                 따라서 System.currentTimeMillis() 를 붙여두면 데이터가 유실된 시점을 파악할 수 있음으로
    //                 저장된 key 값 비교를 통해 유실된 데이터만 재전송 가능
    private String makeTimeIncludeId(String email) {
        return email + "_" + System.currentTimeMillis();
    }

    // SsEmitter 객체를 사용해서 SSE 를 클라이언트에게 전송하는 역활
    private void sendNotify(SseEmitter emitter, String eventId, String emitterId, Object data) {
        try {
            // 더미데이터 전송
            emitter.send(SseEmitter.event()
                    .id(eventId) // 이벤트의 고유식별자 설정
                    .name("see") // 이벤트의 이름 설정
                    .data(data) // 이벤트로 전송할 데이터 설정
            );
            // 만약 클라이언트의 연결이 끊어져 해당 예외를 캐치하면 끊긴 SseEmitter 객체를 제거하여 정리
        } catch (IOException exception) {
            emitterRepository.deleteById(emitterId);
        }
    }

    // lastEventId 가 비어있지 않은지 확인하여 클라이언트가 이전 이벤트 이후에 새로운 이벤트를 놓치지 않았는지 확인
    private boolean hasLostData(String lastEventId) {

        // lastEvnetId 가 비어있지 않을 때 -> Controller 의 헤더를 통해 lastEvnetId 가 들어옴 -> 손실된 이벤트가 있다 ->  true 리턴
        // 즉, 클라이언트가 이전 이벤트 이후에 새로운 이벤트를 받지 않았으므로 이후 발생한 이벤트들이 손실됨을 의미
        // lastEvnetId 비어있을때 -> Controller 의 헤더를 통해 lastEvnetId 가 들어오지않음 -> 손실된 이벤트가 없다 ->  false 리턴
        // 즉, 클라이언트가 이전에 받은 이벤트 이후에 새로운 이벤트들을 놓치지 않았다는 의미
        return !lastEventId.isEmpty();
    }

    // 수신자에게 전송되지 못한 이벤트 데이터를 캐시에서 가져와 클라이언트에게 전송하는 과정 수행
    private void sendLostData(String lastEventId, String userEmail, String emitterId, SseEmitter emitter) {

        // 수신자의 이메일을 기준으로 캐시된 이벤트 데이터를 가져옴
        Map<String, Object> eventCaches = emitterRepository.findAllEventCacheStartWithByUsersId(String.valueOf(userEmail));
        // eventCaches 맵 객체의 엔트리들을 스트림으로 변환해 순환 entry 는 키와 값의 쌍으로 구성됨
        eventCaches.entrySet().stream()
                // lastEventId 가 entry 의 키보다 작을 때만 필터링
                // lastEventId.compareTo(entry.getKey())는 lastEventId와 entry.getKey()를 비교하여 순서를 나타내는 정수 값을 반환하는데,
                // 무엇이 더 크냐에 따라 정수값의 부호가 달라진다.
                // 음수 값일 경우 : lastEventId < entry.getKey()
                // 0일 경우 : lastEventId == entry.getKey()
                // 양수 값일 경우 : lastEventId > entry.getKey()
                //⠀여기서는 lastEventId가 엔트리 키보다 작아야 하는 경우이기 때문에 음수값을 필터링하기 위해 < 0 이 있어야 함
                .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
                // 필터리된 각 entry 에 대해 sendNotify() 를 호출하여 SseEmitter 를 통해 해당 알림을 클라이언트에게 전송
                .forEach(entry -> sendNotify(emitter, entry.getKey(), emitterId, entry.getValue()));
    }

    // 알림을 생성하고 지정된 수신자에게 알림을 전송하는 기능 수행
    // 알림 수신자, 알림 유형, 내용, URL 등의 정보를 인자로 받음
    // 알림을 수신하는 모든 수신자에게 알림을 전송하기 위해 emitterRepository 에서 해당 수신자의
    // 모든 SseEmitter 를 가져와 알림을 전송하고, 동시에 emitterRepository 에 이벤트 캐시 저장
    public void send(Users receiver, String message) {
        // 알림 객체 생성 및 저장 (수신자, 내용 저장)
        Notify notify = notifyRepository.save(createNotify(receiver, message));

        // 이벤트 ID 생성 (SseEmitter로 전송되는 이벤트의 고유 식별자로 사용됨)
        String eventId = makeTimeIncludeId(String.valueOf(receiver.getId()));

        // 수신자의 이메일을 SseEmitter 객체에서 관리되는 emitterRepository에서 사용합니다
        String receiverId = receiver.getNickName();

        // 이 메서드는 해당 수신자에 연결된 모든 SseEmitter 객체를 가져와 알림을 전송합니다
        // 알림을 수신하는 모든 수신자에게 알림을 전송하고 동시에 emitterRepository에 이벤트 캐시를 저장합니다
        Map<String, SseEmitter> emitters = emitterRepository.findAllEmitterStartWithByUsersId(receiverId);

        emitters.forEach((key, emitter) -> {
            // 데이터 캐시 저장 (유실된 데이터 처리를 위함)
            emitterRepository.saveEventCache(key, notify);
            // 데이터 전송
            sendNotify(emitter, eventId, key, message);
        });
    }

    // 파라미터로 받은 값들로 알림 객체를 bulder 를 이용해 생성
    private Notify createNotify(Users receiver, String content) {
        return Notify.builder()
                .receiver(receiver)
                .content(content)
                .isRead(false)
                .build();
    }

    // 조회
    public List<NotifyResponseDto> notifyList(Users users) {
        List<Notify> notificationList = notifyRepository.findAllByReceiverOrderByCreatedAtDesc(users);
        return NotifyResponseDto.createList(notificationList);
    }
    @Slf4j
    @Service
    @RequiredArgsConstructor
    public class CommentsService {
    private final CommentsRepository commentsRepository;
    private final PostsRepository postsRepository;
    private final NotifyService notifyService;
    
    // 댓글 생성
    public MessageResponseDto commentsCreate(Long postId,
                                              CommentsRequestDto requestDto,
                                              Users users) {

        Posts posts = postsRepository.findById(postId).orElseThrow(
                () -> new CustomException(ErrorCode.POST_NOT_EXIST)); // 존재하지 않는 게시글입니다

        Comments comments = new Comments(requestDto, users, posts);

        commentsRepository.save(comments);

        // 댓글이 자신의 게시물에 작성된 것인지 확인
        if (!posts.getUsers().getEmail().equals(comments.getEmail())) {
            log.info("posts.getUsers():{}", posts.getUsers().getId());
            log.info("당신의 게시물에 새로운 댓글이 작성되었습니다");
            notifyService.send(posts.getUsers(), "당신의 게시물에 새로운 댓글이 작성되었습니다: " + comments.getContents());
        }

        return new MessageResponseDto ("댓글을 작성하였습니다", 200);
    }

 

6. 결과

    알림이 확인되지 않음