For Java-based applications, there are two libraries for integrating Kafka: spring-kafka and kafka-clients.
spring-kafka has more reference material than kafka-clients and, as a high-level library, is convenient to use.
kafka-clients is a low-level library, so you have to write the configuration yourself.
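spring-kafka: convenient to use on top of Spring Boot
Because spring-kafka builds on Spring and works with pre-registered beans, configuration and the Consumer and Producer functionality are relatively easy to implement. Below is example code for the Consumer and Producer configuration.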
① Add the spring-kafka library to pom.xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.7.RELEASE</version>
</dependency>
② application.yml settings
spring:
  main:
    web-application-type: servlet
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: log
    template:
      default-topic: test-data
③ Consumer configuration
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaReceiveConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String consumerGroupId;

    // Raw consumer settings: broker address, String deserializers, group id,
    // and reading from the earliest offset when no committed offset exists.
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    // Container factory used by @KafkaListener methods.
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
④ Producer configuration
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaSenderConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // Raw producer settings: broker address and String serializers.
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    // KafkaTemplate is the bean that application code injects to send messages.
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
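One caveat with spring-kafka: in this setup, configuring only the Producer was not enough, and the application failed at startup with the error below. At first glance it looks as if the application is searching for a consumer properties file and giving up, which would suggest that both the Consumer and the Producer must always be configured. The trace says something more specific, though: it is a ClassNotFoundException for the class org.springframework.kafka.listener.ConsumerProperties, raised while Spring Boot's auto-configuration (KafkaAnnotationDrivenConfiguration) creates the kafkaListenerContainerFactory bean. A missing class like this most likely means the spring-kafka version on the classpath does not match what the Spring Boot version expects (here spring-kafka 2.2.7.RELEASE is pinned in pom.xml while Spring Boot is 2.2.4.RELEASE), rather than a missing consumer configuration.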
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.2.4.RELEASE)
2020-05-21 10:24:21.021 [restartedMain] INFO n.d.p.DataProcessorApplication : Starting DataProcessorApplication on DESKTOP-000 with PID 15742 (D:\log\target\classes started by log in D:\log)
2020-05-21 10:24:21.024 [restartedMain] INFO n.d.p.DataProcessorApplication : No active profile set, falling back to default profiles: default
2020-05-21 10:24:21.089 [restartedMain] INFO o.s.b.d.e.DevToolsPropertyDefaultsPostProcessor : Devtools property defaults active! Set 'spring.devtools.add-properties' to 'false' to disable
2020-05-21 10:24:21.089 [restartedMain] INFO o.s.b.d.e.DevToolsPropertyDefaultsPostProcessor : For additional web related logging consider setting the 'logging.level.web' property to 'DEBUG'
2020-05-21 10:24:21.900 [restartedMain] INFO o.s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data Reactive Cassandra repositories in DEFAULT mode.
2020-05-21 10:24:21.921 [restartedMain] INFO o.s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 15ms. Found 0 Reactive Cassandra repository interfaces.
2020-05-21 10:24:21.925 [restartedMain] INFO o.s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data Cassandra repositories in DEFAULT mode.
2020-05-21 10:24:21.930 [restartedMain] INFO o.s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 4ms. Found 0 Cassandra repository interfaces.
2020-05-21 10:24:22.157 [restartedMain] INFO o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.kafka.annotation.KafkaBootstrapConfiguration' of type [org.springframework.kafka.annotation.KafkaBootstrapConfiguration$$EnhancerBySpringCGLIB$$f6c4d4d7] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-05-21 10:24:22.360 [restartedMain] INFO org.xnio : XNIO version 3.3.8.Final
2020-05-21 10:24:22.381 [restartedMain] INFO org.xnio.nio : XNIO NIO Implementation Version 3.3.8.Final
2020-05-21 10:24:22.521 [restartedMain] INFO io.undertow.servlet : Initializing Spring embedded WebApplicationContext
2020-05-21 10:24:22.521 [restartedMain] INFO o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 1431 ms
2020-05-21 10:24:24.381 [restartedMain] INFO o.s.s.c.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-05-21 10:24:24.775 [restartedMain] WARN o.s.b.w.s.c.AnnotationConfigServletWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaListenerContainerFactory' defined in class path resource [org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory]: Factory method 'kafkaListenerContainerFactory' threw exception; nested exception is java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: org/springframework/kafka/listener/ConsumerProperties
2020-05-21 10:24:24.779 [restartedMain] INFO o.s.s.c.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
2020-05-21 10:24:26.812 [restartedMain] INFO o.s.b.a.l.ConditionEvaluationReportLoggingListener :
Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2020-05-21 10:24:26.820 [restartedMain] ERROR o.s.boot.SpringApplication : Application run failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaListenerContainerFactory' defined in class path resource [org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory]: Factory method 'kafkaListenerContainerFactory' threw exception; nested exception is java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: org/springframework/kafka/listener/ConsumerProperties
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:656)
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:636)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1338)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1177)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:557)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:517)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:323)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:321)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:879)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:878)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:550)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215)
at net.doople.processor.DataProcessorApplication.main(DataProcessorApplication.java:10)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory]: Factory method 'kafkaListenerContainerFactory' threw exception; nested exception is java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: org/springframework/kafka/listener/ConsumerProperties
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185)
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:651)
... 24 common frames omitted
Caused by: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: org/springframework/kafka/listener/ConsumerProperties
at org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer.configureContainer(ConcurrentKafkaListenerContainerFactoryConfigurer.java:170)
at org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer.configure(ConcurrentKafkaListenerContainerFactoryConfigurer.java:146)
at org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration.kafkaListenerContainerFactory(KafkaAnnotationDrivenConfiguration.java:117)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
... 25 common frames omitted
Caused by: java.lang.NoClassDefFoundError: org/springframework/kafka/listener/ConsumerProperties
... 33 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.springframework.kafka.listener.ConsumerProperties
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 33 common frames omitted
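The trace above bottoms out in a ClassNotFoundException, so one quick way to narrow the problem down is to ask the class loader directly whether the class is available. The following is a minimal sketch using only the JDK; the class name `ClasspathCheck` and the helper `isPresent` are hypothetical names introduced here for illustration, not part of Spring or Kafka.

```java
public class ClasspathCheck {

    /** Returns true if the named class can be located by this class's class loader. */
    public static boolean isPresent(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The class named in the stack trace above
        String name = "org.springframework.kafka.listener.ConsumerProperties";
        System.out.println(name + " present: " + isPresent(name));
    }
}
```

Run inside the application's classpath, a result of `false` for this class would suggest that the spring-kafka jar being loaded does not contain it, which points at the dependency version rather than at the Consumer settings.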
kafka-clients: use it however you want
In this respect kafka-clients is convenient: because you specify the configuration yourself, you can set up and implement only the Consumer or only the Producer and it works without problems.
① Add the kafka-clients library to pom.xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>
② Basic application.yml settings
spring:
  main:
    web-application-type: servlet
  kafka:
    bootstrap-servers: localhost:9092
    template:
      default-topic: test-data
#    consumer:
#      group-id: foo
③ Producer configuration
package net.doople.consumer.service;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.Properties;

@Service
public class KafkaSenderService {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSenderService.class);

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.template.default-topic}")
    private String topic;

    // Client settings shared by the producer and the AdminClient.
    private Properties getProperties() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return props;
    }

    private KafkaProducer<String, String> createProducer() {
        return new KafkaProducer<>(getProperties());
    }

    public void sendAsync(String key, String message) {
        long start = System.currentTimeMillis();
        // Note: this creates a new producer on every call and never closes it;
        // real code would normally reuse a single producer instance.
        final Producer<String, String> producer = createProducer();
        final ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, message);
        producer.send(record, (metadata, exception) -> {
            long end = System.currentTimeMillis();
            if (exception == null) {
                System.out.printf("sent record(key=%s value=%s) " +
                                "meta(partition=%d, offset=%d) time=%d\n",
                        record.key(), record.value(), metadata.partition(),
                        metadata.offset(), end - start);
            } else {
                LOGGER.error("KafkaCallBack - Exception", exception);
            }
        });
    }

    public void createTopic(String topic, int numberOfPartitions, int replicationFactor) {
        LOGGER.info("Create topic {} ", topic);
        try (AdminClient adminClient = AdminClient.create(getProperties())) {
            NewTopic newTopic = new NewTopic(topic, numberOfPartitions, (short) replicationFactor);
            // Block until the broker confirms the topic was created.
            adminClient.createTopics(Collections.singleton(newTopic)).all().get();
        } catch (Exception e) {
            LOGGER.error("Create Topic {} failed, {}", topic, e.getMessage(), e);
        }
    }
}
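The producer-only service above has a consumer-side counterpart: with kafka-clients you can just as well prepare only the consumer settings. The sketch below builds them as plain `java.util.Properties` with the standard Kafka configuration key strings, mirroring the values used in the spring-kafka Consumer configuration earlier. The class name `ConsumerOnlyProps` and the method `consumerProps` are hypothetical names for illustration; passing the result to a `KafkaConsumer` is left out so the example runs with the JDK alone.

```java
import java.util.Properties;

public class ConsumerOnlyProps {

    /** Builds consumer-only settings; no producer configuration is involved. */
    public static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Deserializers are given by class name, so this file needs no Kafka imports
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Start from the earliest offset when the group has no committed offset
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:9092", "log");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

In an application that has kafka-clients on the classpath, these properties could be handed to the `KafkaConsumer` constructor in the same way the service above hands its properties to `KafkaProducer`.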
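Because spring-kafka manages its components as beans, you can see right at application startup whether the Kafka integration is working. With kafka-clients, the connection is made only when the Kafka-related methods (sending and receiving messages) are called, so you can confirm from the logs that the integration works only at the moment a message is sent or received.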
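If you want an early signal with kafka-clients as well, one option is a simple reachability check at startup. The sketch below is an assumption-laden stand-in, not a Kafka API: it only opens a plain TCP connection to a `host:port` bootstrap address using the JDK, so it shows that something is listening there, not that the listener is a healthy Kafka broker. The class name `BrokerReachability` and the method `isReachable` are hypothetical.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachability {

    /** Returns true if a TCP connection to "host:port" succeeds within timeoutMs. */
    public static boolean isReachable(String hostPort, int timeoutMs) {
        int idx = hostPort.lastIndexOf(':');
        if (idx <= 0) {
            throw new IllegalArgumentException("expected host:port, got " + hostPort);
        }
        String host = hostPort.substring(0, idx);
        int port = Integer.parseInt(hostPort.substring(idx + 1));
        try (Socket socket = new Socket()) {
            // Plain TCP connect only; no Kafka protocol handshake is performed
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Same bootstrap address as in application.yml
        System.out.println("broker reachable: " + isReachable("localhost:9092", 2000));
    }
}
```

Calling such a check once during startup and logging the result gives a rough equivalent of the early feedback described above, at the cost of one extra connection attempt.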
Spring for Apache Kafka and kafka-clients version compatibility
Version compatibility information for spring-kafka and kafka-clients.
Reference
https://www.baeldung.com/spring-kafka