4. Java API를 이용하여 애플리케이션 만들기

Java API를 사용하여 애플리케이션 만들기

개발환경 준비

  • 1.8 이상

  • 개발환경 호스트명 -> dev로 할 예정

  • Maven 프로젝트 구성

  • 컨플루언트 플랫폼을 이용했으므로, 리포지토리 -> 컨플루언트

    <repositories> 
      <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url> 
      </repository> 
    </repositories>
  • kafka 라이브러리 설정

    <dependencies>
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>2.0.0-cp1</version>
      </dependency>
    </dependencies>
  • Producer Java

    public class FirstAppProducer {
      private static String topicName = "first-app";
    
      public static void main(String[] args) {
        // 1. KafkaProducer에 필요한 설정
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "kafka:9092");  // 브로커로 보낼 <호스트명:포트번호>
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
        // 2. 카프카 클러스터에 메시지를 송신(produce)하는 객체 생성
        Producer<Integer, String> producer = new KafkaProducer<>(properties);
    
        int key;
        String value;
    
        for (int i = 1; i <= 100; i++) {
          key = i;
          value = String.valueOf(i);
    
          // 3. 송신 메시지 생성
          ProducerRecord<Integer, String> record = new ProducerRecord<>(topicName, key, value);
    
          // 4. 메시지 송신 후 Ack 받았을 때, 실행할 작업 (CallBack메서드) 등록
          producer.send(record, (recordMetadata, e) -> {
            if (recordMetadata != null) {
              // 송신에 성공한 경우 처리
              String infoString = String.format("Success partition:%d, offset:%d", recordMetadata.partition(), recordMetadata.offset());
              System.out.println("infoString = " + infoString);
            } else {
              System.err.println(String.format("Fail : %s", e.getMessage()));
            }
          });
        }
        // 5. KafkaProducer 닫고 종료
        producer.close();
      }
    }
    # package -> src.main.java.io.ssosso.*
    # jar 파일은 maven 패키징으로 만들어진 jar ~with-dependencies.jar (Fat JAR)
    $ java -cp ~/first-app/kafak-test-1.0-SNAPSHOT-jar-with-dependencies.jar io.ssosso.chapter4.FirstAppProducer
  • Consumer Java

    public class FirstAppConsumer {
    
      private static String topicName = "first-app";
    
      public static void main(String[] args) {
        // 1. KafkaConsumer 필요한 설정
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "kafka:9092");
        properties.setProperty("group.id", "FirstAppConsumerGroup");
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
        // 2. 카프카 클러스터에서 메시지를 수신 하는 객체 생성
        Consumer<Integer, String> consumer = new KafkaConsumer<>(properties);
    
        // 3. 구독(subscribe) 하는 Topic 등록
        consumer.subscribe(Collections.singletonList(topicName));
    
        for (int i = 0; i < 300; i++) {
    
          // 4. 메시지를 수신하여 콘솔에 표시
          // 수신 메시지의 형에 맞춰서 형 맞추기
          ConsumerRecords<Integer, String> records = consumer.poll(1); //  1개씩 땡겨감
          for (ConsumerRecord<Integer, String> record : records) {
            String msgString = String.format("key : %d, value: %s", record.key(), record.value());
            System.out.println(msgString);
    
            // 5. 처리가 완료된 메시지의 오프셋을 커밋
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            OffsetAndMetadata metadata = new OffsetAndMetadata(record.offset() + 1);
            final Map<TopicPartition, OffsetAndMetadata> commitInfo = Collections.singletonMap(tp, metadata);
            consumer.commitSync(commitInfo);
          }
          try {
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
        // 6. KafkaConsumer를 닫고 종
        consumer.close();
      }
    }

    Maven -> 패키징 : ~jar-with-dependencies.jar 배포

    $ java -cp ~/firstapp-1.0-SNAPSHOT-jar-with-dependencies.jar io.ssosso.chapter4.FirstAppConsumer

동작확인

프로듀서 실행(FirstAppProducer) -> 컨슈머 실행(FirstAppConsumer)

# 프로듀서
$ java -cp ~/first-app/kafak-test-1.0-SNAPSHOT-jar-with-dependencies.jar io.ssosso.chapter4.FirstAppProducer

# 컨슈머
$ java -cp ~/first-app/kafka-test-1.0-SNAPSHOT-jar-with-dependencies.jar io.ssosso.chapter4.FirstAppConsumer

Last updated