Programming/Kafka

[kafka] spring kafka vs kafka clients

bisi 2020. 5. 6. 17:44

Java 기반 어플리케이션으로 kafka를 적용할때 사용하는 라이브러리는

spring kafka vs kafka clients 2가지가 있다.

 

spring-kafka는 kafka-clients보다 레퍼런스가 풍부하며, 고수준의 라이브러리로 사용자가 사용하기 편리한 장점이 있다. 

kafka clients는 저수준의 라이브러리로 사용자가 직접 Configuration을 해줘야하며, 

 

spring-kafka : springboot 기반으로 편리하게 사용가능

spring-kafka가 Spring 기반으로 미리 등록된 빈으로 활용하여 환경설정이나 Consumer, Producer 기능을 비교적 쉽게 구현할 수 있다. 아래는 Consumer, Producer 환경 설정의 예시 코드 이다.

 

① pom.xml에 spring-kafka 라이브러리 추가 

<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.2.7.RELEASE</version>
</dependency>

 

② application.yml 설정

spring:
  main:
    web-application-type: servlet  
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: log
    template:
      default-topic: test-data  

 

③ Consumer Configuration 설정 

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaReceiveConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String consumer_groupid;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumer_groupid);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

 

④ Producer Configuration 설정

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaSenderConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

 

다만, spring-kafka라이브러리 사용시 Consumer, Producer 둘 중의 하나의 기능만 구현하고 싶어도 Consumer, Producer  환경설정을 모두 해줘야한다는 불편함이 있다.  Producer 하나만 환경 설정하고, 어플리케이션 실행시 아래와 같이 Consumer.properties에 대한 파일을 열심히 찾다가 없어서 에러를 출력한다. 

 

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.2.4.RELEASE)

2020-05-21 10:24:21.021 [restartedMain] INFO  n.d.p.DataProcessorApplication : Starting DataProcessorApplication on DESKTOP-000 with PID 15742 (D:\log\target\classes started by log in D:\log)
2020-05-21 10:24:21.024 [restartedMain] INFO  n.d.p.DataProcessorApplication : No active profile set, falling back to default profiles: default
2020-05-21 10:24:21.089 [restartedMain] INFO  o.s.b.d.e.DevToolsPropertyDefaultsPostProcessor : Devtools property defaults active! Set 'spring.devtools.add-properties' to 'false' to disable
2020-05-21 10:24:21.089 [restartedMain] INFO  o.s.b.d.e.DevToolsPropertyDefaultsPostProcessor : For additional web related logging consider setting the 'logging.level.web' property to 'DEBUG'
2020-05-21 10:24:21.900 [restartedMain] INFO  o.s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data Reactive Cassandra repositories in DEFAULT mode.
2020-05-21 10:24:21.921 [restartedMain] INFO  o.s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 15ms. Found 0 Reactive Cassandra repository interfaces.
2020-05-21 10:24:21.925 [restartedMain] INFO  o.s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data Cassandra repositories in DEFAULT mode.
2020-05-21 10:24:21.930 [restartedMain] INFO  o.s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 4ms. Found 0 Cassandra repository interfaces.
2020-05-21 10:24:22.157 [restartedMain] INFO  o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.kafka.annotation.KafkaBootstrapConfiguration' of type [org.springframework.kafka.annotation.KafkaBootstrapConfiguration$$EnhancerBySpringCGLIB$$f6c4d4d7] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-05-21 10:24:22.360 [restartedMain] INFO  org.xnio : XNIO version 3.3.8.Final
2020-05-21 10:24:22.381 [restartedMain] INFO  org.xnio.nio : XNIO NIO Implementation Version 3.3.8.Final
2020-05-21 10:24:22.521 [restartedMain] INFO  io.undertow.servlet : Initializing Spring embedded WebApplicationContext
2020-05-21 10:24:22.521 [restartedMain] INFO  o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 1431 ms
2020-05-21 10:24:24.381 [restartedMain] INFO  o.s.s.c.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-05-21 10:24:24.775 [restartedMain] WARN  o.s.b.w.s.c.AnnotationConfigServletWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaListenerContainerFactory' defined in class path resource [org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory]: Factory method 'kafkaListenerContainerFactory' threw exception; nested exception is java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: org/springframework/kafka/listener/ConsumerProperties
2020-05-21 10:24:24.779 [restartedMain] INFO  o.s.s.c.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
2020-05-21 10:24:26.812 [restartedMain] INFO  o.s.b.a.l.ConditionEvaluationReportLoggingListener : 

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2020-05-21 10:24:26.820 [restartedMain] ERROR o.s.boot.SpringApplication : Application run failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaListenerContainerFactory' defined in class path resource [org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory]: Factory method 'kafkaListenerContainerFactory' threw exception; nested exception is java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: org/springframework/kafka/listener/ConsumerProperties
	at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:656)
	at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:636)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1338)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1177)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:557)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:517)
	at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:323)
	at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:321)
	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202)
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:879)
	at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:878)
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:550)
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141)
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747)
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215)
	at net.doople.processor.DataProcessorApplication.main(DataProcessorApplication.java:10)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory]: Factory method 'kafkaListenerContainerFactory' threw exception; nested exception is java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: org/springframework/kafka/listener/ConsumerProperties
	at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185)
	at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:651)
	... 24 common frames omitted
