CheerUp_Cheers

아파치 카프카 - 애플리케이션 프로그래밍 with 자바 (4) 본문

미들웨어

아파치 카프카 - 애플리케이션 프로그래밍 with 자바 (4)

meorimori 2025. 1. 12. 22:35

Chapter4

4.1 토픽과 파티션

4.1.1 적정 파티션 개수

  • 토픽 생성 시 파티션 개수 고려사항

    1. 데이터 처리량
    2. 메시지 키 사용 여부
    3. 브로커, 컨슈머 영향도
  • 데이터 처리 속도 올리는 법

    1. 컨슈머의 처리량 늘리는 것

      스케일 업, GC 튜닝

      → 컨슈머 특성상 다른 시스템과 연동으로 일정 수준 힘듬.

    2. 컨슈머를 추가하여 병렬처리량을 늘리기.

      프로듀셔 전송 데이터량 < (컨슈머 데이터 처리량 * 파티션 개수)

      → 컨슈머 랙이 생김.

  • 메시지 키 사용 여부

    메시지 키와 데이터 처리 순서에 대해 고려 해야 함.

    • 기본 파티셔너

      메시지 키를 해시로 변환하여 파티션에 매칭함.

      파티션 개수가 달라지는 순간 메시지키를 사용하는 특정 메시지 키의 순서는 보장 받을 수 없음.

    • 커스텀 파티셔너

      파티션 개수가 변해도 메시지 키의 매칭을 가져갈 수 있게.

      커스터밍하기 싫다면 처음부터 파티셔너를 넉넉하게 생성.

  • 브로커와 컨슈머 영향도

    파티션이 늘어나는 만큼, 브로커에서 접근하는 파일 개수 많아짐.

    브로커에서 접근하는 파일 개수가 정해져있음

    → 카프카 브로커의 개수를 늘리는 방안도 고려.

4.1.2 토픽 정리 정책(cleanup.policy)

토픽의 데이터는 시간 또는 용량에 따라 삭제 규칙 적용.

  • cleanup.policy

    • 삭제

      데이터의 완전 삭제.

    • 압축

      동일 메시지 키의 가장 오래된 데이터 삭제.

  • 토픽 삭제 정책

    토픽 운영 시, 대부분 delete로 설정한거고 명시적으로 토픽의 데이터를 삭제하는것을 뜻함.

    세그먼트 단위로 저장과 삭제.

    세그먼트
    토픽의 데이터를 저장하는 명시적 파일시스템 단위
    파티션별 별개로 생성

    • 삭제 정책 실행 시점과 용량

      시간 : 세그먼트 파일의 마지막 수정시간과 retention.ms 비교

      용량 : 토픽의 최대 데이터 크기를 retention.bytes를 넘으면 제거

삭제되면 복구 불가
  • 토픽 압축 정책

    일반적인 zip이나 tar압축과는 다른 개념.

    동일한 키를 기준으로 오래된 데이터 삭제

    • 사용 예시

      카프카 스트림즈 KTable과 같이 메시지키를 기반으로 데이터 처리시 유용.

      → 데이터 흐름이 아닌 가장 마지막으로 업데이트된 메시지키의 데이터가 중요할 경우.

    • 압축 정책 대상

      액티브 세그먼트를 제외한 나머지 세그먼트 대상.

      • min.cleanable.dirty.ratio : 데이터 압축 시작 시점

        액티브 세그먼트를 제외한 세그먼트에 남아있는 tail 영역의 레코드 개수와 헤드 영역의 레코드 개수 비율을 뜻함.

    테일 > 헤드 > 엑티브순으로 영역을 가짐

    - 테일

        압축이 완료된 클린 로그로 중복된 메시지키 없음

    - 헤드

        압축이 되기전의 더티 로그 중복된 메시지키 보유


    > **더티 비율**
    더티 영역 메시지 개수/ ****(더티 영역 메시지 개수 + 클린 영역 메시지 개수)
    > 

    이 더티 비율이 1에 가까울수록 한번에 많은 압축량이 많다.

    0.1에 가까울소록 자주 압축이 되어 최신데이터만 가져가지만 브로커에 부담이 됨으로 적정 (cleanable.dirty.ratio) 설정이 필요.

