8.스프링 클라우드 스트림을 사용한 이벤트 드리븐 아키텍처 (EDA)

레디스 설치 (후반부 캐시 부분에서 사용)

$ docker run --name redis -p 6379:6379 redis

카프카 설치

  • 도커기반, 깃허브 (비공식 표준)

$ git clone https://github.com/wurstmeister/kafka-docker
$ cd kafka-docker
  • 내장된 주키퍼 사용

    • docker-compose-single-broker.yml 파일 확인 및 수정할 부분 수정

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    build: .
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  • 실행 (아래는 싱글브로커, 운영중에는 멀티 운영필요)

$ docker-compose -f docker-compose-single-broker.yml up
  • 테스트

# 테스를 위해 kafka 테스트용 다운 
$ wget http://mirror.navercorp.com/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz
$ tar -zxvf kafka_2.12-2.6.0.tgz
$ cd kafka_2.12-2.6.0

# test_topic 이란 토픽 생성
$ bin/kafka-topics.sh --create --topic test_topic --zookeeper localhost:2181 --partitions 1 --replication-factor 1

# 토픽 생성 확인
$ bin/kafka-topics.sh --zookeeper localhost:2181 --list

# 결과 (외, 토픽 리스트가 출력됨)
test_topic

이벤트기반 아키텍처

  • 동기식이거나 선형적이지 않고, 제한적인 요청-응답 모델도 아니라, 바로 끊임 없이 메시지를 주고 받는 메시지 기반

  • 메시지를 사용해 상태변화를 표현 -> 이벤트로 통신한다는 개념

  • 스프링 클라우드 스트림 + Kafka 활용

8.1 메시지와 EDA, 마이크로 서비스 사례

  • 라이선싱과 조직 서비스 운영중

  • 배포 후 라이선싱 -> 조직 서비스의 읽기 시간이 다소 느림 감지

    • 조직서비스의 경우, 변경이 드물고 조직레코드의 기본키로 조직 서비스에서 데이터를 읽어오고 있음

    • 데이터 엑세스 비용을 들이지 않고, 조직 데이터의 읽기를 캐싱 할 수 있다면 응답시간을 크게 향상 시킬 수 있음(핵심)

캐싱 3가지 핵심 요구사항

  1. 캐싱된 데이터는 라이선싱 서비스의 모든 인스턴스에 일관성 유지

    • 동일한 조직 데이터 읽기가 보장 되어야 함을 의미

    • 데이터를 라이선싱 서비스 안에 로컬 해싱해선 안된다는 것을 의미

      • 즉, 레디스 같은 메모리 디비 활용하여 동기화(싱크작업) 필요

  2. 라이선싱 서비스를 호스팅하는 컨테이너 메모리에 캐싱 하면안됨

    • 서비스를 호스팅하는 런타임 컨테이너는 종종 크기 제약 존재

    • 다양한 액세스 패턴으로 데이터를 액세스 함

    • 로컬캐시는 클러스터 내 다른 모든 서비스와 동기화를 보장 해야함, 복잡성도 증가함 -> 분리해야지?

  3. 업데이트나 삭제 연산으로 조직 레코드 변경 될 때 라이선싱 서비스는 조직서비스의 상태 변화를 인식 해야함

    • 이런 구성으로 라이선싱 서비스가 캐싱된 특정 조직 데이터를 무효화하고 삭제할수 있게 함(분리)

2가지 접근법

  1. 동기식 요청-응답 모델

    • 조직상태 변경 -> 라이선싱과 조직서비스는 REST 엔드포인트를 이용해서로 통신 (A<->B) 다이렉트 방식

  2. 조직서비스 변경에 대한 비동기 이벤트(메시지)발송

    • 조직서비스는 자신 변경상태에(삭제,업데이트,추가 등) 대한 메시지를 발행하고, 큐에 발행(publish, kafka)

    • 이후 라이선싱 서비스는 중개자(메시지 브로커,kafka)에게서 수신

