본문 바로가기
IT/Java

kafka 실습 (springboot에서 활용)

by 성준하이 2023. 6. 5.
반응형

저번 포스팅에서 로컬 피시에 카프카를 설치 했다면 이번 포스팅에서는 springboot에서 직접 활용할수 있도록 셋팅을 해볼것이다.

 

실제로 개발을 하게 된다면 kafka서버는 어느 pod나 container 에 설치가 되어서 해당 서버와 토픽 주제만 알고 있다면,

이번 포스팅이 더욱 도움이 될것이다.

 

설치에 대해서는 아래 참고 포스팅을 참고 바란다.

 

먼저 카프카를 호출할 controller 를 만들었다.

@Slf4j
@CrossOrigin("*")
@RestController
@RequestMapping(value = "/kafka/test")
public class KafkaController {
private final KafkaProducer producer;


    @Autowired
    KafkaController(KafkaProducer producer) {
        this.producer = producer;
    }


    @PostMapping(value = "/message")
    public String sendMessage(@RequestParam("message") String message) {
        this.producer.sendMessage(message);
        return "success";
    }
}

 

sendMessage 메서드를 호출하게 되면 producer에 있는 sendMessage 가 호출이 된다.

그럼 producer를 살펴보자

@Service
@Slf4j
public class KafkaProducer {

@Value(value = "${message.topic.name}")
private String topicName;

private final KafkaTemplate<String, String> kafkaTemplate;


    @Autowired
    public KafkaProducer(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }


    public void sendMessage(String message) {
        System.out.println(String.format("Produce message : %s", message));
        CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
        
//        future.thenAccept(null)
        future.thenAccept(sCallback ->{
        log.info("Callback partition:{}, offset:{}",sCallback.getRecordMetadata().partition(),sCallback.getRecordMetadata().offset());
//        sCallback.getRecordMetadata().offset();
//        sCallback.getRecordMetadata().partition();
        System.out.println("callback 입니다.");
        });
    }
    
}

프로듀서는 호출이 되면 현재 메세지를 topicname 으로 보내기 위한 KafkaTemplate를 사용하여 메세지를 send 한다.

그리고 콜백에 관련해서는 아래와 같이 성공 후 메세지를 출력하게 설정할수도 있다.

 

그리고 받는쪽인 consumer 를 보면 아래와 같다.

@Service
public class KafkaConsumer {


@KafkaListener(topics = "${message.topic.name}", groupId = ConsumerConfig.GROUP_ID_CONFIG)
    public void consume(String message) throws IOException {
        System.out.println(String.format("Consumed message : %s", message));
    }
}

간단하다

kafkaListener 를 설정으로 달아서 항상 카프카를 주시하고 있고 설정한 topics으로 메세지가 들어오면 해당 메서드를 호출하게 된다.

 

그럼 controller 를 실행하게 되면 로그는 아래와 같다.

Produce message : WELCOME
2023-06-03T00:57:20.357+09:00  INFO 38765 --- [ad | producer-1] com.sungjun.kafka.KafkaProducer          : Callback partition:0, offset:3
callback 입니다.
Consumed message : WELCOME

참고 포스팅

https://thenicesj.tistory.com/585

 

kafka 실습 (설치)

이전 포스팅에서 kafka 에 대한 내용을 다룬적이 있다. 자세한 내용은 아래 참고 포스팅 참고 바란다. 이번 포스팅에서는 피시에 카프카를 설치를 하고 직접 springboot 에서 테스트 해보는 코드를

thenicesj.tistory.com

 

반응형

'IT > Java' 카테고리의 다른 글

자바에서 Null 체크 관련(if)  (44) 2023.06.09
[Jackson] JsonNode, ObjectNode, ArrayNode  (35) 2023.06.06
custom annotation 활용  (51) 2023.06.02
SPRINGBOOT 에서 등록된 Bean 추출  (54) 2023.06.01
mybatis 에서 dto camel case 적용  (59) 2023.05.29

댓글