상황
카프카를 운영하다 보니, 가끔 일부 컨슈머에서 이유를 알 수 없는 지연이 발생할 때가 있었다. 현재 컨슈머에 문제가 생기면 자동으로 재시작되도록 설계되어 있어 큰 문제가 되지 않는다. 그런데 인터널 토픽을 사용하는 스트림에서는 상황이 좀 다르다. 인터널 토픽에서 지연이 발생하면 컨슈머가 재시작되지 않는 문제가 있는 것 같다.
이를 해결하기 위하여 종종 수동으로 statefulset 재시작하였고, 추후 재시작 자동화를 위해서 하기 아래 포스팅과 같이 세팅을 해주었다.
그런데… 기댓값대로 동작하지 않는 이슈가 있었다.
원인
Kafka Metric에서 records-lag와 records-lag-max에 해당하는 값에 대한 로깅을 찍어보니 계속해서 0으로 찍히고 있었다.
즉, lag의 값을 들고오지 못하여서 lag이 증가하는지 여부조차 체크할 수 없는 것이다.
해결 방안
1. Kafka Metric이 제대로 쌓이도록 엑츄에이터 및 프로메테우스 설정을 조정한다.
스프링 엑츄에이터 설정을 찾아 매트릭 값에 대한 조정을 생각해볼수도 있었으나, 그 부분이 오히려 공수가 클 듯 하였고, 만약 기존에 남기지 않던 매트릭을 남긴다면 프로메테우스 서버에 불필요한 매트릭이 쌓이고, 카프카 스트림 어플리케이션의 힙 영역도 해당 매트릭으로 인해 튈수도 있는 등 여러가지 사이드 이펙트를 고려하지 않을 수 없었다.
2. Kafka Consumer를 이용하여 repartition topic의 end offset과 commit offset 정보를 가져와서 Lag을 직접 산출한다.
하기의 코드와 같이 end offset과 commit offset 정보를 가져와서 Lag을 직접 계산하는 방식이다.
private void cacheRepartitionMetrics(KafkaStreams streams) {
cachedRepartitionMetricMap.clear();
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(streams.config().originals());
streams.localThreadsMetadata().forEach(threadMetadata -> {
threadMetadata.activeTasks().forEach(task -> {
task.topicPartitions().forEach(tp -> {
if (isRepartitionTopic(tp.topic())) {
// KafkaConsumer를 사용하여 endOffsets와 committedOffsets를 가져오기
consumer.assign(Collections.singleton(tp));
consumer.seekToEnd(Collections.singleton(tp));
long endOffset = consumer.position(tp);
OffsetAndMetadata committed = consumer.committed(tp);
long committedOffset = (committed != null) ? committed.offset() : 0L;
long lag = endOffset - committedOffset;
// partitionFullName을 규약에 맞게 생성하여 map에 저장
String partitionFullName = getPartitionFullName(tp);
cachedRepartitionMetricMap.put(partitionFullName, (double) lag);
logger.info("Topic: {}, Partition: {}, Lag: {}", tp.topic(), tp.partition(), lag);
consumer.close();
}
});
});
});
}
그러나 이렇게 했을 경우 코드를 봐도 알겠지만, 컨슈머의 어사인과 close가 지속적으로 일어나 오히려 리소스를 잡아먹을 수 있는 단점이 있다. 따라서 고민끝에 기각하였다.
3. Kafka Admin Client API를 이용하여 repartition topic의 end offset과 commit offset 정보를 가져와서 Lag을 직접 산출한다.
consumer가 아닌 Kafka Admin Client API를 이용하여 lag을 직접 산출하였다.
이를 위하여 admin properties 세팅 -> admin client 생성 -> 비즈니스 로직 수정의 단계를 거쳤다.
public Properties getAdminProperties() {
final Properties properties = new Properties();
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, applicationId + "-admin");
return properties;
}
1. kafka admin client 관련 properties 추가 및 메소드 생성
this.adminClient = AdminClient.create(kafkaConfigStatistics.getAdminProperties());
2. kafka admin client 생성
public void cacheRepartitionMetrics() {
cachedRepartitionMetricMap.clear();
try {
// kafkaConfig에서 application Id를 가져옴
String consumerGroupId = kafkaConfig.getApplicationId();
AdminClient adminClient = statisticsStreamService.getAdminClient();
// consumer group으로 ListConsumerGroupOffsetsResult 세팅
ListConsumerGroupOffsetsResult offsetsResult = adminClient.listConsumerGroupOffsets(consumerGroupId);
Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets = offsetsResult.partitionsToOffsetAndMetadata().get();
Map<TopicPartition, OffsetSpec> requestLatestOffsets = consumerGroupOffsets.keySet().stream()
.collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
ListOffsetsResult listOffsetsResult = adminClient.listOffsets(requestLatestOffsets);
Map<TopicPartition, ListOffsetsResultInfo> endOffsetsInfo = listOffsetsResult.all().get();
// ListOffsetsResultInfo에서 오프셋 값 추출
Map<TopicPartition, Long> endOffsets = endOffsetsInfo.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset()));
// repartition 토픽의 렉 계산
consumerGroupOffsets.forEach((tp, offsetAndMetadata) -> {
if (isRepartitionTopic(tp.topic())) {
long committedOffset = offsetAndMetadata.offset();
long endOffset = endOffsets.get(tp);
long lag = endOffset - committedOffset;
// cachedRepartitionMetricMap에 적재
String partitionFullName = getPartitionFullName(tp);
cachedRepartitionMetricMap.put(partitionFullName, (double) lag);
logger.info("partition full name = {}", partitionFullName);
logger.info("Topic: {}, Partition: {}, Lag: {}", tp.topic(), tp.partition(), lag);
}
});
} catch (InterruptedException | ExecutionException e) {
logger.error("Error fetching offsets from AdminClient", e);
}
}
3. 비즈니스 로직 수정
참고 자료
'트러블 슈팅 > 인프라' 카테고리의 다른 글
(ES) 엘라스틱 서치 용량 부족으로 인한 미적재 이슈 (0) | 2024.10.06 |
---|---|
(Redis) 레디스 클러스터를 사용하는 어플리케이션의 커넥션 타임아웃 에러 (0) | 2024.06.17 |
(Kafka) Kafka Streams에서 Repartition Topic을 Consume하지 못하는 장애 (feat: Metric으로 Lag값 추출) (0) | 2024.05.18 |
(Kafka) 유효하지 않은 레코드가 토픽에 들어와 카프카 어플리케이션 장애가 났을 때 (2) | 2024.05.08 |
(Airflow) 배치 날짜값 오차로 인한 통계값 오류가 발생한 이슈 2 (8) | 2024.05.01 |