8.1.1 동기식 요청-응답 방식으로 상태 전달

  • 일반적으로 캐싱 데이터를 저장하기 위해 key-value 저장소 데이터인 레디스를 활용

실행 순서

  1. 캐싱데이터 읽기 (레디스, 조직데이터)

  2. 데이터가 없다면 ? 조직서비스에 데이터 요청

  3. 조직서비스의 변겨사항이 내포된 데이터 응답

    1. 변경에 대해 직접 응답하고 -> 캐시 데이터 레디스에 갱신-> 요청자에 응답

    2. 변경에 대해 레디스에 반영 -> 알림

문제점

  • 서비스간 직접 통신(직접 연결된 경우). 강한 결합

    • 데이터 조회를 위해 조직서비스에 항상 의존

    • 캐싱 무효화

      • 직접 조직서비스가 레디스 서버에 접근하여 캐시 데이터 삭제 및 갱신

        • 이 경우, 라이선싱 서비스가 소유하고 있는 레디스 서버에 직접 접근하기 때문에 그 자체로 문제. 다른 서비스가 소유하는 데이터 저장소와 통신하게 되므로!! (MSA에서 금기사항)

      • 라이선싱 서비스가 오픈한 무효화 용도의 End Point를 호출하여 갱신

  • ​쉽게 깨지는 서비스 관계

    • 캐시 변경을 무효화 하는 라이선싱 서비스의 엔드포인트 변경 -> 조직서비스도 따라서 변경되어야 하는 경우. (종속성 발생)

    • 라이선싱 서비스가 다운 or 느려지면 -> 조직 서비스와 직접 통신 하므로 영향을 받게 됨

    • 또한, 두 서비스는 레디스와 직접 연결되어 있으므로 레디스서버가 다운되면 두서비스가 영향 받는다

  • 조직 서비스 변경에 관심 있는 새 소비자를 추가할 때 경직성

    • 서비스를 추가할 때, 조직 서비스의 코드가 변경되고 재배포 해야하는 상황 발생

    • 거미줄 모양의 의존성 패턴 발생 -> SPOF(단일장애지점) 발생의 원인

8.1.2 메시징을 사용해 서비스간 상태변화 전달 (핵심)

특징

  • 느슨한 결합

    • 메시지큐를 중개자로 하여 두서비스간의 관심사가 분리된다

    • 조직서비스는 큐에 상태변화를 큐에 발행하고

    • 라이선싱 서비스는 큐에 발행된 메시지를 확인하여 수신

    • 서로의 연결고리를 분리함으로써 영향도를 제거

  • 내구성

    • 라이선싱 서비스가 다운되더라도 조직서비스에 영향을 미치지 않게 됨

    • 따라서, 조직서비스는 계속 변경사항을 큐에 발행이 가능하여 큐에 쌓아둠

    • 라이선싱이 재시작 되었을때, 큐에 발행된 정보를 확인하여 캐싱무효화 작업 및 데이터 처리를 수행할 수 있게 됨

    • 반대로 조직서비스가 다운되더라도, 캐시(레디스)에 데이터가 존재하므로 서비스가 유지 된다

  • 확장성

    • 소비자의 속도가 느리다면 소비자의 스레드 증가 -> 문제는 CPU 종속

    • 생산자의 속도가 느리다면 생산자의 스레드 증가 -> 문제는 CPU 종속

    • 큐 -> 수평확장이 가능하기 때문에 CPU등 걱정이 필요없

  • 유연성

    • 메시지 발신자는 누가 소비할지 모른다

    • 발신 서비스에 영향을 주지 않고 새로운 기능을 애플리케이션에 추가가 가능

    • 새로운 기능 -> 발행되는 이벤트를 수신 해서 적절히 대응

