# 4. Building an Application with the Java API

## Building an Application with the Java API

### Preparing the Development Environment

* JDK 1.8 or later
* Development host name: `dev`
* Set up a Maven project
* Since the Confluent Platform is used, add the Confluent Maven repository

  ```xml
  <repositories> 
    <repository>
      <id>confluent</id>
      <url>https://packages.confluent.io/maven/</url> 
    </repository> 
  </repositories>
  ```
* Kafka library dependency

  ```xml
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>2.0.0-cp1</version>
    </dependency>
  </dependencies>
  ```
* Producer Java

  ```java
  import java.util.Properties;

  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.Producer;
  import org.apache.kafka.clients.producer.ProducerRecord;

  public class FirstAppProducer {
    private static String topicName = "first-app";

    public static void main(String[] args) {
      // 1. Configuration required by KafkaProducer
      Properties properties = new Properties();
      properties.setProperty("bootstrap.servers", "kafka:9092");  // broker <hostname:port>
      properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
      properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

      // 2. Create the object that sends (produces) messages to the Kafka cluster
      Producer<Integer, String> producer = new KafkaProducer<>(properties);

      int key;
      String value;

      for (int i = 1; i <= 100; i++) {
        key = i;
        value = String.valueOf(i);

        // 3. Build the message to send
        ProducerRecord<Integer, String> record = new ProducerRecord<>(topicName, key, value);

        // 4. Register a callback to run when the ack for the sent message arrives
        producer.send(record, (recordMetadata, e) -> {
          if (recordMetadata != null) {
            // The send succeeded
            String infoString = String.format("Success partition:%d, offset:%d", recordMetadata.partition(), recordMetadata.offset());
            System.out.println("infoString = " + infoString);
          } else {
            System.err.println(String.format("Fail : %s", e.getMessage()));
          }
        });
      }
      // 5. Close the KafkaProducer and exit
      producer.close();
    }
  }
  ```

  ```bash
  # package: src.main.java.io.ssosso.*
  # the jar is the Maven-packaged ~with-dependencies.jar (fat JAR)
  $ java -cp ~/first-app/kafka-test-1.0-SNAPSHOT-jar-with-dependencies.jar io.ssosso.chapter4.FirstAppProducer
  ```
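
  The key point in the producer above is that `send()` is asynchronous: it returns immediately and the callback fires later, which is why `close()` must flush in-flight sends before the JVM exits. The following is a minimal plain-Java sketch of that pattern (a hypothetical stand-in, not the Kafka client itself), runnable without a broker:

  ```java
  import java.util.ArrayList;
  import java.util.List;
  import java.util.concurrent.CompletableFuture;
  import java.util.function.BiConsumer;

  // Hypothetical stand-in for KafkaProducer: send() returns immediately and the
  // callback runs later, so close() must wait for pending sends before exiting.
  public class AsyncSendSketch {
    static final List<String> log = new ArrayList<>();
    static final List<CompletableFuture<Void>> pending = new ArrayList<>();

    // Mimics producer.send(record, callback): asynchronous; the callback
    // receives (metadata, error), exactly one of which is non-null
    static void send(int key, String value, BiConsumer<String, Exception> callback) {
      pending.add(CompletableFuture.runAsync(() ->
          callback.accept("partition:0, offset:" + key, null)));
    }

    // Mimics producer.close(): blocks until all in-flight sends complete
    static void close() {
      pending.forEach(CompletableFuture::join);
    }

    public static void main(String[] args) {
      for (int i = 1; i <= 3; i++) {
        send(i, String.valueOf(i), (metadata, e) -> {
          synchronized (log) {
            log.add(metadata != null ? "Success " + metadata : "Fail : " + e.getMessage());
          }
        });
      }
      close();  // without this, the JVM could exit before the callbacks run
      System.out.println(log.size() + " callbacks completed");
    }
  }
  ```

  If `producer.close()` were omitted in `FirstAppProducer`, some of the 100 sends could be lost for the same reason: the buffered records would never be flushed.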
* Consumer Java

  ```java
  import java.util.Collections;
  import java.util.Map;
  import java.util.Properties;

  import org.apache.kafka.clients.consumer.Consumer;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.clients.consumer.OffsetAndMetadata;
  import org.apache.kafka.common.TopicPartition;

  public class FirstAppConsumer {

    private static String topicName = "first-app";

    public static void main(String[] args) {
      // 1. Configuration required by KafkaConsumer
      Properties properties = new Properties();
      properties.setProperty("bootstrap.servers", "kafka:9092");
      properties.setProperty("group.id", "FirstAppConsumerGroup");
      properties.setProperty("enable.auto.commit", "false");
      properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
      properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

      // 2. Create the object that receives messages from the Kafka cluster
      Consumer<Integer, String> consumer = new KafkaConsumer<>(properties);

      // 3. Register the topic to subscribe to
      consumer.subscribe(Collections.singletonList(topicName));

      for (int i = 0; i < 300; i++) {

        // 4. Receive messages and print them to the console,
        //    using the key/value types of the received records
        ConsumerRecords<Integer, String> records = consumer.poll(1);  // poll with a 1 ms timeout; may return zero or more records
        for (ConsumerRecord<Integer, String> record : records) {
          String msgString = String.format("key : %d, value: %s", record.key(), record.value());
          System.out.println(msgString);

          // 5. Commit the offset of the message that has been processed
          TopicPartition tp = new TopicPartition(record.topic(), record.partition());
          OffsetAndMetadata metadata = new OffsetAndMetadata(record.offset() + 1);
          final Map<TopicPartition, OffsetAndMetadata> commitInfo = Collections.singletonMap(tp, metadata);
          consumer.commitSync(commitInfo);
        }
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
      // 6. Close the KafkaConsumer and exit
      consumer.close();
    }
  }
  ```

  `Maven -> package` : deploy the \~jar-with-dependencies.jar

  ```bash
  $ java -cp ~/firstapp-1.0-SNAPSHOT-jar-with-dependencies.jar io.ssosso.chapter4.FirstAppConsumer
  ```
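
  Note the `record.offset() + 1` in step 5: Kafka interprets a committed offset as the *next* record to deliver, so after processing offset N you commit N + 1. A minimal plain-Java sketch of that bookkeeping (hypothetical names, no broker needed):

  ```java
  import java.util.HashMap;
  import java.util.Map;

  // Illustrates the "+ 1" in OffsetAndMetadata(record.offset() + 1): the
  // committed offset marks where consumption resumes after a restart.
  public class OffsetCommitSketch {
    // committed offsets per partition, keyed by "topic-partition"
    static final Map<String, Long> committed = new HashMap<>();

    static void commit(String topicPartition, long processedOffset) {
      committed.put(topicPartition, processedOffset + 1);  // next offset to read
    }

    static long resumeFrom(String topicPartition) {
      return committed.getOrDefault(topicPartition, 0L);
    }

    public static void main(String[] args) {
      // consume offsets 0..4 from one partition, committing after each record
      for (long offset = 0; offset < 5; offset++) {
        commit("first-app-0", offset);
      }
      // after a restart, the consumer resumes at offset 5, not 4
      System.out.println("resume from offset " + resumeFrom("first-app-0"));
    }
  }
  ```

  Committing `record.offset()` itself instead would redeliver the last processed record after every restart.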

**Verification**

Run the producer `(FirstAppProducer)`, then run the consumer `(FirstAppConsumer)`.

```bash
# producer
$ java -cp ~/first-app/kafka-test-1.0-SNAPSHOT-jar-with-dependencies.jar io.ssosso.chapter4.FirstAppProducer

# consumer
$ java -cp ~/first-app/kafka-test-1.0-SNAPSHOT-jar-with-dependencies.jar io.ssosso.chapter4.FirstAppConsumer
```

