상황
카프카를 운영하다 보니, 가끔 일부 컨슈머에서 이유를 알 수 없는 지연이 발생할 때가 있었다. 현재 컨슈머에 문제가 생기면 자동으로 재시작되도록 설계되어 있어 큰 문제가 되지 않는다. 그런데 인터널 토픽을 사용하는 스트림에서는 상황이 좀 다르다. 인터널 토픽에서 지연이 발생하면 컨슈머가 재시작되지 않는 문제가 있는 것 같다.
카프카 Streams에서는 내부적으로 Repartition Topic을 사용하는데 이 중 repartition Topic에서 장애가 발생한 것이다.
이를 해결하기 위하여 종종 수동으로 statefulset 재시작하였다.
원인
우선 저수준에서 일어나는 일을 분석하여 보았다.
컨슈머 지연(Lag)의 근본적인 원인은 컨슈머 스레드와 파티션 매핑이 꼬여 발생하는 것으로 판단되었다.
n-1회차 리밸런싱: 특정 파티션이 A 컨슈머 스레드에 매핑됨.
n회차 리밸런싱: 특정 파티션이 B 컨슈머 스레드에 매핑됨.
이후 Consumer Lag 조회: 특정 파티션이 여전히 A 스레드에 매핑되어 있는 문제가 나타남.
n회차 리밸런싱에서 매핑된 다른 파티션과 스레드는 제대로 매핑되어 있는 것을 확인할 수 있다.
해결 방안
우선, 리스타트 liveness로 재시작 하는 조건은 러프하게 잡았을 시 자칫 무한 재기동하는 불상사가 발생할 수 있다. 따라서 하기와 같이 세팅하였다.
리스타트 조건
- 스트림의 state가 'running'으로 변경된 시점부터 20분이 지나야 함
- 스트림 부팅 후 20분 아닌, 스트림 상태 'running' 변경 후 20분 설정 이유는 다음과 같음.
- 스트림 부팅 후 시간만 기준으로 하면 리밸런싱 2회 이상 동시에 발생시 리스타트 발생 가능.
- 실제 장애 상황 로그 분석 결과, 이런 상황 충분히 발생 가능함 확인.
- 리밸런싱이 1회만 발생하더라도 10분 이상 리밸런싱을 하게되면 가능성이 있음. (리밸런싱 직후, 1~4 조건 모두 만족하게 됨)
- 따라서 조건의 값을 갱신함.
- 스트림 부팅 후 시간만 기준으로 하면 리밸런싱 2회 이상 동시에 발생시 리스타트 발생 가능.
- 스트림 부팅 후 20분 아닌, 스트림 상태 'running' 변경 후 20분 설정 이유는 다음과 같음.
- 10분 동안 lag이 지속적으로 증가해야 함.
- 코드가 복잡해짐에도 불구하고 lag의 증가하는 값(분)을 조건으로 잡은 이유
- 장애 상황의 핵심은 특정 파티션이 레코드를 전혀 소비하지 못하는 것임. 따라서 10분 동안 lag이 계속 쌓이는지 추적하는 것 중요함.
- 코드가 복잡해짐에도 불구하고 lag의 증가하는 값(분)을 조건으로 잡은 이유
- repartition 토픽의 max partition lag 값이 10K 이상이어야 함
- repartition 토픽의 합이 아니라 max 값 기준을 사용한 이유
- Repartition 토픽에서 행이 걸리면 여러 파티션에 lag이 생기는 것이 아니라 특정 파티션에만 lag이 발생함.
- 다른 repartition들의 lag이 음수인 경우도 있어 장애 식별에 방해요소가 될 수 있음.
- 10K 기준을 설정한 이유
- 10분 동안 lag이 증가하는 것만 확인할 경우, lag이 음수이면서 절댓값이 점차 낮아지는 경우에도 restart가 발생할 수 있음
- 따라서 그라파나 알람 값을 차용하여 10K라는 절대적인 수치 확인을 추가함
- 물론 Grafana는 Consumer Group 기준이고, 비즈니스 로직은 lag이 가장 높은 단일 repartition 토픽의 파티션 값이므로 아주 같진 않음
- repartition 토픽의 합이 아니라 max 값 기준을 사용한 이유
- stream의 상태가 running 일 것
- 재시작 판단 조건에 직접 포함되진 않지만, 스케줄러의 도입부에서 이 조건을 체크하도록 되어 있음.
- 해당 조건이 없으면 리밸런싱때도 재시작 되게 됨
그리고 성능을 위하여 Kafka 매트릭에서 repartition Topic만 캐싱하였다.
매트릭 조회 캐싱(성능 고려)
- 매트릭 정보를 키값으로 바로 조회하기 위해서는
- topic명
- partition명
- client-id명(컨슈머 스레드 명)
이 필요함. 그런데 TaskMetadata에는 클라이언트 정보가 없음.
캐싱을 하지 않으면 매트릭 전체를 조회하여 TaskMetadata 내에있는 topic, partition 정보를 하나씩 순회하여 판별해야함 (TaskMetadata에서 토픽과 파티션 정보를 Set으로 관리함)
- 캐싱 하지 않았을 때 로직
- topicPartition Set을 순회
- 순회때마다 전체 Metric 조회
- 전체 Metric에서 topic, partition 값으로 찾아냄
- 즉, 각 partition의 매트릭을 찾기 위해 매트릭의 전체 목록을 매번 검색해야 하기 때문에 시간이 오래 걸림.
- partition 수 * Matric 수 (O(N^2))의 시간복잡도 발생.
- 즉, 각 partition의 매트릭을 찾기 위해 매트릭의 전체 목록을 매번 검색해야 하기 때문에 시간이 오래 걸림.
따라서 이를 캐싱하여
- 전체 매트릭에서 리파티션 토픽 관련 매트릭만 캐시하여 저장
- 캐시된 데이터에서 필요한 매트릭을 추출해 사용하는 로직 사용
자 이제 코드를 살펴보자!
코드
@Slf4j
@Component
@RequiredArgsConstructor
public class RepartitionTopicLagCheckService {
private final StreamService streamService;
private final KafkaStreamStateListener kafkaStreamStateListener;
private static final Double TEN_K = 10000.0;
private static final int TEN_MINUTES = 10;
private final Map<String, Double> cachedRepartitionMetricMap = new ConcurrentHashMap<>();
private final Map<String, Double> previousPartitionLag = new ConcurrentHashMap<>();
private final Map<String, Integer> partitionLagIncreaseCount = new ConcurrentHashMap<>();
public static final String RECORDS_LAG = "records-lag";
public static final String CONSUMER_FETCH_MANAGER_METRICS = "consumer-fetch-manager-metrics";
public static final String TOPIC = "topic";
public static final String PARTITION = "partition";
public static final String REPARTITION = "repartition";
/**
* 전체적인 로직은 이렇다
* 1. 데이터 (매트릭) 전처리
* 1.1. 데이터 캐싱
* 1.2. 캐싱한 데이터를 통하여 prev 값과 비교
* 1.2.1. 비교하여 prev 값보다 높을 시 increaseCount 증가
* 1.2.2. 비교하여 prev 값보다 작거나 같을 시 increaseCount값 유지
* 2. 리스타트 확인
* 2.1. 리스타트 필요 시 리스타트
*/
@Scheduled(fixedRate = 60000)
public void checkLagAndHandleStreamRestart() {
try {
KafkaStreams streams = streamService.getStreams();
if (!isStreamsAvailable(streams)) {
return;
}
preprocessMetrics(streams);
if (shouldRestart()) {
restartStreams();
}
} catch (Exception e) {
logger.error("Error during scheduled task", e);
}
}
// 리스타트 여부 판단
private boolean isStreamsAvailable(KafkaStreams streams) {
if (streams == null) {
logger.warn("Kafka Streams instance is null");
return false;
}
if (streams.state() != RUNNING) {
logger.warn("Kafka Streams instance is not running");
return false;
}
return true;
}
private void preprocessMetrics(KafkaStreams streams) {
cacheRepartitionMetrics(streams);
streams.localThreadsMetadata().forEach(this::processThreadMetadata);
}
private void cacheRepartitionMetrics(KafkaStreams streams) {
cachedRepartitionMetricMap.clear();
streams.metrics().entrySet().stream()
.filter(entry -> isConsumerLagMetricForRepartitionTopic(entry.getKey()))
.forEach(entry -> {
String partitionFullName = getPartitionFullName(entry.getKey());
Object value = entry.getValue().metricValue();
if (value instanceof Double) {
Double doubleValue = (Double) value;
cachedRepartitionMetricMap.put(partitionFullName, doubleValue);
} else {
logger.warn("Unexpected metric type for {}, value: {}", partitionFullName, value);
}
});
}
private boolean isConsumerLagMetricForRepartitionTopic(MetricName name) {
return RECORDS_LAG.equals(name.name())
&& CONSUMER_FETCH_MANAGER_METRICS.equals(name.group())
&& isRepartitionTopic(name.tags().getOrDefault(TOPIC, ""));
}
private String getPartitionFullName(MetricName metricName) {
return String.format("%s-%s", metricName.tags().get(TOPIC), metricName.tags().get(PARTITION));
}
private void processThreadMetadata(ThreadMetadata threadMetadata) {
threadMetadata.activeTasks().forEach(this::processTaskMetadata);
}
private void processTaskMetadata(TaskMetadata taskMetadata) {
taskMetadata.topicPartitions().forEach(this::processTopicPartition);
}
private void processTopicPartition(TopicPartition tp) {
if (!isRepartitionTopic(tp.topic())) {
return;
}
String partitionFullName = String.format("%s-%d", tp.topic(), tp.partition());
Double lag = getLagForPartitionFullName(partitionFullName);
if (!Double.isNaN(lag)) {
updatePartitionLagIncreaseCount(partitionFullName, lag);
}
}
private boolean isRepartitionTopic(String topic) {
return topic.contains(REPARTITION);
}
private Double getLagForPartitionFullName(String partitionFullName) {
return cachedRepartitionMetricMap.getOrDefault(partitionFullName, Double.NaN);
}
private void updatePartitionLagIncreaseCount(String partitionFullName, Double currentLag) {
Double previousLag = previousPartitionLag.getOrDefault(partitionFullName, 0.0);
if (previousLag < currentLag) {
int newLagIncreaseCount = partitionLagIncreaseCount.getOrDefault(partitionFullName, 0) + 1;
partitionLagIncreaseCount.put(partitionFullName, newLagIncreaseCount);
} else {
partitionLagIncreaseCount.put(partitionFullName, 0);
}
previousPartitionLag.put(partitionFullName, currentLag);
}
private boolean shouldRestart() {
long elapsedMinutes = Duration.between(kafkaStreamStateListener.getLastStreamRunningTime(), Instant.now()).toMinutes();
Double maxLag = cachedRepartitionMetricMap.values().stream().max(Double::compareTo).orElse(Double.NaN);
int maxLagIncreaseCount = getMaxLagIncreaseCount();
return TEN_MINUTES < maxLagIncreaseCount
&& TEN_MINUTES < elapsedMinutes
&& TEN_K < maxLag;
}
private void restartStreams() {
// 리스타트 전 전체 상황 로깅
int maxLagIncreaseCount = getMaxLagIncreaseCount();
partitionLagIncreaseCount.entrySet().stream()
.filter(entry -> entry.getValue().equals(maxLagIncreaseCount))
.forEach(entry -> logger.error(
"Attempting to restart streams due to high lag in some partitions. Partition Name: {}, Lag Count: {}, Increase Count: {}",
entry.getKey(),
entry.getValue(), maxLagIncreaseCount));
streamService.restartStream(); //코드에서는 추상화 하였으나 쿠버네티스 liveness 설정으로 재시작 하였다.
}
private Integer getMaxLagIncreaseCount() {
return partitionLagIncreaseCount.values()
.stream()
.max(Integer::compareTo)
.orElse(null);
}
참고
strimzi 버전 | kafka 버전 | schema registry 버전 |
ksql 버전(무관) |
0.20.1 | 2.6.0 | 6.1.0 | 6.1.0 |
'트러블 슈팅 > 인프라' 카테고리의 다른 글
(Redis) 레디스 클러스터를 사용하는 어플리케이션의 커넥션 타임아웃 에러 (0) | 2024.06.17 |
---|---|
(Kafka) Kafka Streams에서 Repartition Topic을 Consume하지 못하는 장애(feat: kafkaAdminCli) (0) | 2024.06.16 |
(Kafka) 유효하지 않은 레코드가 토픽에 들어와 카프카 어플리케이션 장애가 났을 때 (2) | 2024.05.08 |
(Airflow) 배치 날짜값 오차로 인한 통계값 오류가 발생한 이슈 2 (8) | 2024.05.01 |
(Airflow) 배치 날짜값 오차로 인한 통계값 오류가 발생한 이슈 1 (2) | 2024.04.28 |