8.스프링 클라우드 스트림을 사용한 이벤트 드리븐 아키텍처 (EDA)
레디스 설치 (후반부 캐시 부분에서 사용)
카프카 설치
도커기반, 깃허브 (비공식 표준)
내장된 주키퍼 사용
docker-compose-single-broker.yml 파일 확인 및 수정할 부분 수정
실행 (아래는 싱글브로커, 운영중에는 멀티 운영필요)
테스트
이벤트기반 아키텍처
동기식이거나 선형적이지 않고, 제한적인 요청-응답 모델도 아니라, 바로 끊임 없이 메시지를 주고 받는 메시지 기반
메시지를 사용해 상태변화를 표현 -> 이벤트로 통신한다는 개념
스프링 클라우드 스트림 + Kafka 활용
8.1 메시지와 EDA, 마이크로 서비스 사례
라이선싱과 조직 서비스 운영중
배포 후 라이선싱 -> 조직 서비스의 읽기 시간이 다소 느림 감지
조직서비스의 경우, 변경이 드물고 조직레코드의 기본키로 조직 서비스에서 데이터를 읽어오고 있음
데이터 엑세스 비용을 들이지 않고, 조직 데이터의 읽기를 캐싱 할 수 있다면 응답시간을 크게 향상 시킬 수 있음(핵심)
캐싱 3가지 핵심 요구사항
캐싱된 데이터는 라이선싱 서비스의 모든 인스턴스에 일관성 유지
동일한 조직 데이터 읽기가 보장 되어야 함을 의미
데이터를 라이선싱 서비스 안에 로컬 해싱해선 안된다는 것을 의미
즉, 레디스 같은 메모리 디비 활용하여 동기화(싱크작업) 필요
라이선싱 서비스를 호스팅하는 컨테이너 메모리에 캐싱 하면안됨
서비스를 호스팅하는 런타임 컨테이너는 종종 크기 제약 존재
다양한 액세스 패턴으로 데이터를 액세스 함
로컬캐시는 클러스터 내 다른 모든 서비스와 동기화를 보장 해야함, 복잡성도 증가함 -> 분리해야지?
업데이트나 삭제 연산으로 조직 레코드 변경 될 때 라이선싱 서비스는 조직서비스의 상태 변화를 인식 해야함
이런 구성으로 라이선싱 서비스가 캐싱된 특정 조직 데이터를 무효화하고 삭제할수 있게 함(분리)
2가지 접근법
동기식 요청-응답 모델
조직상태 변경 -> 라이선싱과 조직서비스는 REST 엔드포인트를 이용해서로 통신 (A<->B) 다이렉트 방식
조직서비스 변경에 대한 비동기 이벤트(메시지)발송
조직서비스는 자신 변경상태에(삭제,업데이트,추가 등) 대한 메시지를 발행하고, 큐에 발행(publish, kafka)
이후 라이선싱 서비스는 중개자(메시지 브로커,kafka)에게서 수신
8.1.1 동기식 요청-응답 방식으로 상태 전달
일반적으로 캐싱 데이터를 저장하기 위해 key-value 저장소 데이터인 레디스를 활용
실행 순서
캐싱데이터 읽기 (레디스, 조직데이터)
데이터가 없다면 ? 조직서비스에 데이터 요청
조직서비스의 변겨사항이 내포된 데이터 응답
변경에 대해 직접 응답하고 -> 캐시 데이터 레디스에 갱신-> 요청자에 응답
변경에 대해 레디스에 반영 -> 알림
문제점
서비스간 직접 통신(직접 연결된 경우). 강한 결합
데이터 조회를 위해 조직서비스에 항상 의존
캐싱 무효화
직접 조직서비스가 레디스 서버에 접근하여 캐시 데이터 삭제 및 갱신
이 경우, 라이선싱 서비스가 소유하고 있는 레디스 서버에 직접 접근하기 때문에 그 자체로 문제. 다른 서비스가 소유하는 데이터 저장소와 통신하게 되므로!! (MSA에서 금기사항)
라이선싱 서비스가 오픈한 무효화 용도의 End Point를 호출하여 갱신
쉽게 깨지는 서비스 관계
캐시 변경을 무효화 하는 라이선싱 서비스의 엔드포인트 변경 -> 조직서비스도 따라서 변경되어야 하는 경우. (종속성 발생)
라이선싱 서비스가 다운 or 느려지면 -> 조직 서비스와 직접 통신 하므로 영향을 받게 됨
또한, 두 서비스는 레디스와 직접 연결되어 있으므로 레디스서버가 다운되면 두서비스가 영향 받는다
조직 서비스 변경에 관심 있는 새 소비자를 추가할 때 경직성
서비스를 추가할 때, 조직 서비스의 코드가 변경되고 재배포 해야하는 상황 발생
거미줄 모양의 의존성 패턴 발생 -> SPOF(단일장애지점) 발생의 원인
8.1.2 메시징을 사용해 서비스간 상태변화 전달 (핵심)
특징
느슨한 결합
메시지큐를 중개자로 하여 두서비스간의 관심사가 분리된다
조직서비스는 큐에 상태변화를 큐에 발행하고
라이선싱 서비스는 큐에 발행된 메시지를 확인하여 수신
서로의 연결고리를 분리함으로써 영향도를 제거
내구성
라이선싱 서비스가 다운되더라도 조직서비스에 영향을 미치지 않게 됨
따라서, 조직서비스는 계속 변경사항을 큐에 발행이 가능하여 큐에 쌓아둠
라이선싱이 재시작 되었을때, 큐에 발행된 정보를 확인하여 캐싱무효화 작업 및 데이터 처리를 수행할 수 있게 됨
반대로 조직서비스가 다운되더라도, 캐시(레디스)에 데이터가 존재하므로 서비스가 유지 된다
확장성
소비자의 속도가 느리다면 소비자의 스레드 증가 -> 문제는 CPU 종속
생산자의 속도가 느리다면 생산자의 스레드 증가 -> 문제는 CPU 종속
큐 -> 수평확장이 가능하기 때문에 CPU등 걱정이 필요없
유연성
메시지 발신자는 누가 소비할지 모른다
발신 서비스에 영향을 주지 않고 새로운 기능을 애플리케이션에 추가가 가능
새로운 기능 -> 발행되는 이벤트를 수신 해서 적절히 대응
단점
메시지처리의 의미론 (message handling semantics)
생산/소비 외 그이상의 지식이 필요
메시지의 소비 순서 기반으로 어떻게 동작할지 이해 필
순서대로 처리되지 않을때 어떤 일이 발생할지 이해 필요
한 고객의 모든주문을 순서대로 처리해야 한다는 엄격한 요구 사항
모든 메시지를 서로 독립적으로 소비하는 방식
다르게 처리하도록 설정하고 구성
메시지가 에러나면?
재시도할것인가?
메시지 가시성 (message visibility)
메시지 코레오그래(무용술??)
8.2 Spring Cloud Stream
메시징은 복잡하지만 매우 강력하다
애플리케이션에 메시지 발행자/소비자를 쉽게 구축할 수 있는 애노테이션 기반 프레임워크
메시징 플랫폼의 구현 세부 사항을 추상화
Kafka, RabbitMQ 포함하여 사용 가능
특정 플랫폼을 위한 세부구현 -> 애플리케이션 코드와 분리됨-> 메시지 발행/소비 구현은 플랫폼 중립적인
스프링 인터페이스
로 수행
8.2.1 Spring Cloud Stream 아키텍처
한 서비스가 발행자(publisher) / 한 서비스가 소비자(consumer)
실행순서는 1(로직실행후 데이터 전달) -> 2(메시지 발행, 소스->채널->바인더) -> 3(큐) -> 4(메시지 수신, 바인더->채널->소스) -> 5 (로직실행에 사용)
8.3 간단한 메시지 생산자와 소비자 작성
스프링 클라우드에서 메시징기반을 위한 4개 컴포넌트
소스
서비스가 메시지 발행 준비가 되면, 소스를 사용해 메시지를 발행
publisher
POJO를 전달 받는 애노테이션 (@Source)
메시지 수신 -> 직렬화(기본 JSON) 하여 채널로 송신
채널
메시지 생산자와 소비자가 메시지를 발행/소비 후 메시지를 보관할 큐를 추상화
소스 부분과 싱크 부분 위치에 존재
채널이름
대상 큐의 이름과 관련 있으나 코드에서 큐 이름을 직접 사용하지 않고 채널이름을 사용하게 됨
따라서 채널이 읽거나 쓰는 큐를 전환 하려면 애플리케이션 코드가 아닌 구성정보를 변경
바인더
Spring Cloud Stream의 일부인 스프링 코드로 특정 메시지 플랫폼(Kafka, RabbitMQ)등 과 통신
메시지 발행/소비 위해 별도의 라이브러리(Kafka,RabbitMQ 등 연계)를 제공하지 않고도 메시징 사용 가능
싱크
큐에서 메시지를 수신
채널에서 수신대기 -> 싱크로 전달
메시지를 POJO로 역직렬화 (JSON->Java Object) 디폴트
8.3.1 조직 서비스의 메시지 생산자 작성
조직서비스의 데이터 변경(추가,삭제,갱신) 발생 -> 큐에 변경알림 발행
라이브러리
Spring Cloud Stream -> spring-cloud-stream
Kafka Library -> spring-cloud-starter-stream-kafka
생산자 메시지 발행 순서
메시지 브로커 바인딩
@EnableBinding(Source.class) 애노테이션 추가
이 애노테이션은 Spring Cloud Stream에 애플리케이션을 메시지 브로커(Kafka)와 바인딩하라고 알림
Source 클래스
정의된 채널들을 이용 메시지 브로커와 통신
Spring Cloud Stream은 메시지 브로커와 통신할 수 있는 기본채널이 존재
단일 채널에만 발행시 편리함
메시지 발행 (서비스 로직)
아래 코드에서 메시지 전달 POJO
OrganizationChangeModel
Object -> JSON으로 직렬화
이외 XML 및 아파치 아브로 포맷을 포함한 다양한 포맷 직렬화가 가능
채널
메시지 토픽 -> 채널로 매핑
자바 인터페이스로 정의 -> 채널에서 Source 인터페이스를 사용
ouput() 메서드 ->
MessageChannel 반환
MessageChannel
-> 메시지 브로커에 메시지를 전달하는 방법정의MessageBuilder
Spring에서 제공하는 헬퍼 클래스
Object -> Message 로 변경해줌
send(Message.class) 시그니처임
환경설정
Spring Cloud Stream의 Source -> 메시지 브로커, 토픽 매핑
소스 부분
Application
Controller