4.1.3 ISR(In-Sync-Replicas)

ISR
리더 파티션과 팔로워와 모두 싱크가 된 상태.
팔로워 파티션이 리더 파티션으로부터 복제되는 시간이 걸리기 떄문에 생겨난 용어

  • replica.lag.time.max.ms

    리더 파티션이 이 주기를 가지고, 복제하는지 확인.

    이시간을 넘어서도 가져가지 않는다면 ISR그룹에서 제외.

  • unclean.leader.election.enable

    ISR로 묶인 파티션들만 리더로 선출할 자격이 생기는데.

    • true

      true로 할 경우, ISR그룹이 아닌 경우에도 선출이 되며 데이터 유실이 발생하더라도 서비스를 중단하지 않고 토픽 사용하고싶다는 의미와 같음.

    • false

      이 경우 리더 파티션이 존재하는 브로커가 다시 시작될떄까지 해당 토픽을 사용하는 서비스의 중단을 의미.

→ 운영정책에따라 정하는게 중요

- 토픽별 설정

    토픽 생성시 설정하는 방법은 아래와 같음

    ```java
    bin/kafka-topics.sh --bootstrap-server my-kafka:9092 \
    --create --topic my-topic \
    --config unclean.leader.election.enable=false
    ```

4.2 카프카 프로듀서

4.2.1 acks 옵션

0,1,all(-1)값을 가질 수 있음.

복제 개수가 1인 경우에는 이 옵션에 따른 성능변화는 크지 않고, 복수 개수가 2이싱안 경우에 대한 동작 방식은 아래와 같다.

  • acks=0

    프로듀서가 리더 파티션으로 데이터 전송 후, 저장여부 확인 안한다는 뜻.

    → 몇 번쨰 오프셋에 저장되었는지 리턴안한다는 뜻..

    retries 옵션갑승 무의미함.

    프로듀서와 브로커 사이의 오류나 브로커 이슈등이 생기더라도 지속적으로 데이터 보내기에 훨씬 빠름

    → 데이터 유실이 발생하더라도 전송속도가 중요하다면 이 옵션 사용

  • acks=1

    리더 파티션이 데이터를 적재했음을 보장.

    • 데이터 유실

      팔로워 파티션에 동기화가 안되었을때 리더파티션 브로커에 장애가 발생했을 경우.

→ acks 0보다는 느림
  • akcs=all or acks=-1

    리더와 팔로워 모두 적재되었음 확인. (ISR에 포함된 그룹만!)

    일부 브로커 장애 발생 되었을 떄도 안전하게 전송 및 저장.

    • min.insync.replicas

      옵션값이 1이면 ISR 중 하나의 파티션에 적재되었음.

      리더도 포함임으로 1이면 acks1과 동일.

      2로 설정

      → 운영상 동시에 2개 중단되는일은 드뭄.

      복제개수도 고려 대상

      → 3으로 설정하고 복제를 3개로한다면 1개의 브로커가 이슈가 발생하면 토픽이 전송이 되지않음, 브로커 개수보다 작게 설정

4.2.2 멱등성 프로듀서

멱등성
여러 번 연산을 수행하더라도 동일한 결과를 나타냄.
동일한 데이터를 여러 번 전송하더라도 카프카에 단 한번만 저장.

  • 정확히 한번 전달 (enable.idempotence)

    기본값은 false이며 true로 설정하여 멱등성 프로듀서로 동작하게함.

    PID와 시퀀스넘버를 함꼐 전달하여 동일한 메시지가오면 정확히 한번만 브로커에 적재되도록 동작.

4.2.3 트랜잭션 프로듀서

원자성
전체 데이터 처리하거나 전체를 처리 안함

다수의 파티션에 모든 데이터에 대한 원자성 만족

  • 설정

    프로듀서 - enable.idempotence=true

    컨슈머 - isolation.level=read_committed

  • 트랜잭션 구분

    파티션의 레코드로 구분. ( 실질적 데이터 x)

    레코드의 특성은 가지기에 오프셋 한개 차지.

