반응형
저번 포스팅에서 로컬 피시에 카프카를 설치 했다면 이번 포스팅에서는 springboot에서 직접 활용할수 있도록 셋팅을 해볼것이다.
실제로 개발을 하게 된다면 kafka서버는 어느 pod나 container 에 설치가 되어서 해당 서버와 토픽 주제만 알고 있다면,
이번 포스팅이 더욱 도움이 될것이다.
설치에 대해서는 아래 참고 포스팅을 참고 바란다.
먼저 카프카를 호출할 controller 를 만들었다.
@Slf4j @CrossOrigin("*") @RestController @RequestMapping(value = "/kafka/test") public class KafkaController { private final KafkaProducer producer; @Autowired KafkaController(KafkaProducer producer) { this.producer = producer; } @PostMapping(value = "/message") public String sendMessage(@RequestParam("message") String message) { this.producer.sendMessage(message); return "success"; } } |
sendMessage 메서드를 호출하게 되면 producer에 있는 sendMessage 가 호출이 된다.
그럼 producer를 살펴보자
@Service @Slf4j public class KafkaProducer { @Value(value = "${message.topic.name}") private String topicName; private final KafkaTemplate<String, String> kafkaTemplate; @Autowired public KafkaProducer(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public void sendMessage(String message) { System.out.println(String.format("Produce message : %s", message)); CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message); // future.thenAccept(null) future.thenAccept(sCallback ->{ log.info("Callback partition:{}, offset:{}",sCallback.getRecordMetadata().partition(),sCallback.getRecordMetadata().offset()); // sCallback.getRecordMetadata().offset(); // sCallback.getRecordMetadata().partition(); System.out.println("callback 입니다."); }); } } |
프로듀서는 호출이 되면 현재 메세지를 topicname 으로 보내기 위한 KafkaTemplate를 사용하여 메세지를 send 한다.
그리고 콜백에 관련해서는 아래와 같이 성공 후 메세지를 출력하게 설정할수도 있다.
그리고 받는쪽인 consumer 를 보면 아래와 같다.
@Service public class KafkaConsumer { @KafkaListener(topics = "${message.topic.name}", groupId = ConsumerConfig.GROUP_ID_CONFIG) public void consume(String message) throws IOException { System.out.println(String.format("Consumed message : %s", message)); } } |
간단하다
kafkaListener 를 설정으로 달아서 항상 카프카를 주시하고 있고 설정한 topics으로 메세지가 들어오면 해당 메서드를 호출하게 된다.
그럼 controller 를 실행하게 되면 로그는 아래와 같다.
Produce message : WELCOME 2023-06-03T00:57:20.357+09:00 INFO 38765 --- [ad | producer-1] com.sungjun.kafka.KafkaProducer : Callback partition:0, offset:3 callback 입니다. Consumed message : WELCOME |
참고 포스팅
https://thenicesj.tistory.com/585
반응형
'IT > Java' 카테고리의 다른 글
자바에서 Null 체크 관련(if) (44) | 2023.06.09 |
---|---|
[Jackson] JsonNode, ObjectNode, ArrayNode (35) | 2023.06.06 |
custom annotation 활용 (51) | 2023.06.02 |
SPRINGBOOT 에서 등록된 Bean 추출 (54) | 2023.06.01 |
mybatis 에서 dto camel case 적용 (59) | 2023.05.29 |
댓글