단점

  • 메시지처리의 의미론 (message handling semantics)

    • 생산/소비 외 그이상의 지식이 필요

      • 메시지의 소비 순서 기반으로 어떻게 동작할지 이해 필

      • 순서대로 처리되지 않을때 어떤 일이 발생할지 이해 필요

    • 한 고객의 모든주문을 순서대로 처리해야 한다는 엄격한 요구 사항

      • 모든 메시지를 서로 독립적으로 소비하는 방식

      • 다르게 처리하도록 설정하고 구성

    • 메시지가 에러나면?

      • 재시도할것인가?

  • 메시지 가시성 (message visibility)

  • 메시지 코레오그래(무용술??)

8.2 Spring Cloud Stream

메시징은 복잡하지만 매우 강력하다

  • 애플리케이션에 메시지 발행자/소비자를 쉽게 구축할 수 있는 애노테이션 기반 프레임워크

  • 메시징 플랫폼의 구현 세부 사항을 추상화

  • Kafka, RabbitMQ 포함하여 사용 가능

  • 특정 플랫폼을 위한 세부구현 -> 애플리케이션 코드와 분리됨-> 메시지 발행/소비 구현은 플랫폼 중립적인 스프링 인터페이스로 수행

8.2.1 Spring Cloud Stream 아키텍처

  • 한 서비스가 발행자(publisher) / 한 서비스가 소비자(consumer)

  • 실행순서는 1(로직실행후 데이터 전달) -> 2(메시지 발행, 소스->채널->바인더) -> 3(큐) -> 4(메시지 수신, 바인더->채널->소스) -> 5 (로직실행에 사용)

8.3 간단한 메시지 생산자와 소비자 작성

스프링 클라우드에서 메시징기반을 위한 4개 컴포넌트

  • 소스

    • 서비스가 메시지 발행 준비가 되면, 소스를 사용해 메시지를 발행

    • publisher

    • POJO를 전달 받는 애노테이션 (@Source)

    • 메시지 수신 -> 직렬화(기본 JSON) 하여 채널로 송신

  • 채널

    • 메시지 생산자와 소비자가 메시지를 발행/소비 후 메시지를 보관할 큐를 추상화

    • 소스 부분과 싱크 부분 위치에 존재

    • 채널이름

      • 대상 큐의 이름과 관련 있으나 코드에서 큐 이름을 직접 사용하지 않고 채널이름을 사용하게 됨

    • 따라서 채널이 읽거나 쓰는 큐를 전환 하려면 애플리케이션 코드가 아닌 구성정보를 변경

  • 바인더

    • Spring Cloud Stream의 일부인 스프링 코드로 특정 메시지 플랫폼(Kafka, RabbitMQ)등 과 통신

    • 메시지 발행/소비 위해 별도의 라이브러리(Kafka,RabbitMQ 등 연계)를 제공하지 않고도 메시징 사용 가능

  • 싱크

    • 큐에서 메시지를 수신

    • 채널에서 수신대기 -> 싱크로 전달

    • 메시지를 POJO로 역직렬화 (JSON->Java Object) 디폴트

8.3.1 조직 서비스의 메시지 생산자 작성

조직서비스의 데이터 변경(추가,삭제,갱신) 발생 -> 큐에 변경알림 발행

라이브러리

  • Spring Cloud Stream -> spring-cloud-stream

  • Kafka Library -> spring-cloud-starter-stream-kafka

생산자 메시지 발행 순서

  1. 메시지 브로커 바인딩

    • @EnableBinding(Source.class) 애노테이션 추가

    • 이 애노테이션은 Spring Cloud Stream애플리케이션을 메시지 브로커(Kafka)와 바인딩하라고 알림

  2. Source 클래스

    • 정의된 채널들을 이용 메시지 브로커와 통신

    • Spring Cloud Stream은 메시지 브로커와 통신할 수 있는 기본채널이 존재

    • 단일 채널에만 발행시 편리함

  3. 메시지 발행 (서비스 로직)

    • 아래 코드에서 메시지 전달 POJOOrganizationChangeModel

    • Object -> JSON으로 직렬화

      • 이외 XML아파치 아브로 포맷을 포함한 다양한 포맷 직렬화가 가능

  4. 채널

    • 메시지 토픽 -> 채널로 매핑

    • 자바 인터페이스로 정의 -> 채널에서 Source 인터페이스를 사용

    • ouput() 메서드 -> MessageChannel 반환

    • MessageChannel -> 메시지 브로커에 메시지를 전달하는 방법정의

    • MessageBuilder

      • Spring에서 제공하는 헬퍼 클래스

      • Object -> Message 로 변경해줌

      • send(Message.class) 시그니처임

  5. 환경설정

    • Spring Cloud Stream의 Source -> 메시지 브로커, 토픽 매핑