4.3 카프카 컨슈머

4.3.1 멀티 스레드 컨슈머

처리량을 위해서 파티션 개수 = 컨슈머 개수가 가장 좋다.

N개의 파티션을 위해서, N개의 스레드를 가진 1개의 프로세스를 운영하거나 1개의 스레드를 가진 n개의 프로세스를 운영할 수도 있다.

→ 개발자의 선택.

  • 멀티 스레드 컨슈머 고려 사항

    1. 예외적 사항(하나의 컨슈머 스레드가 OOM으로 죽을 경우.)
    2. 컨슈머 비정상종료로 인한 데이터 처리 중복 및 유실
    3. 스레드 세이프한 로직
  • 컨슈머 멀티 스레드 구현

    1. 멀티 워커 스레드

      하나의 컨슈머 스레드에 데이터를 처리하는 워커 스레드를 여러개

    2. 컨슈머 멀티 스레드

      컨슈머 인스턴스에서 poll()메서드로 호출하는 스레드 여러개

  • 카프카 컨슈머 멀티 워커 스레드 전략

    자바의 ExecutorService 자바 라이브러리를 사용하면 레코드 병렬 처리 가능.

    • CachedThreadPool

      작업 이후 스레드가 종료되어야한다면 사용.

      • java - ConsumerWorker

        Runnable 인터페이스 상속

    • 스레드 호출하는 구문

      ExecutorService 사용

      • java - ConsumerWithMultiWorkerThread

          KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
                  consumer.subscribe(Arrays.asList(TOPIC_NAME));
                  ExecutorService executorService = Executors.newCachedThreadPool();
                  while (true) {
                      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
                      for (ConsumerRecord<String, String> record : records) {
                          ConsumerWorker worker = new ConsumerWorker(record.value());
                          executorService.execute(worker);
                      }
                  }
    • 콘솔 확인

      • 토픽에 메시지 사용

          ./kafka-console-producer.sh --broker-list my-kafka:9092 --topic test
      • 출력 화면

      • 주의점

        1. 이 코드는 데이터 처리가 끝났음을 리턴받지않고 커밋하여 컨슈머 장애 시 데이터 유실 발생 가능

        2. 레코드 처리 역전 현상

          중복이나 역전현상이 중요하지 않은 서비스에 이용

  • 카프카 컨슈머 멀티 스레드 전략

    각 스레드에 각 파티션 할당.

    • 주의점

      토픽의 파티션 개수만큼만 컨슈머 스레드를 운영해야함.

      → 할당되지 못한 컨슈머 스레드는 데이터 처리 못함

    • 컨슈머 스레드

      • java - ConsumerWorker (kafka-multi-consumer-thread-by-partition)

        동일하게 Runnable 인터페이스

      • java - MultiConsumerThread

      • 파티션 확인 및 증설

        파티션이 하나일 경우, thread-0으로만 나오기에 수정해서 확인 필요

           ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --describe
           ./kafka-topics.sh --alter --topic test --partitions 3 --bootstrap-server my-kafka:9092
      • 출력화면

4.3.2 컨슈머 랙

토픽의 최신 오프셋과 컨슈머 오프셋 간의 차이

  • 컨슈머 랙 모니터링

    카프카를 통한 데이터 파이프라인을 운영하는데에 핵심적인 역할.

    컨슈머 장애, 파티션 개수를 정하는데 참고 가능.

  • 컨슈머 이슈 예상사항

    프로듀서가 동일한 데이터양인데도, 컨슈머 랙이 늘어난다면 의심해볼만함.

  • 컨슈머 랙 확인 법

    각각 장단점 존재

    1. 카프카 명령어
    2. 컨슈머 애플리케이션 metrics() 메서드
    3. 외부 모니터링 툴
  • 카프카 명령어를 사용한 컨슈머 랙 조회

      bin/kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 \
      --group my-group --describe
    • 단점

      일회성으로 지속적인 모니터링에는 부족 (테스트용)

  • 컨슈머 metrics() 메서드를 사용하여 컨슈머 랙 조회

    컨슈머 랙 관련 모니터링 지표 3가지 확인

        Map<MetricName, ? extends Metric> metrics = consumer.metrics();
                  metrics.forEach((metricName, metric) -> {
                      if (metricName.name().equals("records-lag-max") |
                              metricName.name().equals("records-lag") |
                              metricName.name().equals("records-lag-avg")) {
                          System.out.printf("Metric: %s = %s%n", metricName.name(), metric.metricValue());
                      }
                  });
    • 단점

      컨슈머가 정상적으로 실행될 경우에만 호출

      컨슈머 모니터링 코드 중복 작성 필요

      → 특정 컨슈머 그룹에 해당하는 컨슈머랙만 한정

  • 외부 모니터링 툴을 사용한 컨슈머 랙 조회 (가장 최선의 방법!)

    • 카프카 클러스터 종합 모니터링 툴
      • 데이터 독
      • 컨플루언트 컨트롤 센터
      • 버로우 (컨슈머 모니터링만을 위한 오픈소스)

