항해99

23.11.09 항해 99 16기 실전 프로젝트 32일차

김용글 2023. 11. 9. 22:13

오늘 공부한 것

* SSE 를 활용한 알림 기능 코드 작성 완료

 

SSE 활용 이유는 아래와 같다

https://nodaji1012-hanghae99-16.tistory.com/111

 

23.11.04 항해 99 16기 실전 프로젝트 28일차

오늘 공부한 것 * 유저 테스트 홍보 * SSE 를 활용한 알림 기능 공부 오늘은 오전에 유저 테스트를 홍보하고 다녔다항해99 에서 제공해준 사이트들을 다니며 아이디, 비밀번호도 찾고 새로가입도

nodaji1012-hanghae99-16.tistory.com

 

오늘은 어제 완료하지 못한 알림기능을 완료했다

사실 어디서부터 꼬인건지 찾지 못해서 코드를 처음부터 다 손봤다

정말 떨렸는데 성공해서 정말 다행이다내일까지 했다면.. 포기했을지도 모르겠다

 

참고 블로그는 아래와 같다

https://velog.io/@wnguswn7/Project-SseEmitter%EB%A1%9C-%EC%95%8C%EB%A6%BC-%EA%B8%B0%EB%8A%A5-%EA%B5%AC%ED%98%84%ED%95%98%EA%B8%B0

 

[Project] SSE(Servcer-Sent-Events)로 실시간 알림 기능 구현하기 !

1\. Server-Sent Events (SSE) 프로토콜 지원SseEmitter는 Spring에서 SSE 프로토콜을 지원하기 위한 클래스이므로이를 통해 실시간으로 업데이트되는 데이터나 알림과 같은 이벤트를 클라이언트에게 전달

velog.io

https://gilssang97.tistory.com/69

 

알림 기능을 구현해보자 - SSE(Server-Sent-Events)!

시작하기에 앞서 이번에 개발을 진행하면서 알림에 대한 요구사항을 만족시켜야하는 상황이 발생했다. 여기서 말하는 알림이 무엇인지 자세하게 살펴보자. A라는 사람이 스터디를 생성했고 B라

gilssang97.tistory.com

 

1. Controller

@RestController
@RequiredArgsConstructor
@RequestMapping("/api")
public class NotifyController {
    private final NotifyService notifyService;

    // 알림 구독 기능 수행
    @Operation(summary = "알림 구독 기능 수행", description = "알림 구독 기능 수행 api 입니다.")
    @GetMapping(value ="/subscribe", produces = "text/event-stream")
    public SseEmitter subscribe (@AuthenticationPrincipal UserDetailsImpl userDetails,
                                 @RequestHeader(value = "Last-Event-Id", required = false, defaultValue = "") String lastEventid) {
        return notifyService.subscribe(userDetails.getUsers(), lastEventid);
    }

    // 전체 알림 조회
    @Operation(summary = "전체 알림 조회", description = "전체 알림 조회 api 입니다.")
    @GetMapping("/notify")
    public List<NotifyResponseDto> notifyList(@AuthenticationPrincipal UserDetailsImpl userDetails) {
        return notifyService.notifyList(userDetails.getUsers());
    }

    // 알림 삭제
    @Operation(summary = "알림 삭제", description = "알림 삭제 api 입니다.")
    @DeleteMapping("/notify/{notifyId}")
    public MessageResponseDto notifyDelete (@PathVariable("notifyId") Long notifyId,
                                            @AuthenticationPrincipal UserDetailsImpl userDetails) {

        return notifyService.notifyDelete(notifyId, userDetails.getUsers());
    }
}

 

2. Dto

@Getter
@NoArgsConstructor
public class UserDto {
    private Long UsersId;

    public UserDto(Users users) {
        this.UsersId = users.getId();
    }
}
@Getter
public class NotifyResponseDto {
    private Long notifyId;
    private UserDto receiver;
    private UserDto sender;
    private String message;
    private boolean isRead;

    public NotifyResponseDto(Notify notify) {
        this.notifyId = notify.getId();
        this.receiver = new UserDto((notify.getReceiver()));
        this.sender = new UserDto(notify.getSender());
        this.message = notify.getMessage();
        this.isRead = notify.getIsRead();
    }
}

 

3. Entity