소스 부분

  • Application

@SpringBootApplication
@EnableEurekaClient
@EnableCircuitBreaker
@EnableBinding(Source.class)
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
  • Controller

@RestController
@RequestMapping(value="v1/organizations")
public class OrganizationServiceController {

    @Autowired
    private OrganizationService orgService;

    @RequestMapping(value="/{organizationId}",method = RequestMethod.GET)
    public Organization getOrganization( @PathVariable("organizationId") String organizationId) {
        Organization org = orgService.getOrg(organizationId);
        org.setContactName(org.getContactName());
        return org;
    }

    @RequestMapping(value="/{organizationId}",method = RequestMethod.PUT)
    public void updateOrganization( @PathVariable("organizationId") String orgId, @RequestBody Organization org) {
        orgService.updateOrg( org );
    }

    @RequestMapping(value="/{organizationId}",method = RequestMethod.POST)
    public void saveOrganization(@RequestBody Organization org) {
       orgService.saveOrg( org );
    }

    @RequestMapping(value="/{organizationId}",method = RequestMethod.DELETE)
    @ResponseStatus(HttpStatus.NO_CONTENT)
    public void deleteOrganization( @PathVariable("organizationId") String orgId) {
        orgService.deleteOrg( orgId );
    }
}

  • Service

@Service
public class OrganizationService {

    @Autowired
    private OrganizationRepository orgRepository;

    @Autowired
    private SimpleSourceBean simpleSourceBean;

    public Organization getOrg(String organizationId) {
        return orgRepository.findById(organizationId).get();
    }

    public void saveOrg(Organization org){
        org.setId( UUID.randomUUID().toString());

        orgRepository.save(org);
        
        // save 메시지 발행
        simpleSourceBean.publishOrgChange("SAVE", org.getId());
    }

    public void updateOrg(Organization org){
        orgRepository.save(org);
        
        // update 메시지 발행
        simpleSourceBean.publishOrgChange("UPDATE", org.getId());

    }

    public void deleteOrg(String  orgId){
        orgRepository.deleteById( orgId );
        
        // delete 메시지 발행
        simpleSourceBean.publishOrgChange("DELETE", orgId);
    }
}

  • SimpleSourceBean (Source 구현부)

@Component
public class SimpleSourceBean {
    private Source source;

    private static final Logger logger = LoggerFactory.getLogger(SimpleSourceBean.class);

    @Autowired
    public SimpleSourceBean(Source source){
        this.source = source;
    }

    public void publishOrgChange(String action,String orgId){
       logger.debug("Sending Kafka message {} for Organization Id: {}", action, orgId);
        OrganizationChangeModel change =  new OrganizationChangeModel(
                OrganizationChangeModel.class.getTypeName(),
                action,
                orgId,
                UserContext.getCorrelationId());

        source.output().send(MessageBuilder.withPayload(change).build());
    }
}

구성 설정

spring:
  cloud:
    stream:
      bindings: # Spring Cloud Stream의 메시지 브로커 발행 구성의 시작점 (property) 
        output: # 채널 관련 설정 
            destination:  orgChangeTopic    # 메시지큐 or 토픽의 이름
            content-type: application/json  # 메시지 송수신 타입(JSON)
        kafka: # 메시지 플랫폼을 Kafka 사용 -> Spring Cloud Stream에 알림
          binder:
            zkNodes: localhost  # 주키퍼 위치
            brokers: localhost  # 카프카 위치