4.3.2.1 카프카 버로우

카프카 클러스터와 연동하여 REST API를 통해 컨슈머 그룹별 랙 조회 가능.

  • 카프카 버로우 REST API
| HTTP 메서드 | 엔드포인트 | 설명 |
| --- | --- | --- |
| GET | `/burrow/admin` | Burrow의 상태를 확인하는 헬스 체크 엔드포인트입니다. |
| GET | `/v3/kafka` | Burrow와 연동된 Kafka 클러스터 목록을 반환합니다. |
| GET | `/v3/kafka/{클러스터 이름}` | 지정된 클러스터의 상세 정보를 조회합니다. |
| GET | `/v3/kafka/{클러스터 이름}/consumer` | 해당 클러스터에 존재하는 컨슈머 그룹 목록을 반환합니다. |
| GET | `/v3/kafka/{클러스터 이름}/topic` | 해당 클러스터에 존재하는 토픽 목록을 반환합니다. |
| GET | `/v3/kafka/{클러스터 이름}/consumer/{컨슈머 그룹 이름}` | 특정 컨슈머 그룹의 오프셋 및 지연 정보를 조회합니다. |
| GET | `/v3/kafka/{클러스터 이름}/consumer/{컨슈머 그룹 이름}/lag` | 특정 컨슈머 그룹의 파티션 상태, 지연 정보를 조회합니다. |
| GET | `/v3/kafka/{클러스터 이름}/topic/{토픽 이름}` | 특정 토픽의 상세 정보를 조회합니다. |
| DELETE | `/v3/kafka/{클러스터 이름}/consumer/{컨슈머 그룹 이름}` | 버로우에서 모니터링 중인 컨슈머 그룹 삭제 |
  • 장점

    1. 한번의 설정으로 다수의 카프카 클러스터 컨슈머 랙 확인 가능.

    2. 파티션 상태를 단순한 컨슈머 랙의 임계치로 나타내지않음

      랙이 특정 시점에 100만이 넘었다고 이슈가 있다고 단정 X.

      임계치마다 알림을 받는 것은 의미가 없음.

  • 컨슈머 랙 평가

    슬라이딩 윈도우로 계산하여 상태 정함

    • 파티션 상태

      OK, STALLED, STOPPED

    • 컨슈머 상태

      OK, WARNING, ERROR

  • 컨슈머 랙 모니터링 아키텍처

    이미 지나간 컨슈머 랙을 개별적으로 모니터링하기 위해서는 별개의 저장소와 대시보드를 사용하는것이 효과적.

    • 준비물

      • 버로우 : REST API 를 통한 컨슈머랙 조회

      • 텔레그래프 : 데이터 수집 및 전달에 특화, 버로우 조회 → 엘라스틱 서치에 전달

      • 엘라스틱서치 : 컨슈머 랙 정보를 담는 저장소

      • 그라파나 : 엘라스틱서치의 정보를 시각화하고 특정조건에 따라 슬랙 알람을 보낼수 있는 대시보드 툴.

        슬랙, 라인 텔레그램 가능.