Caused by: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: org/springframework/kafka/listener/ConsumerProperties
	at org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer.configureContainer(ConcurrentKafkaListenerContainerFactoryConfigurer.java:170)
	at org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer.configure(ConcurrentKafkaListenerContainerFactoryConfigurer.java:146)
	at org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration.kafkaListenerContainerFactory(KafkaAnnotationDrivenConfiguration.java:117)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
	... 25 common frames omitted
Caused by: java.lang.NoClassDefFoundError: org/springframework/kafka/listener/ConsumerProperties
	... 33 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.springframework.kafka.listener.ConsumerProperties
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 33 common frames omitted

 

kafka-clients : 사용자가 원하는대로 사용 가능

 

이런점에서는 kafka-clients는 사용자가 직접 환경설정을 지정해주기 때문에 Consumer, Producer 하나만 셋팅하고 구현해도 문제 없이 구현된다는 점이 편리하다.

 

① pom.xml 파일에 kafka-clients 라이브러리 적용

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>

 

② application.yml 기본 설정

spring:
  main:
    web-application-type: servlet
  kafka:
    bootstrap-servers: localhost:9092
    template:
      default-topic: test-data
#    consumer:
#      group-id: foo

 

③ Producer Configuration 설정

package net.doople.consumer.service;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.Properties;

@Service
public class KafkaSenderService {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSenderService.class);

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.template.default-topic}")
    private String topic;

    private KafkaProducer<String, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
//        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(getProperties());
    }

    public void sendAsync(String key, String message) {
        long start = System.currentTimeMillis();
        final Producer<String, String> producer = createProducer();

        final ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, message);

        producer.send(record, (metadata, exception) -> {
            long end = System.currentTimeMillis();
            if (metadata != null) {
                System.out.printf("sent record(key=%s value=%s) " +
                                "meta(partition=%d, offset=%d) time=%d\n",
                        record.key(), record.value(), metadata.partition(),
                        metadata.offset(), end-start);
            } else {
                LOGGER.error("KafkaCallBack - Exception", exception.getCause());
                exception.printStackTrace();
            }
        });
    }

    public void createTopic(String topic, int numberOfPartitions, int replicationFactor){
        LOGGER.info("Create topic {} ", topic);
        try(AdminClient adminClient = AdminClient.create(getProperties())){
            NewTopic newTopic = new NewTopic(topic, numberOfPartitions, (short) replicationFactor);
            adminClient.createTopics(Collections.singleton(newTopic)).all().get();

        }catch (Exception e){
            e.printStackTrace();
            LOGGER.error("Create Topic {} failed, {}", topic, e.getMessage() );
        }

    }


}

 

Bean으로 관리해주는 Spring-Kafka은 어플리케이션 실행시점에 카프카와 연동이 정상적인지 바로 확인이 가능하다.  kafka-clients는 kafka 관련 메소드(메세지 전송,수신 기능)를 불러와 실행하고 연동하기 때문에, 메세지를 전송, 수신하는 시점에서야 연동이 되었는지 로그로 확인할 수 있다.

 

Spring for Apache Kafka , kafka-clients 버전 호환 

spring-kaka, kafka-clients 의 버전에 대한 정보이다. 

 

 

 

 

참고 사이트 

https://www.baeldung.com/spring-kafka

https://docs.spring.io/spring-kafka/reference/html/

https://spring.io/projects/spring-kafka