생각 해볼 점

  • 정확하게 얼마나 많은 데이터를 메시지에 넣어야함?

    • 어플마다 다름. 예제에서는 변경된 조직 ID만 반환함

    • 메시징 데이터 복사본은 절대 넣지 않음

      • 이슈가 많음

      • 비즈니스 로직은 데이터 변화에 민감하며, 데이터가 변경된 사실만을 전파하고, 이용하는 서비스들이 항상 마스터(데이터를 소유한 서비스)로 다시가서 새로운 데이터 사본을 가져오게함

      • 실행 시간은 좀 더 걸릴 수 있으나 항상 최신데이터를 얻을 수 있음

      • 마스터에서 받아 오는 동안 데이터 변화가 있을 수 있으나, 큐에서 받은 (복사본)데이터를 무턱대고 사용하는 것보다 가능성이 낮음

    • 얼마나 많은 데이터를 전달할지 신중히 생각하고 고민해야한다

      • 메시지 큐에 너무 오래 보관되거나 데이터를 포함한 이전 메시지가 실패해서 일관성이 없는 메시지가 전달되면

      • 데이터의 상태 보다는 메시지 상태에 의존하고 있으므로 전달된 데이터는 최신이 아닐 수 있다

    • 메시지의 상태를 전달 하려면, 메시지에 날짜-시간 스탬프 or 버전 번호도 포함시켜 데이터를 소비하는 서비스가 전달된 데이터를 검사하고, 이미 가진데이터 복사본보다 이전것이 아닌지 확인 할 수 있어야한다. (데이터는 비순차적으로 검색될 수 있음, 메시지는 순차적으로 쌓임)

8.3.2 라이선싱 서비스에서 메시지 소비자 작성

라이브러리

  • Spring Cloud Stream -> spring-cloud-stream

  • Kafka Library -> spring-cloud-starter-stream-kafka

소비자 메시지 발행 순서

  1. 메시지 브로커와 바인딩

    • @EnableBinding(Sink.class) -> Application.java 명시

  2. Input 채널

    • @StreamListener(Sink.INPUT) -> 메서드에 명시

      • 수신이 될 때마다, 해당 메서드를 실행하게 됨

  3. 환경설정

    • spring.cloud.stream.bindings : 메시지 브로커 바인딩시작점

    • spring.cloud.stream.bindings.input : input 채널

    • spring.cloud.stream.bindings.input.destination : 토픽이름

    • spring.cloud.stream.bindings.input.content-type : 역직렬화 타입 (JSON -> JavaObject, 기본)

    • spring.cloud.stream.bindings.input.group : 한번만 처리하는 의미를 보장하는데 사용, 메시지를 소비할 소비자 그룹의 이름

      • 동일한 메시지 큐를 수신하는 여러 서비스가 많은 인스터를갖고 있고, 각각의 고유 서비스가 메시지 복사본을 처리하길 원하지만, 서비스 인스턴스 그룹안에서는 한 서비스 인스턴스만 메시지를 사용하고 처리 해야한다

      • group 프로퍼티 : 서비스가 속한 소비자 그룹을 식별

      • 모든 서비스 인스턴스가 동일한 그룹 이름을 가지고 있는한, 스프링 클라우드 스트림과 하부 메시지 브로커는 해당그룹에 속한 인스턴스에 메시지 복사본 하나만 소비할 것을 보장

소비자 그룹에서 서비스 인스턴스 그룹이 메시지를 한 번만 처리할 것을 보장

환경설

  • application.yml

spring:
  cloud:
    stream:
      bindings:
        kafka:
          binder:
            zkNodes: localhost    # 주키퍼 위치
            brokers: localhost    # 메시지 브로커 위치 (여기서는 Kafka)
        input:    # Input 채널
          destination: orgChangeTopic     # Topic 이름
          content-type: application/json  # JSON -> Java Object 역직렬화 타입
          group: licensingGroup           # 1번만 처리한다는 의미를 보장한다는데 사용하게 됨

Application.java

