# 8.스프링 클라우드 스트림을 사용한 이벤트 드리븐 아키텍처 (EDA)

## 레디스 설치 (후반부 캐시 부분에서 사용)

```bash
$ docker run --name redis -p 6379:6379 redis
```

## 카프카 설치&#x20;

* 도커기반, 깃허브 (비공식 표준)

```bash
$ git clone https://github.com/wurstmeister/kafka-docker
$ cd kafka-docker
```

* 내장된 주키퍼 사용
  * docker-compose-single-broker.yml 파일 확인 및 수정할 부분 수정

```yaml
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    build: .
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
```

* 실행 (아래는 싱글브로커, 운영중에는 멀티 운영필요)

```java
$ docker-compose -f docker-compose-single-broker.yml up
```

* 테스트

```bash
# 테스를 위해 kafka 테스트용 다운 
$ wget http://mirror.navercorp.com/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz
$ tar -zxvf kafka_2.12-2.6.0.tgz
$ cd kafka_2.12-2.6.0

# test_topic 이란 토픽 생성
$ bin/kafka-topics.sh --create --topic test_topic --zookeeper localhost:2181 --partitions 1 --replication-factor 1

# 토픽 생성 확인
$ bin/kafka-topics.sh --zookeeper localhost:2181 --list

# 결과 (외, 토픽 리스트가 출력됨)
test_topic
```

## 이벤트기반 아키텍처

* 동기식이거나 선형적이지 않고, 제한적인 요청-응답 모델도 아니라, 바로 끊임 없이 메시지를 주고 받는 **메시지 기반**
* **메시지를 사용해 상태변화를 표현 -> 이벤트로 통신한다는 개념**
* 스프링 클라우드 스트림  + Kafka 활용

### 8.1 메시지와 EDA, 마이크로 서비스 사례

* 라이선싱과 조직 서비스 운영중&#x20;
* 배포 후 라이선싱 -> 조직 서비스의 읽기 시간이 다소 느림 감지
  * 조직서비스의 경우, 변경이 드물고 조직레코드의 기본키로 조직 서비스에서 데이터를 읽어오고 있음
  * 데이터 엑세스 비용을 들이지 않고, 조직 데이터의 읽기를 **캐싱 할 수 있다면 응답시간을 크게 향상 시킬 수 있음(핵심)**

#### **캐싱 3가지 핵심 요구사항**

1. **캐싱된 데이터**는 라이선싱 서비스의 **모든 인스턴스에 일관성 유지**
   * **동일한 조직 데이터 읽기가 보장 되어야 함을 의미**
   * **데이터를 라이선싱 서비스 안에 로컬 해싱해선 안된다는 것을 의미**
     * **즉, 레디스** 같은 메모리 디비 활용하여 **동기화(싱크작업) 필요**
2. 라이선싱 서비스를 **호스팅하는 컨테이너 메모리에 캐싱 하면안됨**
   * 서비스를 호스팅하는 런타임 컨테이너는 종종 크기 제약 존재
   * 다양한 액세스 패턴으로 데이터를 액세스 함
   * **로컬캐시는 클러스터 내 다른 모든 서비스와 동기화를 보장 해야함, 복잡성도 증가함 -> 분리해야지?**
3. 업데이트나 삭제 연산으로 **조직 레코드 변경 될 때** 라이선싱 서비스는 **조직서비스의 상태 변화를 인식 해야함**
   * 이런 구성으로 라이선싱 서비스가 캐싱된 특정 조직 데이터를 무효화하고 삭제할수 있게 함(분리)

#### 2가지 접근법

1. **동기식 요청-응답 모델**
   * 조직상태 변경 -> 라이선싱과 조직서비스는 REST 엔드포인트를 이용해서로 통신 (A<->B) 다이렉트 방식
