반응형
Spring Boot에서 Kafka 소비자 활성화/비활성화 제어
스프링 부트에서 여러 카프카 소비자를 구성했습니다.kafka.properties는 다음과 같습니다(여기에는 한 소비자에 대한 구성만 나열됨).
kafka.topics=
bootstrap.servers=
group.id=
enable.auto.commit=
auto.commit.interval.ms=
session.timeout.ms=
schema.registry.url=
auto.offset.reset=
kafka.enabled=
다음은 구성입니다.
@Configuration
@PropertySource({"classpath:kafka.properties"})
public class KafkaConsumerConfig {
@Autowired
private Environment env;
@Bean
public ConsumerFactory<String, String> pindropConsumerFactory() {
Map<String, Object> dataRiverProps = new HashMap<>();
dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
dataRiverProps.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("group.id"));
dataRiverProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, env.getProperty("enable.auto.commit"));
dataRiverProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, env.getProperty("auto.commit.interval.ms"));
dataRiverProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("session.timeout.ms"));
dataRiverProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
dataRiverProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
dataRiverProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("auto.offset.reset"));
return new DefaultKafkaConsumerFactory<>(dataRiverProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(pindropConsumerFactory());
return factory;
}
}
그리고 이것은 소비자입니다.
@Component
public class KafkaConsumer {
@Autowired
private MessageProcessor messageProcessor;
@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
public void consumeJson(String message) {
// processing message
}
}
이 소비자의 작성 또는 메시지 검색을 제어할 수 있도록 "kafka.enabled"라는 프롭을 사용할 수 있는 방법이 있습니까?감사합니다!
아래와 같이 소비자에서 자동 시작(true/false) 속성을 사용하여 이를 수행할 수 있습니다.
@KafkaListener(id = "foo", topics = "Topic1", groupId = "group_id",
containerFactory = "kafkaListenerContainerFactory",autoStartup = "${listen.auto.start:false}")
public void consume(String message) {
//System.out.println("Consumed message: " + message);
}
Kafka 구성을 비활성화하려면 다음과 같이 합니다.
Kafka ConsumerConfig 주석 달기
@ConditionalOnProperty(value = "kafka.enabled", matchIfMissing = true)
제거한다.
@Component
에KafkaConsumer
클래스를 지정하고 @빈 인으로 정의합니다.KafkaConsumerConfig
.
KafkaConsumer에서 메시지 검색을 제어하려면 다음과 같이 하십시오.
Kafka Consumer 내부의 자산 가치만 확인할 수 있습니다.
@Value("kafka.enabled") private Boolean enabled;
그런 다음 간단한 if 방법으로 주석을 달면 사용합니다.
@KafkaListener
.
언급URL : https://stackoverflow.com/questions/54426658/control-enabling-disabling-kafka-consumers-in-spring-boot
반응형
'IT' 카테고리의 다른 글
숫자를 6 PHP로 나눌 수 있는지 확인하기 (0) | 2023.08.26 |
---|---|
Oracle current_timestamp를 초 단위로 변환 (0) | 2023.08.26 |
CSS로 입력 및 제출 버튼을 스타일화하는 방법은 무엇입니까? (0) | 2023.08.26 |
일치하는 와일드카드가 엄격하지만 'tx:notation-driven' 요소에 대한 선언을 찾을 수 없습니다. (0) | 2023.08.26 |
Numpy를 사용하여 Python에서 TIFF(가져오기, 내보내기) 작업 (0) | 2023.08.26 |