4.3.3 컨슈머 배포 프로세스

  • 중단 배포

    한정된 서버 자원을 운영하는 기업.

    컨슈머 랙이 늘어남.

    • 장점

      신규 애플리케이션의 실행 전후를 명확하게 특정 오프셋지점으로 나눌 수 있음.

      버로우로 배포시점의 오프셋로깅한다면 롤백할때도 재처리 용이.

  • 무중단 배포

    • 종류

      1. 블루/그린 배포
      2. 롤링
      3. 카나리 배포
    • 블루/그린 배포

      이전 버전 애플리케이션과 신규 버전 애플리케이션 동시 띄어놓고 트래픽 전환.

      파티션 개수와 컨슈머 개수가 동일한 애플리케이션에 유용.

      → 동일 컨슈머 그룹으로 구독하도록 시랭하면 파티션 할당없이 유휴 상태로 대기 가능

      → 신규 애플리케이션이 준비되면 기존을 모두 중단하여 리밸런싱으로 신규 컨슈머와 연동됨.

    • 롤링 배포

      블루/그린 배포의 인스턴스 할당과 반환으로 인한 리소스 낭비 줄일 수 있음.

      파티션 개수가 인스턴스 개수와 같아야함.

      • 파티션이 2개면 2번의 리밸런싱, 파티션 개수가 많을 수록 오래걸림으로 개수가 많지않음에 적합.

    • 카나리 배포

      작은 위험을 통해 큰 위험을 예방하는 방법의 배포.

      100개의 파티션으로 운영하는 경우, 1개의 파티션 테스트

      이후, 롤링 또는 블루/그린 배포

4.4 스프링 카프카

스프링 프레임워크에서 효과적으로 사용하도록 만들어진 라이브러리.

ㄴ 카프카 클라이언트의 여러가지 패턴 제공 (Concurrency 옵션)

  • build.gradle

      implementation 'org.springframework.kafka:spring-kafka:2.5.10.RELEASE'

4.4.1 스프링 카프카 프로듀서

  • 카프카 템플릿

    스프링 카프카는 카프카 템플릿으로 데이터 전송 가능.

    프로듀서 팩토리 클래스를 통해 생성.

    • 두가지 생성 방법
      1. 기본 카프카 템플릿
      2. 사용자가 생성한 카프카 템플릿 프로듀서 팩토리로 생성
  • 기본 카프카 템플릿

    application.yaml에 프로듀서 옵션값과 도일함.

    하지만 스프링 카프카는 옵션을 적지않아도 자동설정되어 실행됨.

    • 기본 설정

        bootstrap-servers : localhost:9092
        key-serializer, value-serializer : StringSerializer
    • yaml

      연결하고자하는 대상서버 my-kakfa:9092고 acks=all 설정.

        spring:
            kafka:
                producer:
                bootstrap-servers: my-kafka:9092
            acks: all
    • java

        @Autowired
        private KafkaTemplate<Integer, String> template;
  • 커스텀 카프카 템플릿

    프로듀서 팩토리를 통해 카프카 템플릿 객체를 빈으로 등록하여 사용하는 것.

    A클러스터, B클러스터로 정송하는 프로듀서를 동시에 사용하고 싶으면 두개 등록하여 사용 가능.

    • KafkaTemplateConfiguration - java

        @Configuration
        public class KafkaTemplateConfiguration {
      
            @Bean
            public KafkaTemplate<String, String> customKafkaTemplate() {
      
                Map<String, Object> props = new HashMap<>();
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka:9092");
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
                props.put(ProducerConfig.ACKS_CONFIG, "all");
      
                ProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);
      
                return new KafkaTemplate<>(pf);
            }
        }
      
    • SpringProducerApplication - java

        @SpringBootApplication
        public class SpringProducerApplication implements CommandLineRunner {
      
            private static String TOPIC_NAME = "test";
      
            @Autowired
            private KafkaTemplate<String, String> customKafkaTemplate; // 빈 객체와 동일하게 선언
      
                ...
      
            @Override
            public void run(String... args) {
                ListenableFuture<SendResult<String, String>> future = customKafkaTemplate.send(TOPIC_NAME, "test");
                future.addCallback(new KafkaSendCallback<String, String>() { // 콜백을 통해 브로커 적재 여부 비동기로 확인
                    @Override
                    public void onSuccess(SendResult<String, String> result) { // 정상 적재
      
                    }
      
                    @Override
                    public void onFailure(KafkaProducerException ex) { // 적재 x, 이슈 발생
      
                    }
                });
                System.exit(0);
            }
        }
      