@Entity
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Notify extends TimeStamped {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "notify_id")
    private Long id;

    private String message;

    // 읽었는지 여부 확인
    @Column(nullable = false)
    private Boolean isRead;

    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumn(name = "receiver_id", nullable = false)
    @OnDelete(action = OnDeleteAction.CASCADE)
    private Users receiver;

    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumn(name = "sender_id", nullable = false)
    @OnDelete(action = OnDeleteAction.CASCADE)
    private Users sender;
}

 

4. Repository

public interface EmitterRepository {

    // Emitter 저장
    SseEmitter save (String emitterId, SseEmitter sseEmitter);

    // Event 저장
    void saveEventCache (String emitterId, Object event);

    // 해당 회원과 관련된 모든 Emitter를 찾는다
    Map<String, SseEmitter> findAllEmitterStartWithByUsersId(String usersId);

    // 해당 회원과 관련된 모든 이벤트를 찾는다
    Map<String, Object> findAllEventCacheStartWithByUsersId(String usersId);

    // Emitter 삭제
    void deleteById (String id);

    // 해당 회원과 관련된 모든 Emiitter를 지운다
    void deleteAllEmitterStartWithId (String usersId);

    // 해당 회원과 관련된 모든 event를 지운다
    void deleteAllEventCacheStartWithId (String usersId);
}
@Repository
@NoArgsConstructor
public class EmitterRepositoryImpl implements EmitterRepository{
    // ConcurrentHashMap 동시에 여러 스레드가 접근하더라도 안전하게 데이터를 조작할 수 있도록 보장
    // 동시성 문제를 해결하고 맵에 데이터 저장 / 조회가능
    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();

    private final Map<String, Object> eventCache = new ConcurrentHashMap<>();

    // Emitter 저장
    @Override
    public SseEmitter save (String emitterId, SseEmitter sseEmitter) {
        emitters.put(emitterId, sseEmitter);
        return sseEmitter;
    }

    // Event 저장
    @Override
    public void saveEventCache(String eventCacheId, Object event) {
        eventCache.put(eventCacheId, event);
    }

    // 구분자로 회원 ID를 사용하기에 StartWith를 사용 - 해당 회원과 관련된 모든 Emitter를 찾는다
    @Override
    public Map<String, SseEmitter> findAllEmitterStartWithByUsersId (String usersId) {
        return emitters.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(usersId))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    //  // 해당 회원과 관련된 모든 이벤트를 찾는다
    @Override
    public Map<String, Object> findAllEventCacheStartWithByUsersId (String usersId) {
        return emitters.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(usersId))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    // Emitter 삭제
    @Override
    public void deleteById (String id) {
        emitters.remove(id);
    }

    // 해당 회원과 관련된 모든 Emiitter를 지운다
    @Override
    public void deleteAllEmitterStartWithId (String usersId) {
        emitters.forEach(
                (key, emitter) -> {
                    if (key.startsWith(usersId)) {
                        emitters.remove(key);
                    }
                }
        );
    }

    // // 해당 회원과 관련된 모든 event를 지운다
    @Override
    public void deleteAllEventCacheStartWithId (String usersId) {
        eventCache.forEach(
                (key, emitter) ->{
                    if (key.startsWith(usersId)) {
                        eventCache.remove(key);
                    }
                }
        );
    }
}
public interface NotifyRepository extends JpaRepository<Notify, Long> {

    List<Notify> findByReceiver(Users users);
}

 

5. Service

 1) NotifyService

@Service
@RequiredArgsConstructor
public class NotifyService {

    // emitterId : SseEmitter 를 구분 / 관리하기 위한 식별자
    //             emitterRepository 에 저장되어 특정 클라이언트의 연결을 관리하는데 사용됨
    // eventId : 개별 알림 이벤트를 식별하기 위한 고유 값
    //           각 알림 이벤트는 고유한 eventId 를 가지고 있고 클라이언트에게 전송될 때 이벤트의 식별을 위해 사용됨
    // subscribe() : 클라이언트와의 SSE 스트림 통신을 유지하면서 연결을 생성하고 유지
    // send() : 알림을 생성하고 해당알림을 수신하는 모든 클라이언트에게 전송

    // SSE 연결 지속시간 설정
    private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;

    private final EmitterRepository emitterRepository;
    private final NotifyRepository notifyRepository;

    private static Map<Long, Integer> notifyCounts = new HashMap<>();