2. **조직서비스 변경에 대한 비동기 이벤트(메시지)발송**
   * 조직서비스는 자신 변경상태에(삭제,업데이트,추가 등) 대한 메시지를 발행하고, **큐에 발행**(publish, kafka)
   * 이후 라이선싱 서비스는 **중개자**(메시지 브로커,kafka)에게서 수신

#### 8.1.1 동기식 요청-응답 방식으로 상태 전달

![전통적인 캐싱 솔루션 구축 방안](https://1162853276-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-M1ciKJvsGgeAKQBeHlJ%2F-MGGoER0u0OmIJqlsJuX%2F-MGGsRhX5j-bmrs40Ui4%2Fimage.png?alt=media\&token=b7b7038b-1c90-4c4c-823f-341c3fa41683)

* 일반적으로 캐싱 데이터를 저장하기 위해 key-value 저장소 데이터인 레디스를 활용

#### 실행 순서

1. 캐싱데이터 읽기 (레디스, 조직데이터)
2. 데이터가 없다면 ? 조직서비스에 데이터 요청
3. 조직서비스의 변겨사항이 내포된 데이터 응답
   1. 변경에 대해 직접 응답하고 -> **캐시 데이터 레디스에 갱신**-> 요청자에 응답
   2. 변경에 대해 레디스에 반영 -> 알림

#### 문제점

* **서비스간 직접 통신(직접 연결된 경우). 강한 결합**&#x20;
  * 데이터 조회를 위해 **조직서비스에 항상 의존**&#x20;
  * **캐싱 무효화**
    * **직접 조직서비스가 레디스 서버에 접근하여 캐시 데이터 삭제 및 갱신**
      * 이 경우, 라이선싱 서비스가 소유하고 있는 레디스 서버에 직접 접근하기 때문에 그 자체로 문제. 다른 서비스가 소유하는 데이터 저장소와 통신하게 되므로!! (**MSA에서 금기사항**)
    * **라이선싱 서비스가 오픈한 무효화 용도의 End Point를 호출하여 갱신**
* **​쉽게 깨지는 서비스 관계**
  * 캐시 변경을 무효화 하는 라이선싱 서비스의 엔드포인트 변경 -> 조직서비스도 따라서 변경되어야 하는 경우. (종속성 발생)
  * 라이선싱 서비스가 다운 or 느려지면 -> 조직 서비스와 **직접 통신 하므로 영향**을 받게 됨
  * 또한, 두 서비스는 레디스와 직접 연결되어 있으므로 **레디스서버가 다운**되면 두서비스가 영향 받는다
* **조직 서비스 변경에 관심 있는 새 소비자를 추가할 때 경직성**
  * 서비스를 추가할 때, **조직 서비스의 코드가 변경되고 재배포 해야하는 상황 발생**
  * **거미줄 모양의 의존성 패턴 발생 -> SPOF(단일장애지점) 발생의 원인**

**8.1.2 메시징을 사용해 서비스간 상태변화 전달 (핵심)**

![메시징을 사용해 서비스간 상태변화 전달](https://1162853276-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-M1ciKJvsGgeAKQBeHlJ%2F-MGH4GQRqWEmB-da0-KQ%2F-MGHLUtmXPgPnCl-vG2O%2Fimage.png?alt=media\&token=70dbaa79-17bf-4bba-a9fc-2a4fe115c37f)

#### 특징

* **느슨한 결합**
  * **메시지큐를 중개자**로 하여 두서비스간의 관심사가 분리된다
  * 조직서비스는 **큐에 상태변화를 큐에 발행**하고
  * 라이선싱 서비스는 큐에 **발행된 메시지를 확인하여 수신**
  * 서로의 **연결고리를 분리함으로써 영향도를 제거**
* **내구성**
  * **라이선싱 서비스가 다운되더라도 조직서비스에 영향을 미치지 않게 됨**
  * 따라서, 조직서비스는 **계속 변경사항을 큐에 발행이 가능**하여 큐에 쌓아둠
  * 라이선싱이 재시작 되었을때, **큐에 발행된 정보를 확인하여 캐싱무효화 작업 및 데이터 처리를 수행**할 수 있게 됨
  * 반대로 조직서비스가 다운되더라도, **캐시(레디스)에 데이터가 존재하므로 서비스가 유지** 된다
* **확장성**
  * 소비자의 속도가 느리다면 소비자의 스레드 증가 -> 문제는 CPU 종속
  * 생산자의 속도가 느리다면 생산자의 스레드 증가 -> 문제는 CPU 종속
  * **큐 -> 수평확장이 가능하기 때문에 CPU등 걱정이 필요없**
* **유연성**
  * **메시지 발신자는 누가 소비할지 모른다**
  * 발신 서비스에 영향을 주지 않고 새로운 기능을 애플리케이션에 추가가 가능
  * 새로운 기능 -> 발행되는 이벤트를 수신 해서 적절히 대응

#### 단점

* 메시지처리의 의미론 (message handling semantics)
  * 생산/소비 외 그이상의 지식이 필요
    * **메시지의 소비 순서 기반으로 어떻게 동작할지 이해 필**
    * **순서대로 처리되지 않을때 어떤 일이 발생할지 이해 필요**
  * **한 고객의 모든주문을 순서대로 처리해야 한다는 엄격한 요구 사항**
    * 모든 메시지를 서로 독립적으로 소비하는 방식
    * 다르게 처리하도록 설정하고 구성&#x20;
  * 메시지가 에러나면?&#x20;
    * 재시도할것인가?
* 메시지 가시성 (message visibility)
* 메시지 코레오그래(무용술??)

### 8.2 Spring Cloud Stream

#### 메시징은 복잡하지만 매우 강력하다

* 애플리케이션에 메시지 **발행자/소비자를** 쉽게 구축할 수 있는 **애노테이션 기반 프레임워크**
* 메시징 플랫폼의 **구현 세부 사항을 추상화**
* **Kafka, RabbitMQ** 포함하여 사용 가능
* 특정 플랫폼을 위한 세부구현 -> **애플리케이션 코드와 분리됨-> 메시지 발행/소비 구현은 플랫폼 중립적인 `스프링 인터페이스`로 수행**

#### 8.2.1 Spring Cloud Stream 아키텍처

![](https://1162853276-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-M1ciKJvsGgeAKQBeHlJ%2F-MGMacvp0LJjUNGKD0Ku%2F-MGMc8zJnGLWKUhD06fN%2Fimage.png?alt=media\&token=3ce09bc5-bb01-43f9-8ec6-1e0734cc5847)

* 한 서비스가 **발행자(publisher)** / 한 서비스가 **소비자(consumer)**
* 실행순서는 1(로직실행후 데이터 전달) -> 2(메시지 발행, 소스->채널->바인더) -> 3(큐) -> 4(메시지 수신, 바인더->채널->소스) -> 5 (로직실행에 사용)

### 8.3 간단한 메시지 생산자와 소비자 작성

#### 스프링 클라우드에서 메시징기반을 위한 4개 컴포넌트&#x20;

* **소스**
  * 서비스가 메시지 발행 준비가 되면, **소스**를 사용해 **메시지를 발행**
  * publisher&#x20;
  * POJO를 전달 받는 **애노테이션 (@Source)**
  * 메시지 수신 -> **직렬화(기본 JSON) 하여 채널로 송신**
* **채널**
  * 메시지 생산자와 소비자가 메시지를 **발행/소비 후 메시지를 보관할 큐를 추상화**
  * **소스 부분과 싱크 부분** 위치에 존재
  * **채널이름**
    * 대상 큐의 이름과 관련 있으나 코드에서 **큐 이름을 직접 사용하지 않고** **채널이름을 사용하게 됨**
  * 따라서 **채널이** 읽거나 쓰는 **큐를 전환 하려면** 애플리케이션 코드가 아닌 **구성정보를 변경**
* **바인더**
  * Spring Cloud Stream의 일부인 **스프링 코드로** **특정 메시지 플랫폼(Kafka, RabbitMQ)등 과 통신**
  * 메시지 발행/소비 위해 **별도의 라이브러리(Kafka,RabbitMQ 등 연계)를 제공하지 않고도** **메시징 사용 가능**
* **싱크**

  * 큐에서 **메시지를 수신**
  * **채널에서 수신대기 -> 싱크로 전달**
  * **메시지를 POJO로 역직렬화 (JSON->Java Object) 디폴트**

#### 8.3.1 조직 서비스의 메시지 생산자 작성

![생산자 측 프로세스](https://1162853276-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-M1ciKJvsGgeAKQBeHlJ%2F-MGMcicJQK4fZMRShGBE%2F-MGMfeBNtzCXbWVEOiMc%2Fimage.png?alt=media\&token=c1079177-7ed4-4e07-8940-270a77db3307)

#### 조직서비스의 데이터 변경(추가,삭제,갱신) 발생 -> 큐에 변경알림 발행

#### 라이브러리&#x20;

* Spring Cloud Stream -> **spring-cloud-stream**
* Kafka Library -> **spring-cloud-starter-stream-kafka**

#### 생산자 메시지 발행 순서

1. **메시지 브로커 바인딩**
   * **@EnableBinding(Source.class)**  애노테이션 추가
   * 이 애노테이션은 **Spring Cloud Stream**에 **애플리케이션을 메시지 브로커(Kafka)와 바인딩**하라고 알림
2. **Source 클래스**
   * 정의된 **채널**들을 이용 **메시지 브로커와 통신**
   * Spring Cloud Stream은 **메시지 브로커와 통신할 수 있는 기본채널이 존재**
   * **단일 채널에만 발행시 편리함**
3. **메시지 발행 (서비스 로직)**
   * 아래 코드에서 메시지 전달 POJO`OrganizationChangeModel`
   * **Object -> JSON으로 직렬화**
     * 이외 **XML** 및 **아파치 아브로 포맷**을 포함한 다양한 포맷 직렬화가 가능
4. **채널**
   * 메시지 토픽 -> 채널로 매핑
   * 자바 인터페이스로 정의 -> 채널에서 Source 인터페이스를 사용
   * ouput() 메서드 -> `MessageChannel 반환`
   * `MessageChannel` ->  메시지 브로커에 **메시지를 전달하는 방법정의**
   * **MessageBuilder**
     * Spring에서 제공하는 헬퍼 클래스
     * **Object -> Message 로 변경해줌**
     * **send(Message.class)  시그니처임**
5. **환경설정**
   * Spring Cloud Stream의 **Source -> 메시지 브로커, 토픽 매핑**

#### 소스 부분

* Application

```java
@SpringBootApplication
@EnableEurekaClient
@EnableCircuitBreaker
@EnableBinding(Source.class)
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
```

* Controller

```java
@RestController
@RequestMapping(value="v1/organizations")
public class OrganizationServiceController {

    @Autowired
    private OrganizationService orgService;

    @RequestMapping(value="/{organizationId}",method = RequestMethod.GET)
    public Organization getOrganization( @PathVariable("organizationId") String organizationId) {
        Organization org = orgService.getOrg(organizationId);
        org.setContactName(org.getContactName());
        return org;
    }

    @RequestMapping(value="/{organizationId}",method = RequestMethod.PUT)
    public void updateOrganization( @PathVariable("organizationId") String orgId, @RequestBody Organization org) {
        orgService.updateOrg( org );
    }

    @RequestMapping(value="/{organizationId}",method = RequestMethod.POST)
    public void saveOrganization(@RequestBody Organization org) {
       orgService.saveOrg( org );
    }

    @RequestMapping(value="/{organizationId}",method = RequestMethod.DELETE)
    @ResponseStatus(HttpStatus.NO_CONTENT)
    public void deleteOrganization( @PathVariable("organizationId") String orgId) {
        orgService.deleteOrg( orgId );
    }
}
```

�

* Service

```java
@Service
public class OrganizationService {

    @Autowired
    private OrganizationRepository orgRepository;

    @Autowired
    private SimpleSourceBean simpleSourceBean;

    public Organization getOrg(String organizationId) {
        return orgRepository.findById(organizationId).get();
    }

    public void saveOrg(Organization org){
        org.setId( UUID.randomUUID().toString());

        orgRepository.save(org);
        
        // save 메시지 발행
        simpleSourceBean.publishOrgChange("SAVE", org.getId());
    }

    public void updateOrg(Organization org){
        orgRepository.save(org);
        
        // update 메시지 발행
        simpleSourceBean.publishOrgChange("UPDATE", org.getId());

    }

    public void deleteOrg(String  orgId){
        orgRepository.deleteById( orgId );
        
        // delete 메시지 발행
        simpleSourceBean.publishOrgChange("DELETE", orgId);
    }
}
```

�

* SimpleSourceBean (Source 구현부)�

```java
@Component
public class SimpleSourceBean {
    private Source source;

    private static final Logger logger = LoggerFactory.getLogger(SimpleSourceBean.class);

    @Autowired
    public SimpleSourceBean(Source source){
        this.source = source;
    }

    public void publishOrgChange(String action,String orgId){
       logger.debug("Sending Kafka message {} for Organization Id: {}", action, orgId);
        OrganizationChangeModel change =  new OrganizationChangeModel(
                OrganizationChangeModel.class.getTypeName(),
                action,
                orgId,
                UserContext.getCorrelationId());

        source.output().send(MessageBuilder.withPayload(change).build());
    }
}
```

#### 구성 설정&#x20;

```yaml
spring:
  cloud:
    stream:
      bindings: # Spring Cloud Stream의 메시지 브로커 발행 구성의 시작점 (property) 
        output: # 채널 관련 설정 
            destination:  orgChangeTopic    # 메시지큐 or 토픽의 이름
            content-type: application/json  # 메시지 송수신 타입(JSON)
        kafka: # 메시지 플랫폼을 Kafka 사용 -> Spring Cloud Stream에 알림
          binder:
            zkNodes: localhost  # 주키퍼 위치
            brokers: localhost  # 카프카 위치
```

####

#### 생각 해볼 점

* 정확하게 얼마나 많은 데이터를 메시지에 넣어야함?
  * 어플마다 다름. 예제에서는 **변경된 조직 ID만 반환함**
  * **메시징 데이터 복사본은 절대 넣지 않음**
    * **이슈가 많음**
    * 비즈니스 로직은 데이터 변화에 민감하며, 데이터가 변경된 사실만을 전파하고,  이용하는 서비스들이 항상 마스터(데이터를 소유한 서비스)로 다시가서 새로운 데이터 사본을 가져오게함
    * 실행 시간은 좀 더 걸릴 수 있으나 항상 최신데이터를 얻을 수 있음
    * 마스터에서 받아 오는 동안 데이터 변화가 있을 수 있으나, 큐에서 받은 (복사본)데이터를 무턱대고 사용하는 것보다 가능성이 낮음
  * **얼마나 많은 데이터를 전달할지 신중히 생각하고 고민**해야한다
    * 메시지 큐에 너무 오래 보관되거나 데이터를 포함한 이전 메시지가 실패해서 일관성이 없는 메시지가 전달되면
    * 데이터의 상태 보다는 **메시지 상태에 의존하고 있으므로 전달된 데이터는 최신이 아닐 수 있다**
  * **메시지의 상태를 전달 하려면, 메시지에 날짜-시간 스탬프 or 버전 번호도 포함시켜 데이터를 소비하는 서비스가 전달된 데이터를 검사하고, 이미 가진데이터 복사본보다 이전것이 아닌지 확인 할 수 있어야한다. (데이터는 비순차적으로 검색될 수 있음, 메시지는 순차적으로 쌓임)**

#### **8.3.2 라이선싱 서비스에서 메시지 소비자 작성**

![소비자 측 프로세스](https://1162853276-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-M1ciKJvsGgeAKQBeHlJ%2F-MGO7Vz1gq3f4lHTN9yi%2F-MGOD-hjTx7bhTexI88g%2Fimage.png?alt=media\&token=212ec984-16bd-425e-b702-551fa2165930)

#### 라이브러리&#x20;

* Spring Cloud Stream -> spring-cloud-stream
* Kafka Library -> spring-cloud-starter-stream-kafka

#### 소비자 메시지 발행 순서

1. **메시지 브로커와 바인딩**
   * **@EnableBinding(Sink.class)** -> Application.java 명시&#x20;
2. Input 채널
   * @StreamListener(Sink.INPUT) -> 메서드에 명시
     * 수신이 될 때마다, 해당 메서드를 실행하게 됨
3. 환경설정&#x20;
   * `spring.cloud.stream.bindings` :  메시지 브로커 바인딩시작점
   * `spring.cloud.stream.bindings.input` : input 채널 &#x20;
   * `spring.cloud.stream.bindings.input.destination` :  토픽이름
   * `spring.cloud.stream.bindings.input.content-type` : 역직렬화 타입 (JSON -> JavaObject, 기본)
   * `spring.cloud.stream.bindings.input.group` : 한번만 처리하는 의미를 보장하는데 사용, **메시지를 소비할 소비자 그룹의 이름**
     * 동일한 메시지 큐를 수신하는 여러 서비스가 많은 인스터를갖고 있고, 각각의 고유 서비스가 메시지 복사본을 처리하길 원하지만, 서비스 인스턴스 그룹안에서는 한 서비스 인스턴스만 메시지를 사용하고 처리 해야한다&#x20;
     * group 프로퍼티 : 서비스가 속한 소비자 그룹을 식별
     * 모든 서비스 인스턴스가 **동일한 그룹 이름을 가지고 있는한, 스프링 클라우드 스트림과 하부 메시지 브로커는 해당그룹에 속한 인스턴스에 메시지 복사본 하나만 소비할 것을 보장**

#### **소비자 그룹에서 서비스 인스턴스 그룹이 메시지를 한 번만 처리할 것을 보장**�

![](https://1162853276-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-M1ciKJvsGgeAKQBeHlJ%2F-MGQVBJSFCgzje5swbpj%2F-MGQ_UQ3jDxi8hW-IaS_%2Fimage.png?alt=media\&token=c8433aca-d76d-4ec6-b84c-ab0e698e6837)

#### 환경설

* application.yml

```yaml
spring:
  cloud:
    stream:
      bindings:
        kafka:
          binder:
            zkNodes: localhost    # 주키퍼 위치
            brokers: localhost    # 메시지 브로커 위치 (여기서는 Kafka)
        input:    # Input 채널
          destination: orgChangeTopic     # Topic 이름
          content-type: application/json  # JSON -> Java Object 역직렬화 타입
          group: licensingGroup           # 1번만 처리한다는 의미를 보장한다는데 사용하게 됨
```

#### Application.java

```java
@SpringBootApplication
@EnableEurekaClient
@EnableCircuitBreaker
@EnableBinding(Sink.class)  // 메시지 브로커와 바인딩 -> 수신(소비자측)
public class Application {
    
    private static final Logger logger = LoggerFactory.getLogger(Application.class);
    
    @StreamListener(Sink.INPUT) // 메시지가 입력채널에서 수신될때마다 이메서드를 실행하게됨
    public void loggerSink(OrganizationChangeModel orgChange) {
        logger.debug("Received an event for organization id {}", orgChange.getOrganizationId());
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
```

### 8.4 Spring Cloud Stream 분산 캐싱 사례

* 라이선싱 서비스&#x20;
  * **조직서비스의 데이터를 필요**로함
  * 조직데이터에 대한 **분산 레디스 캐시를 항상 확인**함&#x20;
  * 조직데이터가 **캐시에 있다면**, **캐시 반환**
  * **캐시에 없다면**, 조직서비스를 호출하고 호출 결과를 **레디스 해시에 캐싱**

#### 8.4.1 레디스로 조회 캐싱

#### 설정

1. **Spring Data Redis 의존성**을 포함해서 라이선싱 서비스를 구성
   * jedis
   * commons-pool2
   * spring-data-redis&#x20;
2. **레디스에 대한 데이터 베이스 커넥션 설정**
   * **jedis** : 스프링과 레디스 서버 통신 (**커넥터**)
   * **RedisTemplate** 객체 생성&#x20;
3. **레디스 해시와 통신하는 Spring Data Redis의 레포지토리 클래스 정의**
   * RedisTemplate 사용
   * **HashOperations** : 헬퍼 메서드들이 정의됨 (Spring 제공)&#x20;
     * 레디스와 모든 통신은 key-value&#x20;
4. **레디스와 라이선싱 서비스를 사용해 조직 데이터를 저장하고 읽기**
   * 구현체를 통해 기본적인 CRUD
   * 키를 사용해 저장/조회 등
   * 레디스에 작업수행할 HASH\_NAME을 같이 알려주어야함
     * **HashOperations.put(HASH\_NAME, key, value) 형식**
     * **HashOperations.delete(HASH\_NAME, key, value) 형식**

#### 8.4.2 사용자 정의 채널 정의

현재까지는 Spring Cloud Stream의 **Source, Sink** 인터페이스와 함께 기본으로 제공되는 **output, input 채널을 사용**

**둘 이상의 채널을 정의 or 고유한 채널이름 사용**

* 사용자 고유 **인터페이스를 정의하고,** 애플리케이션이 **필요한 만큼 input과 output**을 노출하면 됨
* `@Input : 메서드 레벨의 채널이름 정의`
  * `@Input("chanelName")`
* **인터페이스 정의**
  * **SubscribableChannel** : Input 채널 (라이선싱)
  * **MessageChannel :** Output 채널 (조직)&#x20;

```java
public interface CustomChannels {
    @Input("inboundOrgChanges")     // 메서드 레벨 채널이름 정의
    SubscribableChannel orgs();
}
```

* **Input 채널 설정 수정**&#x20;
  * input 대신 사용자정&#xC758;**(@Input 내용**)을 명시
  * Sink.class -> @Input("input") 으로 되어 있음 (기본사용)

```yaml
spring:
  cloud:
    stream:
      bindings:
        inboundOrgChanges:                # 사용자 정의에 명시한 채널이름을 input 대시 명시
          destination: orgChangeTopic     # Topic 이름
          content-type: application/json  # JSON -> Java Object 역직렬화 타입
          group: licensingGroup           # 1번만 처리한다는 의미를 보장한다는데 사용하게 됨
        kafka:
          binder:
            zkNodes: localhost    # 주키퍼 위치
            brokers: localhost    # 메시지 브로커 위치 (여기서는 Kafka)
        input:    # Input 채널
          destination: orgChangeTopic2     # Topic 이름
          content-type: application/json  # JSON -> Java Object 역직렬화 타입
          group: licensingGroup           # 1번만 처리한다는 의미를 보장한다는데 사용하게 됨�
```

* **메시지 처리를 하기위한 클래스 정의**&#x20;
  * **@EnableBinding(인터페이스) -> `타입`에 명시**
    * 파라미터는 Sink.class 처럼 **CustomChannles**(사용자 정의 인터페이스)를 명시
  * **@StreamListener("채널이름") -> `메서드`에 명시**
    * Sink.INPUT 처럼 -> **application.yml에 명시했던 내용 명시**

```java
@EnableBinding(CustomChannels.class)  // Application.java에 명시한 부분 삭제 -> Sink.class 대신 사용자정의 인터페이스 사용 
public class OrganizationChangeHandler {

  @Autowired
  private OrganizationRedisRepository organizationRedisRepository;

  private static final Logger logger = LoggerFactory.getLogger(OrganizationChangeHandler.class);

  @StreamListener("inboundOrgChanges")  // application.yml에 명시한 내용 일치 (input 대신사용한)
  public void loggerSink(OrganizationChangeModel orgChange) {
    logger.debug("Received a message of type " + orgChange.getType());
    switch (orgChange.getAction()) {
      case "GET":
        logger.debug("Received a GET event from the organization service for organization id {}", orgChange.getOrganizationId());
        break;
      case "SAVE":
        logger.debug("Received a SAVE event from the organization service for organization id {}", orgChange.getOrganizationId());
        break;
      case "UPDATE":
        logger.debug("Received a UPDATE event from the organization service for organization id {}", orgChange.getOrganizationId());
        organizationRedisRepository.deleteOrganization(orgChange.getOrganizationId());
        break;
      case "DELETE":
        logger.debug("Received a DELETE event from the organization service for organization id {}", orgChange.getOrganizationId());
        organizationRedisRepository.deleteOrganization(orgChange.getOrganizationId());
        break;
      default:
        logger.error("Received an UNKNOWN event from the organization service of type {}", orgChange.getType());
        break;
    }
  }
}

```

* 이 시점에서 Output 채널인 조직서비스에 대한 추가 수정 내용은없으나 필요하다면, Output 채널도 변경을 하면된다.
* Update 및 Delete -> 레디스에서 삭제

#### output 채널 커스텀

* **인터페이스 정의**
  * **@Output("채널이름") 명시 -> 설정에 추**

```java
public interface CustomSource {
  @Output("outboundSource")
  MessageChannel output();
}
```

* **설정**&#x20;
  * **채널 이름 : ouboundSource**

```yaml
spring:
  cloud:
    stream:
      bindings: # Spring Cloud Stream의 메시지 브로커 발행 구성의 시작점 (property)
        output: # 채널 관련 설정
          destination: orgChangeTopic    # 메시지큐 or 토픽의 이름
          content-type: application/json  # 메시지 송수신 타입(JSON)
        kafka: # 메시지 플랫폼을 Kafka 사용 -> Spring Cloud Stream에 알림
          binder:
            zkNodes: localhost  # 주키퍼 위치
            brokers: localhost  # 카프카 위치
        outboundSource:
          destination: orgChangeTopic2    # 메시지큐 or 토픽의 이름
          content-type: application/json  # 메시지 송수신 타입(JSON)
```

* **사용 클래스**
  * **@EnableBinding**(타입) -> 바인딩할 타입명시
    * **해당 애노테이션은 자동으로 Bean 등록됨**
  * **@SendTo("채널이름") -> 채널 -> 토픽(orgChangeTopic2)로 송**

```java
@EnableBinding(CustomSource.class)
public class CustomSourceHandler {

  private static final Logger logger = LoggerFactory.getLogger(CustomSourceHandler.class);

  @Autowired
  private CustomSource customSource;

  @SendTo("outboundSource")
  public void outTest(OrganizationChangeModel model) {
    logger.debug("Sending Kafka2 message {} ",model);
    customSource.output().send(MessageBuilder.withPayload(model).build());
  }

}
```

* 수신쪽&#x20;
  * **Sink.INPUT -> input 채널**에서 처리
  * input 채널에 **orgChangeTopic2 토픽**으로 설정되어 있음

```java
@StreamListener(Sink.INPUT)
public void loggerSink(OrganizationChangeModel orgChange) {
    logger.debug("Received an event for organization id {}", orgChange.getOrganizationId());
}
```

�
