4. Java API를 이용하여 애플리케이션 만들기
Java API를 사용하여 애플리케이션 만들기
개발환경 준비
1.8 이상
개발환경 호스트명 ->
dev
로 할 예정Maven 프로젝트 구성
컨플루언트 플랫폼을 이용했으므로, 리포지토리 -> 컨플루언트
<repositories> <repository> <id>confluent</id> <url>https://packages.confluent.io/maven/</url> </repository> </repositories>
kafka 라이브러리 설정
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>2.0.0-cp1</version> </dependency> </dependencies>
Producer Java
public class FirstAppProducer { private static String topicName = "first-app"; public static void main(String[] args) { // 1. KafkaProducer에 필요한 설정 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "kafka:9092"); // 브로커로 보낼 <호스트명:포트번호> properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 2. 카프카 클러스터에 메시지를 송신(produce)하는 객체 생성 Producer<Integer, String> producer = new KafkaProducer<>(properties); int key; String value; for (int i = 1; i <= 100; i++) { key = i; value = String.valueOf(i); // 3. 송신 메시지 생성 ProducerRecord<Integer, String> record = new ProducerRecord<>(topicName, key, value); // 4. 메시지 송신 후 Ack 받았을 때, 실행할 작업 (CallBack메서드) 등록 producer.send(record, (recordMetadata, e) -> { if (recordMetadata != null) { // 송신에 성공한 경우 처리 String infoString = String.format("Success partition:%d, offset:%d", recordMetadata.partition(), recordMetadata.offset()); System.out.println("infoString = " + infoString); } else { System.err.println(String.format("Fail : %s", e.getMessage())); } }); } // 5. KafkaProducer 닫고 종료 producer.close(); } }
# package -> src.main.java.io.ssosso.* # jar 파일은 maven 패키징으로 만들어진 jar ~with-dependencies.jar (Fat JAR) $ java -cp ~/first-app/kafak-test-1.0-SNAPSHOT-jar-with-dependencies.jar io.ssosso.chapter4.FirstAppProducer
Consumer Java
public class FirstAppConsumer { private static String topicName = "first-app"; public static void main(String[] args) { // 1. KafkaConsumer 필요한 설정 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "kafka:9092"); properties.setProperty("group.id", "FirstAppConsumerGroup"); properties.setProperty("enable.auto.commit", "false"); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 2. 카프카 클러스터에서 메시지를 수신 하는 객체 생성 Consumer<Integer, String> consumer = new KafkaConsumer<>(properties); // 3. 구독(subscribe) 하는 Topic 등록 consumer.subscribe(Collections.singletonList(topicName)); for (int i = 0; i < 300; i++) { // 4. 메시지를 수신하여 콘솔에 표시 // 수신 메시지의 형에 맞춰서 형 맞추기 ConsumerRecords<Integer, String> records = consumer.poll(1); // 1개씩 땡겨감 for (ConsumerRecord<Integer, String> record : records) { String msgString = String.format("key : %d, value: %s", record.key(), record.value()); System.out.println(msgString); // 5. 처리가 완료된 메시지의 오프셋을 커밋 TopicPartition tp = new TopicPartition(record.topic(), record.partition()); OffsetAndMetadata metadata = new OffsetAndMetadata(record.offset() + 1); final Map<TopicPartition, OffsetAndMetadata> commitInfo = Collections.singletonMap(tp, metadata); consumer.commitSync(commitInfo); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } // 6. KafkaConsumer를 닫고 종 consumer.close(); } }
Maven -> 패키징
: ~jar-with-dependencies.jar 배포$ java -cp ~/firstapp-1.0-SNAPSHOT-jar-with-dependencies.jar io.ssosso.chapter4.FirstAppConsumer
동작확인
프로듀서 실행(FirstAppProducer)
-> 컨슈머 실행(FirstAppConsumer)
# 프로듀서
$ java -cp ~/first-app/kafak-test-1.0-SNAPSHOT-jar-with-dependencies.jar io.ssosso.chapter4.FirstAppProducer
# 컨슈머
$ java -cp ~/first-app/kafka-test-1.0-SNAPSHOT-jar-with-dependencies.jar io.ssosso.chapter4.FirstAppConsumer
Last updated
Was this helpful?