@SpringBootApplication
@EnableEurekaClient
@EnableCircuitBreaker
@EnableBinding(Sink.class)  // 메시지 브로커와 바인딩 -> 수신(소비자측)
public class Application {
    
    private static final Logger logger = LoggerFactory.getLogger(Application.class);
    
    @StreamListener(Sink.INPUT) // 메시지가 입력채널에서 수신될때마다 이메서드를 실행하게됨
    public void loggerSink(OrganizationChangeModel orgChange) {
        logger.debug("Received an event for organization id {}", orgChange.getOrganizationId());
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

8.4 Spring Cloud Stream 분산 캐싱 사례

  • 라이선싱 서비스

    • 조직서비스의 데이터를 필요로함

    • 조직데이터에 대한 분산 레디스 캐시를 항상 확인

    • 조직데이터가 캐시에 있다면, 캐시 반환

    • 캐시에 없다면, 조직서비스를 호출하고 호출 결과를 레디스 해시에 캐싱

8.4.1 레디스로 조회 캐싱

설정

  1. Spring Data Redis 의존성을 포함해서 라이선싱 서비스를 구성

    • jedis

    • commons-pool2

    • spring-data-redis

  2. 레디스에 대한 데이터 베이스 커넥션 설정

    • jedis : 스프링과 레디스 서버 통신 (커넥터)

    • RedisTemplate 객체 생성

  3. 레디스 해시와 통신하는 Spring Data Redis의 레포지토리 클래스 정의

    • RedisTemplate 사용

    • HashOperations : 헬퍼 메서드들이 정의됨 (Spring 제공)

      • 레디스와 모든 통신은 key-value

  4. 레디스와 라이선싱 서비스를 사용해 조직 데이터를 저장하고 읽기

    • 구현체를 통해 기본적인 CRUD

    • 키를 사용해 저장/조회 등

    • 레디스에 작업수행할 HASH_NAME을 같이 알려주어야함

      • HashOperations.put(HASH_NAME, key, value) 형식

      • HashOperations.delete(HASH_NAME, key, value) 형식

8.4.2 사용자 정의 채널 정의

현재까지는 Spring Cloud Stream의 Source, Sink 인터페이스와 함께 기본으로 제공되는 output, input 채널을 사용

둘 이상의 채널을 정의 or 고유한 채널이름 사용

  • 사용자 고유 인터페이스를 정의하고, 애플리케이션이 필요한 만큼 input과 output을 노출하면 됨

  • @Input : 메서드 레벨의 채널이름 정의

    • @Input("chanelName")

  • 인터페이스 정의

    • SubscribableChannel : Input 채널 (라이선싱)

    • MessageChannel : Output 채널 (조직)

public interface CustomChannels {
    @Input("inboundOrgChanges")     // 메서드 레벨 채널이름 정의
    SubscribableChannel orgs();
}
  • Input 채널 설정 수정

    • input 대신 사용자정의(@Input 내용)을 명시

    • Sink.class -> @Input("input") 으로 되어 있음 (기본사용)

spring:
  cloud:
    stream:
      bindings:
        inboundOrgChanges:                # 사용자 정의에 명시한 채널이름을 input 대시 명시
          destination: orgChangeTopic     # Topic 이름
          content-type: application/json  # JSON -> Java Object 역직렬화 타입
          group: licensingGroup           # 1번만 처리한다는 의미를 보장한다는데 사용하게 됨
        kafka:
          binder:
            zkNodes: localhost    # 주키퍼 위치
            brokers: localhost    # 메시지 브로커 위치 (여기서는 Kafka)
        input:    # Input 채널
          destination: orgChangeTopic2     # Topic 이름
          content-type: application/json  # JSON -> Java Object 역직렬화 타입
          group: licensingGroup           # 1번만 처리한다는 의미를 보장한다는데 사용하게 됨
  • 메시지 처리를 하기위한 클래스 정의

    • @EnableBinding(인터페이스) -> 타입에 명시

      • 파라미터는 Sink.class 처럼 CustomChannles(사용자 정의 인터페이스)를 명시

    • @StreamListener("채널이름") -> 메서드에 명시

      • Sink.INPUT 처럼 -> application.yml에 명시했던 내용 명시

@EnableBinding(CustomChannels.class)  // Application.java에 명시한 부분 삭제 -> Sink.class 대신 사용자정의 인터페이스 사용 
public class OrganizationChangeHandler {

  @Autowired
  private OrganizationRedisRepository organizationRedisRepository;

  private static final Logger logger = LoggerFactory.getLogger(OrganizationChangeHandler.class);

  @StreamListener("inboundOrgChanges")  // application.yml에 명시한 내용 일치 (input 대신사용한)
  public void loggerSink(OrganizationChangeModel orgChange) {
    logger.debug("Received a message of type " + orgChange.getType());
    switch (orgChange.getAction()) {
      case "GET":
        logger.debug("Received a GET event from the organization service for organization id {}", orgChange.getOrganizationId());
        break;
      case "SAVE":
        logger.debug("Received a SAVE event from the organization service for organization id {}", orgChange.getOrganizationId());
        break;
      case "UPDATE":
        logger.debug("Received a UPDATE event from the organization service for organization id {}", orgChange.getOrganizationId());
        organizationRedisRepository.deleteOrganization(orgChange.getOrganizationId());
        break;
      case "DELETE":
        logger.debug("Received a DELETE event from the organization service for organization id {}", orgChange.getOrganizationId());
        organizationRedisRepository.deleteOrganization(orgChange.getOrganizationId());
        break;
      default:
        logger.error("Received an UNKNOWN event from the organization service of type {}", orgChange.getType());
        break;
    }
  }
}
  • 이 시점에서 Output 채널인 조직서비스에 대한 추가 수정 내용은없으나 필요하다면, Output 채널도 변경을 하면된다.

  • Update 및 Delete -> 레디스에서 삭제

output 채널 커스텀

  • 인터페이스 정의

    • @Output("채널이름") 명시 -> 설정에 추

public interface CustomSource {
  @Output("outboundSource")
  MessageChannel output();
}
  • 설정

    • 채널 이름 : ouboundSource

spring:
  cloud:
    stream:
      bindings: # Spring Cloud Stream의 메시지 브로커 발행 구성의 시작점 (property)
        output: # 채널 관련 설정
          destination: orgChangeTopic    # 메시지큐 or 토픽의 이름
          content-type: application/json  # 메시지 송수신 타입(JSON)
        kafka: # 메시지 플랫폼을 Kafka 사용 -> Spring Cloud Stream에 알림
          binder:
            zkNodes: localhost  # 주키퍼 위치
            brokers: localhost  # 카프카 위치
        outboundSource:
          destination: orgChangeTopic2    # 메시지큐 or 토픽의 이름
          content-type: application/json  # 메시지 송수신 타입(JSON)
  • 사용 클래스

    • @EnableBinding(타입) -> 바인딩할 타입명시

      • 해당 애노테이션은 자동으로 Bean 등록됨

    • @SendTo("채널이름") -> 채널 -> 토픽(orgChangeTopic2)로 송

@EnableBinding(CustomSource.class)
public class CustomSourceHandler {

  private static final Logger logger = LoggerFactory.getLogger(CustomSourceHandler.class);

  @Autowired
  private CustomSource customSource;

  @SendTo("outboundSource")
  public void outTest(OrganizationChangeModel model) {
    logger.debug("Sending Kafka2 message {} ",model);
    customSource.output().send(MessageBuilder.withPayload(model).build());
  }

}
  • 수신쪽

    • Sink.INPUT -> input 채널에서 처리

    • input 채널에 orgChangeTopic2 토픽으로 설정되어 있음

@StreamListener(Sink.INPUT)
public void loggerSink(OrganizationChangeModel orgChange) {
    logger.debug("Received an event for organization id {}", orgChange.getOrganizationId());
}

Last updated