4.4.2 스프링 카프카 컨슈머

컨슈머 기존 컨슈머를 2개의 타입으로 나누고 커밋을 7가지 세분화.

  • 컨슈머 타입

    • 레코드 리스너

      단 1개의 레코드를 처리

    • 배치 리스너

      poll() 메서드 리턴받은 ConsumerRecords처럼 한번에 여러 개 레코드 처리.

  • 이외 리스너 파생된 형태

    • Acknowledging이 붙은 리스너 - 메뉴얼 커밋
    • ConsumerAware이 붙은 리스너 - 카프카 컨슈머 인스턴스 직접 컨트롤 하고 싶다
  • 커밋 구현

    카프카 컨슈머에서 커밋을 직접 구현할 떄는 오토 커밋, 동기 커밋, 비동기 커밋 세가지로 나뉘지만, 실제 운영환경에서는 다양한 종류의 커밋을 구현해서 사용.

    • 스프링 카프카 커밋
    | AcksMode | **설명** |
    | --- | --- |
    | RECORD | 레코드 단위로 프로세싱 이후 커밋 |
    | BATCH | poll() 메서드 호출 레코드가 모두 처리된 이후 커밋 
    **기본 값** |
    | TIME | 특정 시간 이후 커밋 
    시간 간격 선언하는 AckTime 옵션 설정 필요 |
    | COUNT | 특정 개수만큼 레코드가 처리된 이후에 커밋
    레코드 개수 AckCount 옵션 설정 필요 |
    | COUNT_TIME | TIME, COUNT 옵션 중, 하나라도 맞으면 커밋 |
    | MANUAL | 컨슈머 로직에서 명시적 acknowlegde() 메소드 호출해 커밋, 이후에는 BATCH 옵션과 동일 동작
    - AcknowledginMessageListner, BatchAcknowledgingMessageListener |
    | MANUAL_IMMEDIATE | 컨슈머 로직에서 명시적 acknowlegde() 메소드 호출한 즉시 커밋.
    - AcknowledginMessageListner, BatchAcknowledgingMessageListener |

리스너를 생성하고 사용하는 방식은 크게 두가지.

- 리스너 생성
    1. 기본 리스너 컨테이너 사용
    2. 컨테이너 팩토리 사용하여 직접 리스너 만드는 것
  • 기본 리스너 컨테이너

    기본 리스너 컨테이너는 application.yaml 컨슈머와 리스너 옵션을 놓고 사용하며 자동 오버라이드 됨.

    • 스프링 카프카 사용되는 옵션

      https://docs.spring.io/spring-boot/index.html

    • application.yaml

      AckMode 옵션 예시

        spring:
          kafka:
            consumer:
              bootstrap-servers: my-kafka:9092
            listener:
              type: BATCH
              ack-mode: MANUAL_IMMEDIATE
    • 레코드 리스너 (spring-kafka-record-listener )

      type: RECORD

    • 배치 리스너 ( spring-kafka-batch-listener )

      type: BATCH

    • 배치 커밋 컨슈머 리스너(spring-kafka-listener-container)

      type: MANUAL_IMMEDIATE

      AckMode도 사용하고, 컨슈머도 사용하고 싶다면 사용.

  • 커스텀 리스너 컨테이너

    서로 다른 설정을 가진 2개 이상의 리스너 구현 및 리밸런스 리스너 구현.

    카프카 리스너 컨테이너 팩토리 인스턴스 생성하고 빈으로 등록.

    • java (spring-kafka-listener-container -ListenerContainerConfiguration)

      카스카 리스너 컨테이너 팩토리

    • java (spring-kafka-listener-container -
      SpringConsumerApplication)

      커스텀 컨슈머 리스너 팩토리 빈 객체를 사용하는 커스텀 리스너 컨테이너.