    // 알림 구독 기능 수행
    // Spring 에서 제공하는 SseEmitter 를 생성 후 저장한 다음
    // 필요할 때마다 구독자가 생성한 SseEmitter를 불러와서 이벤트에 대한 응답 전송
    // Controller 에서 가져온 수신자의 식별정보와 마지막 이벤트 식별자를 받음
    public SseEmitter subscribe(Users users, String lastEventId) {

        Long userId = users.getId();
        // emitterID 생성 nickName 을 포함하여 SseEmitter 를 식별하기 위한 고유 아이디생성
        String emitterId = userId + "_" + System.currentTimeMillis();

        // 새로운 SseEmitter 객체를 생성하고 emitterId 를 키로 사용해 emitterRepository 에 저장
        // 이렇게 생성된 SseEmitter 는 클라이언트에게 이벤트를 전송하는 역활 수행
        SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter((DEFAULT_TIMEOUT)));

        // 완료, 타임아웃, 에러 발생시 SseEmitter를 emitterRepository 에서 삭제하도록 설정
        emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
        emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));
        emitter.onError((e) -> emitterRepository.deleteAllEmitterStartWithId(emitterId));

        // 503 에러를 방지하기 위한 더미 이벤트 전송
        sendNotify(emitter, emitterId, "EventStream Created. [userId=" + userId + "]");

        if (lastEventId != null && !lastEventId.isEmpty()) {
            Map<String, Object> events = emitterRepository.findAllEventCacheStartWithByUsersId(String.valueOf(userId));
            events.entrySet().stream()
                    .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
                    .forEach(entry -> sendNotify(emitter, entry.getKey(), entry.getValue()));
        }
        // 생성된 SseEmitter를 반환하여 클라이언트에게 전달
        // 클라이언트는 이를 통해 서버로부터 알림 이벤트를 수신 / 처리 가능
        return emitter;
    }

    // SsEmitter 객체를 사용해서 SSE 를 클라이언트에게 전송하는 역활
    private void sendNotify(SseEmitter emitter, String eventId,Object data) {
        try {
            // 더미데이터 전송
            emitter.send(SseEmitter.event()
                    .id(eventId) // 이벤트의 고유식별자 설정
                    .name("see") // 이벤트의 이름 설정
                    .data(data) // 이벤트로 전송할 데이터 설정
            );
            // 만약 클라이언트의 연결이 끊어져 해당 예외를 캐치하면 끊긴 SseEmitter 객체를 제거하여 정리
        } catch (IOException exception) {
            emitterRepository.deleteById(eventId);
            throw new CustomException(ErrorCode.RUN_TIME_ERROR); // 연결에 실패했습니다
        }
    }

    public void send(Users receiver, Users sender, String message) {
        Notify notify = createNotify(receiver, sender, message);
        // 이벤트 ID 생성 (SseEmitter로 전송되는 이벤트의 고유 식별자로 사용됨)
        String eventId = String.valueOf(receiver.getId());
        // 알림 객체 생성 및 저장 (수신자, 내용 저장)
        notifyRepository.save(notify);

        // 이 메서드는 해당 수신자에 연결된 모든 SseEmitter 객체를 가져와 알림을 전송합니다
        // 알림을 수신하는 모든 수신자에게 알림을 전송하고 동시에 emitterRepository에 이벤트 캐시를 저장합니다
        Map<String, SseEmitter> emitters = emitterRepository.findAllEmitterStartWithByUsersId(eventId);

        emitters.forEach((key, emitter) -> {
            // 데이터 캐시 저장 (유실된 데이터 처리를 위함)
            emitterRepository.saveEventCache(key, notify);
            // 데이터 전송
            sendNotify(emitter, key, new NotifyResponseDto(notify));
        });
    }

    // 파라미터로 받은 값들로 알림 객체를 bulder 를 이용해 생성
    private Notify createNotify(Users receiver, Users sender, String message) {
        return Notify.builder()
                .sender(sender)
                .receiver(receiver)
                .message(message)
                .isRead(false)
                .build();
    }

    // 조회
    public List<NotifyResponseDto> notifyList(Users users) {
        List<Notify> notify = notifyRepository.findByReceiver(users);
        List<NotifyResponseDto> notifyResponseDto = notify.stream()
                .map(NotifyResponseDto::new)
                .collect(Collectors.toList());
        return notifyResponseDto;
    }

    // 알림 삭제
    public MessageResponseDto notifyDelete(Long notifyId, Users users) {

        Notify notify = notifyRepository.findById(notifyId).orElseThrow(
                () -> new CustomException(ErrorCode.NOTIFY_NOT_EXIST)); // 존재하지 않는 알림입니다

        if (users.getUserRole() == UserRoleEnum.ADMIN) {
            notifyRepository.delete(notify);
            return new MessageResponseDto("관리자가 알림을 삭제하였습니다", 200);
        } else if (notify.getReceiver().getId().equals(users.getId())) {
            notifyRepository.delete(notify);
            return new MessageResponseDto("알림을 삭제하였습니다", 200);
        } else {
            throw new CustomException(ErrorCode.NOT_ALLOWED); // 권한이 없습니다
        }
    }
}

 

 2) PostsService

    @Service
    @RequiredArgsConstructor
    @Transactional
    public class PostsService {
    
    private final PostsRepository postsRepository;
    private final TripDateRepository tripDateRepository;
    private final PostsLikeRepository postsLikeRepository;
    private final UserRepository usersRepository;
    private final CommentsRepository commentsRepository;
    private final TagsRepository tagsRepository;

    // 사진 저장을 위한 필드 선언
    private final AmazonS3ResourceStorage amazonS3ResourceStorage;
    private final AmazonS3Client amazonS3Client;
    private final PostsPicturesRepository postsPicturesRepository;

    @Value("${cloud.aws.s3.bucket}")
    private String bucket;

    private final NotifyService notifyService;
    
    // 게시글 좋아요 및 좋아요 취소
    public LikeResponseDto like(Long id, Users users) {
        Posts posts = checkPosts(id); // 게시글 조회

        Users existUser = checkUser(users); // 사용자 조회

        PostsLike overlap = postsLikeRepository.findByPostsAndUsers(posts, existUser);
        if (overlap != null) {
            postsLikeRepository.delete(overlap); // 좋아요 삭제
            posts.unlike(); // 해당 게시물 좋아요 취소시키는 메서드
            return new LikeResponseDto("좋아요 취소", HttpServletResponse.SC_OK, false);
        } else {
            PostsLike postsLike = new PostsLike(posts, existUser);
            postsLikeRepository.save(postsLike); // 좋아요 저장
            posts.like(); // 해당 게시물 좋아요수 증가시키는 메서드

            // 좋아요가 자신의 게시물에 작성된 것인지 확인
            if (!posts.getUsers().getEmail().equals(existUser.getEmail())) {
                notifyService.send(posts.getUsers(), users, "새로운 좋아요가 있습니다");
            }
            return new LikeResponseDto("좋아요 확인", HttpServletResponse.SC_OK, true);
        }
    }

 

 3) CommentsService

