23.11.09 항해 99 16기 실전 프로젝트 32일차
오늘 공부한 것
* SSE 를 활용한 알림 기능 코드 작성 완료
SSE 활용 이유는 아래와 같다
https://nodaji1012-hanghae99-16.tistory.com/111
23.11.04 항해 99 16기 실전 프로젝트 28일차
오늘 공부한 것 * 유저 테스트 홍보 * SSE 를 활용한 알림 기능 공부 오늘은 오전에 유저 테스트를 홍보하고 다녔다항해99 에서 제공해준 사이트들을 다니며 아이디, 비밀번호도 찾고 새로가입도
nodaji1012-hanghae99-16.tistory.com
오늘은 어제 완료하지 못한 알림기능을 완료했다
사실 어디서부터 꼬인건지 찾지 못해서 코드를 처음부터 다 손봤다
정말 떨렸는데 성공해서 정말 다행이다내일까지 했다면.. 포기했을지도 모르겠다
참고 블로그는 아래와 같다
[Project] SSE(Servcer-Sent-Events)로 실시간 알림 기능 구현하기 !
1\. Server-Sent Events (SSE) 프로토콜 지원SseEmitter는 Spring에서 SSE 프로토콜을 지원하기 위한 클래스이므로이를 통해 실시간으로 업데이트되는 데이터나 알림과 같은 이벤트를 클라이언트에게 전달
velog.io
https://gilssang97.tistory.com/69
알림 기능을 구현해보자 - SSE(Server-Sent-Events)!
시작하기에 앞서 이번에 개발을 진행하면서 알림에 대한 요구사항을 만족시켜야하는 상황이 발생했다. 여기서 말하는 알림이 무엇인지 자세하게 살펴보자. A라는 사람이 스터디를 생성했고 B라
gilssang97.tistory.com
1. Controller
@RestController
@RequiredArgsConstructor
@RequestMapping("/api")
public class NotifyController {
private final NotifyService notifyService;
// 알림 구독 기능 수행
@Operation(summary = "알림 구독 기능 수행", description = "알림 구독 기능 수행 api 입니다.")
@GetMapping(value ="/subscribe", produces = "text/event-stream")
public SseEmitter subscribe (@AuthenticationPrincipal UserDetailsImpl userDetails,
@RequestHeader(value = "Last-Event-Id", required = false, defaultValue = "") String lastEventid) {
return notifyService.subscribe(userDetails.getUsers(), lastEventid);
}
// 전체 알림 조회
@Operation(summary = "전체 알림 조회", description = "전체 알림 조회 api 입니다.")
@GetMapping("/notify")
public List<NotifyResponseDto> notifyList(@AuthenticationPrincipal UserDetailsImpl userDetails) {
return notifyService.notifyList(userDetails.getUsers());
}
// 알림 삭제
@Operation(summary = "알림 삭제", description = "알림 삭제 api 입니다.")
@DeleteMapping("/notify/{notifyId}")
public MessageResponseDto notifyDelete (@PathVariable("notifyId") Long notifyId,
@AuthenticationPrincipal UserDetailsImpl userDetails) {
return notifyService.notifyDelete(notifyId, userDetails.getUsers());
}
}
2. Dto
@Getter
@NoArgsConstructor
public class UserDto {
private Long UsersId;
public UserDto(Users users) {
this.UsersId = users.getId();
}
}
@Getter
public class NotifyResponseDto {
private Long notifyId;
private UserDto receiver;
private UserDto sender;
private String message;
private boolean isRead;
public NotifyResponseDto(Notify notify) {
this.notifyId = notify.getId();
this.receiver = new UserDto((notify.getReceiver()));
this.sender = new UserDto(notify.getSender());
this.message = notify.getMessage();
this.isRead = notify.getIsRead();
}
}
3. Entity
@Entity
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Notify extends TimeStamped {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "notify_id")
private Long id;
private String message;
// 읽었는지 여부 확인
@Column(nullable = false)
private Boolean isRead;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "receiver_id", nullable = false)
@OnDelete(action = OnDeleteAction.CASCADE)
private Users receiver;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "sender_id", nullable = false)
@OnDelete(action = OnDeleteAction.CASCADE)
private Users sender;
}
4. Repository
public interface EmitterRepository {
// Emitter 저장
SseEmitter save (String emitterId, SseEmitter sseEmitter);
// Event 저장
void saveEventCache (String emitterId, Object event);
// 해당 회원과 관련된 모든 Emitter를 찾는다
Map<String, SseEmitter> findAllEmitterStartWithByUsersId(String usersId);
// 해당 회원과 관련된 모든 이벤트를 찾는다
Map<String, Object> findAllEventCacheStartWithByUsersId(String usersId);
// Emitter 삭제
void deleteById (String id);
// 해당 회원과 관련된 모든 Emiitter를 지운다
void deleteAllEmitterStartWithId (String usersId);
// 해당 회원과 관련된 모든 event를 지운다
void deleteAllEventCacheStartWithId (String usersId);
}
@Repository
@NoArgsConstructor
public class EmitterRepositoryImpl implements EmitterRepository{
// ConcurrentHashMap 동시에 여러 스레드가 접근하더라도 안전하게 데이터를 조작할 수 있도록 보장
// 동시성 문제를 해결하고 맵에 데이터 저장 / 조회가능
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
private final Map<String, Object> eventCache = new ConcurrentHashMap<>();
// Emitter 저장
@Override
public SseEmitter save (String emitterId, SseEmitter sseEmitter) {
emitters.put(emitterId, sseEmitter);
return sseEmitter;
}
// Event 저장
@Override
public void saveEventCache(String eventCacheId, Object event) {
eventCache.put(eventCacheId, event);
}
// 구분자로 회원 ID를 사용하기에 StartWith를 사용 - 해당 회원과 관련된 모든 Emitter를 찾는다
@Override
public Map<String, SseEmitter> findAllEmitterStartWithByUsersId (String usersId) {
return emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(usersId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
// // 해당 회원과 관련된 모든 이벤트를 찾는다
@Override
public Map<String, Object> findAllEventCacheStartWithByUsersId (String usersId) {
return emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(usersId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
// Emitter 삭제
@Override
public void deleteById (String id) {
emitters.remove(id);
}
// 해당 회원과 관련된 모든 Emiitter를 지운다
@Override
public void deleteAllEmitterStartWithId (String usersId) {
emitters.forEach(
(key, emitter) -> {
if (key.startsWith(usersId)) {
emitters.remove(key);
}
}
);
}
// // 해당 회원과 관련된 모든 event를 지운다
@Override
public void deleteAllEventCacheStartWithId (String usersId) {
eventCache.forEach(
(key, emitter) ->{
if (key.startsWith(usersId)) {
eventCache.remove(key);
}
}
);
}
}
public interface NotifyRepository extends JpaRepository<Notify, Long> {
List<Notify> findByReceiver(Users users);
}
5. Service
1) NotifyService
@Service
@RequiredArgsConstructor
public class NotifyService {
// emitterId : SseEmitter 를 구분 / 관리하기 위한 식별자
// emitterRepository 에 저장되어 특정 클라이언트의 연결을 관리하는데 사용됨
// eventId : 개별 알림 이벤트를 식별하기 위한 고유 값
// 각 알림 이벤트는 고유한 eventId 를 가지고 있고 클라이언트에게 전송될 때 이벤트의 식별을 위해 사용됨
// subscribe() : 클라이언트와의 SSE 스트림 통신을 유지하면서 연결을 생성하고 유지
// send() : 알림을 생성하고 해당알림을 수신하는 모든 클라이언트에게 전송
// SSE 연결 지속시간 설정
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
private final EmitterRepository emitterRepository;
private final NotifyRepository notifyRepository;
private static Map<Long, Integer> notifyCounts = new HashMap<>();
// 알림 구독 기능 수행
// Spring 에서 제공하는 SseEmitter 를 생성 후 저장한 다음
// 필요할 때마다 구독자가 생성한 SseEmitter를 불러와서 이벤트에 대한 응답 전송
// Controller 에서 가져온 수신자의 식별정보와 마지막 이벤트 식별자를 받음
public SseEmitter subscribe(Users users, String lastEventId) {
Long userId = users.getId();
// emitterID 생성 nickName 을 포함하여 SseEmitter 를 식별하기 위한 고유 아이디생성
String emitterId = userId + "_" + System.currentTimeMillis();
// 새로운 SseEmitter 객체를 생성하고 emitterId 를 키로 사용해 emitterRepository 에 저장
// 이렇게 생성된 SseEmitter 는 클라이언트에게 이벤트를 전송하는 역활 수행
SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter((DEFAULT_TIMEOUT)));
// 완료, 타임아웃, 에러 발생시 SseEmitter를 emitterRepository 에서 삭제하도록 설정
emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));
emitter.onError((e) -> emitterRepository.deleteAllEmitterStartWithId(emitterId));
// 503 에러를 방지하기 위한 더미 이벤트 전송
sendNotify(emitter, emitterId, "EventStream Created. [userId=" + userId + "]");
if (lastEventId != null && !lastEventId.isEmpty()) {
Map<String, Object> events = emitterRepository.findAllEventCacheStartWithByUsersId(String.valueOf(userId));
events.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> sendNotify(emitter, entry.getKey(), entry.getValue()));
}
// 생성된 SseEmitter를 반환하여 클라이언트에게 전달
// 클라이언트는 이를 통해 서버로부터 알림 이벤트를 수신 / 처리 가능
return emitter;
}
// SsEmitter 객체를 사용해서 SSE 를 클라이언트에게 전송하는 역활
private void sendNotify(SseEmitter emitter, String eventId,Object data) {
try {
// 더미데이터 전송
emitter.send(SseEmitter.event()
.id(eventId) // 이벤트의 고유식별자 설정
.name("see") // 이벤트의 이름 설정
.data(data) // 이벤트로 전송할 데이터 설정
);
// 만약 클라이언트의 연결이 끊어져 해당 예외를 캐치하면 끊긴 SseEmitter 객체를 제거하여 정리
} catch (IOException exception) {
emitterRepository.deleteById(eventId);
throw new CustomException(ErrorCode.RUN_TIME_ERROR); // 연결에 실패했습니다
}
}
public void send(Users receiver, Users sender, String message) {
Notify notify = createNotify(receiver, sender, message);
// 이벤트 ID 생성 (SseEmitter로 전송되는 이벤트의 고유 식별자로 사용됨)
String eventId = String.valueOf(receiver.getId());
// 알림 객체 생성 및 저장 (수신자, 내용 저장)
notifyRepository.save(notify);
// 이 메서드는 해당 수신자에 연결된 모든 SseEmitter 객체를 가져와 알림을 전송합니다
// 알림을 수신하는 모든 수신자에게 알림을 전송하고 동시에 emitterRepository에 이벤트 캐시를 저장합니다
Map<String, SseEmitter> emitters = emitterRepository.findAllEmitterStartWithByUsersId(eventId);
emitters.forEach((key, emitter) -> {
// 데이터 캐시 저장 (유실된 데이터 처리를 위함)
emitterRepository.saveEventCache(key, notify);
// 데이터 전송
sendNotify(emitter, key, new NotifyResponseDto(notify));
});
}
// 파라미터로 받은 값들로 알림 객체를 bulder 를 이용해 생성
private Notify createNotify(Users receiver, Users sender, String message) {
return Notify.builder()
.sender(sender)
.receiver(receiver)
.message(message)
.isRead(false)
.build();
}
// 조회
public List<NotifyResponseDto> notifyList(Users users) {
List<Notify> notify = notifyRepository.findByReceiver(users);
List<NotifyResponseDto> notifyResponseDto = notify.stream()
.map(NotifyResponseDto::new)
.collect(Collectors.toList());
return notifyResponseDto;
}
// 알림 삭제
public MessageResponseDto notifyDelete(Long notifyId, Users users) {
Notify notify = notifyRepository.findById(notifyId).orElseThrow(
() -> new CustomException(ErrorCode.NOTIFY_NOT_EXIST)); // 존재하지 않는 알림입니다
if (users.getUserRole() == UserRoleEnum.ADMIN) {
notifyRepository.delete(notify);
return new MessageResponseDto("관리자가 알림을 삭제하였습니다", 200);
} else if (notify.getReceiver().getId().equals(users.getId())) {
notifyRepository.delete(notify);
return new MessageResponseDto("알림을 삭제하였습니다", 200);
} else {
throw new CustomException(ErrorCode.NOT_ALLOWED); // 권한이 없습니다
}
}
}
2) PostsService
@Service
@RequiredArgsConstructor
@Transactional
public class PostsService {
private final PostsRepository postsRepository;
private final TripDateRepository tripDateRepository;
private final PostsLikeRepository postsLikeRepository;
private final UserRepository usersRepository;
private final CommentsRepository commentsRepository;
private final TagsRepository tagsRepository;
// 사진 저장을 위한 필드 선언
private final AmazonS3ResourceStorage amazonS3ResourceStorage;
private final AmazonS3Client amazonS3Client;
private final PostsPicturesRepository postsPicturesRepository;
@Value("${cloud.aws.s3.bucket}")
private String bucket;
private final NotifyService notifyService;
// 게시글 좋아요 및 좋아요 취소
public LikeResponseDto like(Long id, Users users) {
Posts posts = checkPosts(id); // 게시글 조회
Users existUser = checkUser(users); // 사용자 조회
PostsLike overlap = postsLikeRepository.findByPostsAndUsers(posts, existUser);
if (overlap != null) {
postsLikeRepository.delete(overlap); // 좋아요 삭제
posts.unlike(); // 해당 게시물 좋아요 취소시키는 메서드
return new LikeResponseDto("좋아요 취소", HttpServletResponse.SC_OK, false);
} else {
PostsLike postsLike = new PostsLike(posts, existUser);
postsLikeRepository.save(postsLike); // 좋아요 저장
posts.like(); // 해당 게시물 좋아요수 증가시키는 메서드
// 좋아요가 자신의 게시물에 작성된 것인지 확인
if (!posts.getUsers().getEmail().equals(existUser.getEmail())) {
notifyService.send(posts.getUsers(), users, "새로운 좋아요가 있습니다");
}
return new LikeResponseDto("좋아요 확인", HttpServletResponse.SC_OK, true);
}
}
3) CommentsService
@Service
@RequiredArgsConstructor
public class CommentsService {
private final CommentsRepository commentsRepository;
private final PostsRepository postsRepository;
private final NotifyService notifyService;
// 댓글 생성
public MessageResponseDto commentsCreate(Long postId,
CommentsRequestDto requestDto,
Users users) {
Posts posts = postsRepository.findById(postId).orElseThrow(
() -> new CustomException(ErrorCode.POST_NOT_EXIST)); // 존재하지 않는 게시글입니다
Comments comments = new Comments(requestDto, users, posts);
commentsRepository.save(comments);
// 댓글이 자신의 게시물에 작성된 것인지 확인
if (!posts.getUsers().getEmail().equals(comments.getEmail())) {
notifyService.send(posts.getUsers(), users, "새로운 댓글이 있습니다");
}
return new MessageResponseDto ("댓글을 작성하였습니다", 200);
}
4) RepliesService
@Service
@RequiredArgsConstructor
public class RepliesService {
private final RepliesRepository repliesRepository;
private final CommentsRepository commentsRepository;
private final NotifyService notifyService;
// 대댓글 생성
public MessageResponseDto repliesCreate(Long commentId,
RepliesRequestDto requestDto,
Users users) {
Comments comments = commentsRepository.findById(commentId).orElseThrow(
() -> new CustomException(ErrorCode.COMMENTS_NOT_EXIST)); // 존재하지 않는 댓글입니다
Replies replies = new Replies(requestDto, users, comments);
repliesRepository.save(replies);
// 댓글이 자신의 게시물에 작성된 것인지 확인
if (!comments.getPosts().getUsers().getEmail().equals(replies.getEmail())) {
notifyService.send(comments.getPosts().getUsers(), users, "새로운 대댓글이 있습니다");
}
return new MessageResponseDto ("대댓글을 작성하였습니다", 200);
}
6. 결과