@Service
@RequiredArgsConstructor
public class CommentsService {
    private final CommentsRepository commentsRepository;
    private final PostsRepository postsRepository;
    private final NotifyService notifyService;

    // 댓글 생성
    public MessageResponseDto commentsCreate(Long postId,
                                              CommentsRequestDto requestDto,
                                              Users users) {

        Posts posts = postsRepository.findById(postId).orElseThrow(
                () -> new CustomException(ErrorCode.POST_NOT_EXIST)); // 존재하지 않는 게시글입니다

        Comments comments = new Comments(requestDto, users, posts);

        commentsRepository.save(comments);

        // 댓글이 자신의 게시물에 작성된 것인지 확인
        if (!posts.getUsers().getEmail().equals(comments.getEmail())) {
            notifyService.send(posts.getUsers(), users, "새로운 댓글이 있습니다");
        }

        return new MessageResponseDto ("댓글을 작성하였습니다", 200);
    }

 

 4) RepliesService

@Service
@RequiredArgsConstructor

public class RepliesService {
    private final RepliesRepository repliesRepository;
    private final CommentsRepository commentsRepository;
    private final NotifyService notifyService;

    // 대댓글 생성
    public MessageResponseDto repliesCreate(Long commentId,
                                            RepliesRequestDto requestDto,
                                            Users users) {

        Comments comments = commentsRepository.findById(commentId).orElseThrow(
                () -> new CustomException(ErrorCode.COMMENTS_NOT_EXIST)); // 존재하지 않는 댓글입니다

        Replies replies = new Replies(requestDto, users, comments);
        repliesRepository.save(replies);

        // 댓글이 자신의 게시물에 작성된 것인지 확인
        if (!comments.getPosts().getUsers().getEmail().equals(replies.getEmail())) {
            notifyService.send(comments.getPosts().getUsers(), users, "새로운 대댓글이 있습니다");
        }

        return new MessageResponseDto ("대댓글을 작성하였습니다", 200);
    }

 

6. 결과