CheerUp_Cheers

아파치 카프카 - 애플리케이션 프로그래밍 with 자바 (3) 본문

미들웨어

아파치 카프카 - 애플리케이션 프로그래밍 with 자바 (3)

meorimori 2024. 12. 23. 23:03

Chapter3

3.1 카프카 브로커,클러스터,주키퍼

데이터 저장/전송

  • 실습한 저장된 파일 시스템 확인

    config/server.properties의 log.dif 옵션에 정의한 디렉토리

      ls /tmp/kafka-logs
    • 파티션 수만큼 생성된 디렉토리 확인

  • 토픽의 0번 파티션에 존재하는 데이터

    • 용어

      log - 메시지와 메타테이터

      index - 메시지의 오프셋

      timeindex - 메시지에 포함된 timestamp값을 기준으로 인덱싱한 정보, 브로커가 적재한 데이터 삭제나 압축

  • 카프카는 파일 시스템에 저장한다.

    느려지지 않는가?
    파일 입출력에대한 속도 이슈가있지만, 페이지 캐시 사용으로 디스크 입출력 속도를 높여서 문제 해결.

    → 브로커의 힙메모리를 크게 설정할 필요가 없다.

데이터 복제, 싱크

  • 복제 단위

    파티션 단위로 이루어지며, 토픽 생성시 옵션을 선택하지않으면 브로커 설정을 따라감.

  • 리더/팔로워 파티션

    팔로워(복제) 파티션은 리더 파티션의 오프셋을 확인하여 차이나는 경우 저장(복제)

    • 단점

      복제 개수 만큼 저장 용량이 늘어남

  • 복제 개수

    1개 or 2개 : 데이터 처리 속도가 중요하다

    3개이상 : 정보 유실이 일어나면 안된다

  • 컨트롤러

    클러스터의 다수 브로커중 한대가 컨트롤러 역할.

    브로커 상태가 안좋으면 클러스터에서 빼내는 역할.

데이터 삭제

로그 세그먼트
카프카의 삭제되는 파일 단위

카프카는 다른 메시징 플랫폼과 다르게 컨슈머가 가져가도 삭제하지 않음.

삭제하지 않고, 메시지 키를 기준으로 압축하는 정책을 가져갈 수 도 있음.

컨슈머 오프셋 저장

  • 위치

    아래의 저장된 기준으로 레코드를 가져가서 처리.

      ls /tmp/kafka-logs/__consumer_offsets

코디네이터

클러스트의 다수 브로커 중, 한대가 역할을 수행.

리밸런싱
컨슈머 그룹의 상태를 체크하고 파티션을 컨슈머와 매칭되도록 배분

  • 주키퍼의 역할

    카프카의 메타 데이터를 관리하는데 사용.

    주키퍼는 카프카 2.8부터는 사용하지 않음.

  • 카프카 서버에 주키퍼 붙이기

    2181포트가 주키퍼 포트

      bin/zookeeper-shell.sh my-kafka:2181
  • 주키퍼 명령어

    • ls /

      root znode의 하위 znode확인

    • get brokers/ids/0

      실승용으로 생성한 카프카 브로커에 대한 정보

    • get /controller

      어느 브로커가 컨트롤러인지

    • get /brokers/topics

      저장된 토픽 확인

  • 주키퍼에서 다수의 카프카 클러스트를 사용하는 방법

    주키퍼의 서로다른 znode에 카프카 클러스터들을 설정하면 됨.

    • 주키퍼 옵션 정의

        파이프라인용 카프카 클러스터 : zookeeper.connect=localhost:2181/pipeline
        실시간 추천용 카프카 클러스터 : zookepper.connect=localhost:2181/recommend

3.2 토픽과 파티션

레코드
파티션에 들어가는 프로듀서들이 보낸 데이터

  • 속도 증가

    파티션과 컨슈머 개수를 늘려서 처리량(병렬처리) 가능

    큐와 비슷한 구조

  • 토픽 이름 제약 조건

    길이제한, 빈문자열 지원하지 않음 등.. 이 있으니 토픽 생성시 확인하여 생성하면 될듯하다.

  • 의미있는 토픽 작명

    • <환경>.<팀명>.<애플레킹션명>.<메시지-타입>
      prd.marketing-team.sms-platform.json

    • <프로젝트-명>.<서비스-명>.<환경>.<이벤트-명>

      commerce.payment.prd.notification

… 등 팀 혹은 사내 정책으로 정하면 좋을 듯하다.

3.3 레코드

  • 레코드 구성

    타임스탬프, 메시지 키, 메시지 값, 오프셋, 헤더

  • 레코드 수정

    수정은 불가능하며, 로그 리텐션 기간 또는 용량에 따라 삭제만.

  • 타임스탬프

    프로듀서에서 생성된 시점의 유닉스 타임.

    컨슈머는 언제 생성되었는지 확인은 가능하나, 프로듀서가 생성할떄 임의의 타임스탬프 값을 설정할 수있어 설정에 따라 다를 수 있다.

  • 메시지키와 파티션 관계

    프로듀서가 토픽에 레코드를 전송 할때, 메시지 키의 해시값을 토대로 파티션 지정.

    → 하지만, 파티션 개수가 변경되면, 매칭이 달라짐

  • 메시지 값

    브로커로 전송될때는 직렬화된 메시지 키와 값으로 전달.

    컨슈머가 사용할때는, 동일하게 역직렬화 수행.

  • 오프셋

    오프셋은 지정할수 없고, 이전에 전송된 레코드의 오프셋 +1값.

  • 헤더

    레코드의 추가적인 정보를 담는 저장소 용도.

3.4 카프카 클라이언트

  • 카프카 클라이언트

    카프카 클러스터에 명령을 내리는 라이브러리.

    프레임워크나 애플리케이션 위에서 동작

3.4.1 프로듀셔 API

프로듀셔 애플리케이션은 카픜에 필요한 데이터 선언.

브로커의 특정 토픽의 파티션에 전송.

  • 데이터 전송

    데이터를 직렬화하여 보내기 때문에, 자바의 선언 가능한 모든 형태를 브로커로 전송 가능.

  • 토픽 생성

      bin/kafka-topics.sh --bootstrap-server my-kafka:9092 --create \
      --topic test \
      --partitions 3
  • 프로듀셔 애플리케이션 메시지 발송

    SimpleProducer 실행.

    • 주의점

      hosts 파일에 ec2설정과 같이 my-kafka를 퍼블릭 ip 로 설정

      방화벽도 열어놔야함.

  • 토픽 확인

      bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 --topic test --from-beginning
  • 파티셔너 종류

    메시지키가 있을 경우, 키의 해시값과 파티션을 매칭하여 전송 동일

    • UniformStickyPartitioner

      디폴트

      RoundRobinPartitioner의 단점을 개선, 메시지키가없을때 최대한 동일하게 분배하는 로직 존재.

      높은 처리량, 낮은 리소스 사용률

    • RoundRobinPartitioner

  • 카프카 프로듀셔 압축옵션

    기본적으로는 압축x

    gzip, snappy, lz4, zstd 지원.

    ⇒ 네트워크 처리량에는 이득이지만, 컨슈머에서 처리해야함.

  • 프로듀셔 주요 옵션

    기본값을 잘 파악하여 설정하자!

    • 필수옵션

      • bootstrap.servers

        프로듀셔가 전송할 카프카 클러스터에 속한 브로커의 호스트 이름:포트 (2개 이상 입력하자)

      • key.serializer

      • value.serializer

    • 선택옵션

      • acks

        브로커들에 정상적으로 저장되었는지.

        default : 1

        • 0 : 브로커저장여부 안보고, 전송 즉시
        • -1(all) : 토픽의 min.insync.replicas 개수에 해당하는 리더/팔로워 파티션에 저장되면 성공으로
        • 1 : 리더파티션에 저장되면 성공으로
      • buffer.memory

        브로커로 전송할 데이터를 배치로 모으기위해 설정할 버퍼 메모리양

        default : 32MB

      • retries

        프로듀셔가 브로커로 에러를 받고 재전송 횟수

        default : 2147483647

      • batch.size

        배치로 전송할 레코드 최대 용량.

        너무 작으면 자주 보내어 네트워크 부담 ,너무 크면 메모리를 많이 사용.

        default : 16384

      • linger.ms

        배치를 전송하기전까지 시간

        default : 0

      • partitioner.class

        레코드를 파티션에 전송하기 위해 적용하는 클래스 지정

        default : DefaultPartitioner

      • enable.idempotence

        멱등성 프로듀서로 동작할지 여부. (Chapter4에서 배움)

        default : false

      • transactional.id

        프로듀서가 레코드 전송 시, 레코드를 트랜잭션 단위로 묶을지 여부 (설정 시, 트랜잭션 프로듀서로 동작)

        default : null

  • 메시지 키를 가진 데이터를 전송하는 프로듀셔

    • java - ProducerWithKeyValue

        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Pangyo", "key");  
    • 키랑 같이 console로 보기

        bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
        --topic test \
        --property print.key=true \
        --from-beginning
  • 파티션 직접 지정

    • java - ProducerExactPartition

        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, partitionNo, "key", "Pangyo");
  • 커스텀 파티셔너 생성 및 설정

    • java - ProducerWithCustomPartitioner

      CustomPartitioner는 키값을 확인하여 파티션 설정.

        configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
    • 콘솔

      --property print.partition=true 추가하면 어느 파티션에서 소비되었는지 확인가능

        bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
        --topic test \
        --property print.key=true \
        --property print.partition=true \
        --from-beginning

  • 브로커 정상 전송 여부 확인하는 프로듀셔

    • java - ProducerWithAsyncCallback

        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "PangyoKey", "Pangyo");
        producer.send(record, new ProducerCallback());
    • java - ProducerCallback

      send()는 Future 객체를 반환함으로, recordMetadata를 비동기 반환

      주의 : 빠르지만, 데이터 순서가 중요하다면.. 동기로 받아

        public class ProducerCallback implements Callback {
            private final static Logger logger = LoggerFactory.getLogger(ProducerCallback.class);
      
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e != null)
                    logger.error(e.getMessage(), e);
                else
                    logger.info(recordMetadata.toString());
            }
        }
    • 로그 결과

        [kafka-producer-network-thread | producer-1] INFO com.example.ProducerCallback - test-1@1
      • RecordMetadata

        {토픽이름}-{파티션번호}@{오프셋번호} = test-1@1

3.4.2 컨슈머 API

적재된 데이터를 사용하기 위해 브로커로 부터 데이터를 가져와 필요한 처리.

동일하게 자바 애플리케이션으로 처리부 생성.

  • 컨슈머 그룹

    컨슈머 목적 분리

    컨슈머가 중단되거나 재시작되어도 컨슈머 그룹의 오프셋 기준으로 데이터 처리.

    없으면, 어떤 그룹에도 속하지않은채 동작.

  • 컨슈머 동작 후, 로그 확인

      [main] INFO com.example.SimpleConsumer - record:ConsumerRecord(
      topic = test,
      partition = 1,
      leaderEpoch = 0, offset = 2,
      CreateTime = 1734101735044, serialized key size = 9,
      serialized value size = 6,
      headers = RecordHeaders(headers = [], isReadOnly = false),
      key = PangyoKey,
      value = Pangyo)

    소비된 파티션, 오프셋 등을 확인 가능.

    leaderEpoch는 현재 리더 파티션의 변경 세대 번호를 의미…

    → 리더변경 시, 증가하며 변경여부 확인가능

  • 컨슈머 중요 개념

    • 운영방법

      운영 방법은 크게 2가지

      • 1개이상의 컨슈머 구성의 컨슈머 그룹 운영
      • 특정 파티션만 구독하는 컨슈머 운영
    • 파티션 개수

      컨슈머 하나에 여러개 파티션 할당 가능.

      컨슈머 개수는 파티션 갯수보다 이하여야함.

      → 유후 상태로 남을 수 있음. (불필요한 스레드)

    • 그룹간 격리

      컨슈머 그룹끼리 영향을 받지 않음.

      → 토픽의 데이터 적재나 처리에따라 컨슈머 그룹을 나누자

    • 컨슈머 그룹에 장애가 발생한다면?

      문제가 생긴 컨슈머를 제거하는 리밸런싱 됨.

      • 리밸런싱 상황
        1. 컨슈머가 추가되는 상황
        2. 컨슈머가 제외되는 상황
    • 그룹 조정자

      리밸런싱을 발동시키는 역할.

      카프카의 브로커 중 한 대가 역할을 수행.

    • 명시/비명시 오프셋 커밋

      enable.auto.commit=true를 통해서 일정 간격마다 오프셋을 커밋하도록 할 수 있음.

      비명시 오프셋 커밋은 편리하지만, 리밸런싱이나 컨슈머 종료시에 데이터 중복 또는 유실 될 수 있다

    • commitSync() & commitAsync()

      • commitSync()

        poll()메서드를 통해 반환된 레코드의 가장 마지막 오프셋을 기준으로 커밋 수행

        → 동일 시간당 데이터 처리량이 줄어듬

      • commitAsync()

        비동기 커밋으로 커밋요청이 실패했을 경우, 현재 처리중인 데이터 순서를 보장하지 않음.

  • 컨슈머 주요 옵션

    얘도 동일하게 필수옵션/선택옵션의 기본값을 잘 활용하자.

    • 필수 옵션

      • bootstrap.servers
      • key.serializer
      • value.serializer
    • 선택 옵션

      • group.id

        컨슈머 그룹아이디, subscribe()로 토픽 구독하여 사용할때 필수로 넣어야함.

        default : null

      • auto.offset.reset

        컨슈머 그룹이 특정파티션을 읽을 때, 저장된 컨슈머 오프셋이 없는 경우 어느 오프셋부터 읽을지에 대한 옵션.

        • lastest : 가장 최근에 넣은 오프셋부터

        • earilest : 가장 낮은 오프셋부터

        • none : 커밋한 기록부터 시작하고 없으면 오류 반환.

          default : none

      • enable.auto.commit

        자동 커밋 여부

        default : true

      • auto.cmmit.interval.ms

        자동커밋일 경우, 오프셋 커밋 간격

        default : 5000(5초)

      • session.timeout.ms

        컨슈머와 브로커와 연결이 끊기는 최대시간.

        이시간안에 하트비트를 전송하지 않으면 리밸런싱.

        기본적으로 하트비트 시간의 3배로 설정.

        ⇒ 3배를 사용하는 것은 Kafka 공식 권장 설정. 일시적인 문제를 무시하면서도, 컨슈머의 다운을 빠르게 감지하는 절충안입니다. (네트워크 지연, 일시적 장애에 대한 누락 대비)

        default : 10000(10초)

      • heartbeat.interval.ms

        하트비트 전송 시간간격

        default : 3000(3초)

      • max.poll.interval.ms

        poll() 메서드를 호출하는 간격의 최대시간.

        데이터처리에 너무 많이 걸리면 비정상 판단하여 리밸런싱.

        default : 300000(5분)

      • isolation.level

        트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼경우 사용.

        • read_commited : 커밋이 완료된 레코드만 읽음.

        • read_uncommitted : 커밋 여부 관계없이 파티션의 모든레코드 읽음

          default : read_uncommitted

  • 동기 오프셋 커밋

    poll() 메서드 이후에, commitSync() 메서드를 호출하여 오프셋 커밋을 명시적 수행.

    • java - ConsumerWithSyncCommit

      가장 마지막 레코드 오프셋을 기준으로 커밋

        consumer.commitSync();
    • java - ConsumerWithSyncOffsetCommit

      개별 레코드 단위로 매번 오프셋 커밋하고 싶다

        Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>(); // 개별 레코드 단위로 매번 오프셋 커밋
      
        for (ConsumerRecord<String, String> record : records) {
            logger.info("record:{}", record);
          currentOffset.put(
          new TopicPartition(record.topic(), record.partition()),
          new OffsetAndMetadata(record.offset() + 1, null));
          consumer.commitSync(currentOffset);
         }
  • 비동기 오프셋 커밋

    커밋을 기다리기때문에, 데이터처리가 일시적 중단

    → 데이터 처리성을 위해 비동기 오프셋 사용

    • java - ConsumerWithASyncCommit

      비동기로 커밋 응답을 받기에, callBack함수로 결과를 받을 수 있음.

        consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
                            if (e != null)
                                System.err.println("Commit failed");
                            else
                                System.out.println("Commit succeeded");
                            if (e != null)
                                logger.error("Commit failed for offsets {}", offsets, e);
                        }
        });
  • 리밸런스 리스너를 가진 컨슈머

    • java - RebalanceListener

      ConsumerRebalanceListener 인터페이스를 제공하며 사용!

        public class RebalanceListener implements ConsumerRebalanceListener { // 리밸런싱을 감지하는 인터페이스
            private final static Logger logger = LoggerFactory.getLogger(RebalanceListener.class);
      
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) { // 리밸런싱 끝난뒤에 파티션 할당 완료되면 호출
                logger.warn("Partitions are assigned : " + partitions.toString());
      
            }
      
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // 리밸런스 시작되기 직전에 호출
                logger.warn("Partitions are revoked : " + partitions.toString());
            }
        }
  • 파티션 할당 컨슈머

    • java - ConsumerWithExactPartition

      subscribe() 메서드를 이용하여 구독 형태말고, 직접파티션을 명시하여 사용할 수 있음

      assign() 사용.

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
        consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, PARTITION_NUMBER)));
    • TopicPartition

      카프카 내/외부에서 사용되는 토픽. 파티션 정보를 담는 객체.’

    • 특징

      직접 파티션을 할당하기에 리밸런싱 과정없음.

  • 컨슈머에 할당된 파티션 확인

    assignment()로 토픽이름과 파티션번호 포함.

      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
      consumer.subscripbe(Arrays.asList(TOPIC_NAME));
      Set<TopciPartition> assginedTopciPartition = consumer.assignment();
  • 컨슈머의 안전한 종료

    컨슈머 애플리케이션이 정상적으로 종료되지않으면, 컨슈머가 세션 타임아웃이 발생할때까지 컨슈머 그룹에 남음.

    → 컨슈머랙이 늘어나 데이터 처리지연 발생

    • wakeUp() 메소드

      이 메서드가 실행된 후, poll() 메서도 호출되면 WakeupException 발생

      이 예외를 받았을 경우, 데이터 처리를 위한 자원들을 해제해야함.

      → close()로 안전하게 종료되었다는 것을 알림.

    • java - ConsumerWithSyncOffsetCommit

      셧다운되면서 wakeup() 호출.

      
        Runtime.getRuntime().addShutdownHook(new ShutdownThread());
      
        try {
            while (true) {
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                        for (ConsumerRecord<String, String> record : records) {
                            logger.info("{}", record);
                        }
                        consumer.commitSync();
                    }
                } catch (WakeupException e) {
                    logger.warn("Wakeup consumer");
                    // 리소스 해제 처리 필요
                } finally {
                    logger.warn("Consumer close");
                    consumer.close(); // 안전하게 종료되었다 명시적
        }
      
        static class ShutdownThread extends Thread {
                public void run() {
                    logger.info("Shutdown hook");
                    consumer.wakeup();
                }
            }

3.4.3 어드민 API

  • 내부 옵션을 확인하는 방법.

    1. 직접 브로커 들어가서 확인

    2. 카프카 커맨드 라인 인터페이스로 명령을 내려 확인

    3. AdminClient 클래스 사용

      내부옵션 설정, 조회

  • 예시

    1. 구독하는 토픽의 파티션 개수만큼 스레드 생성하고 싶을 때, 어드민 API로 확인하여 가져올 수 있음.
    2. 웹 대시보드를 구현하여 클러스터 접근 권한 규칙 제어가능
    3. 토픽의 양을 감지하여 파티션을 늘리수있음!
  • 선언

    추가 설정없이 클러스터에 대한 정보만 하면 됨.

      Properties configs = new Properties();
      configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka:9092");
      AdminClient admin = AdminClient.create(configs);
    
      admin.close(); // 명시적 종료
  • 주요 메소드

    • 브로커 정보 조회
    • 토픽 리스트 조회
    • 컨슈머 그룹조회
    • 신규 토픽 생성
    • 파티션 개수 변경
    • 접근 제어 규칙 생성
  • 주의점

    어드민 API는 클러스터의 버전과 클라이언트의 버전을 맞춰서 사용해야함.

    → 어드민 API는 버전이 올라가며 자주 바뀜

3.5 카프카 스트림즈

토픽에 적재된 데이터를 상태/비상태 기반으로 실시간 변환하여 다른 토픽에 적재하는 라이브러리.

  • 비슷한 오픈소스

    아파치 스파크

    아파치 플링크

    아파치 스톰…

  • 카프카 스트림즈 사용해야하는 이유?

    1. 자바 라이브러리와 완벽호환(같이 릴리즈)
    2. 정확히 한번을 구현하는 장애 허용 시스템
  • 프로듀서와 컨슈머의 조합말고 스트림즈를 사용하는이유?

    1. 데이터처리, 장애허용등의 특징을 완벽히 구현어려움

    2. 스트림즈가 제공못하는 경우만 사용

      소스토픽과 싱크토픽의 카프카 클러스터가 다른 경우.

  • 구성

    스트림즈 애플리케이션은 내부적으로 스레드 1개이상생성.

    스레드는 1개 이상의 테스크

    테스크
    데이터 처리 최소 단위

    • 예시

      3개의 파티션으로 이루어진 토픽을 처리하려면 내부의 3개의 태스크.

    • 운영예시

      실 운영은 안정적 운영을 위해, 2개이상의 서버로 구성.

  • 카프카 스트림즈 구조 - 토폴로지

    토폴로지
    2개이상의 노드(프로세서)와 선(스트림)으로 이루어진 집합

    • 프로세서

      1. 소스 프로세서

        데이터 처리를 위해 최초 선언 노드

        하나 이상의 토픽에서 데이터 가져옴

      2. 스트림 프로세서

        다른 프로세서가 반환하는 데이터 처리

      3. 싱크 프로세서

        특정 카프카 토픽으로 저장하는 스트림즈

        최종 종착지

    • 데이터 처리 방식

      • 스트림즈DSL로 구현하는 데이터 처리 예시

        메시지 값을 기반으로 토픽 분기

        지난 10분간 들어온 데이터 개수 집계

        토픽과 다른 토픽의 결합으로 새로운 데이터 생성

      • 프로세서 API로 구현하는 데이터 처리 예시

        메시지 값의 종류에따라 토픽을 가변적 전송

        일정한 시간 간격으로 데이터 처리

3.5.1 스트림즈DSL

  • 구성

    레코드의 흐름을 추상화한 3가지 개념

    KStream, KTable, GlobalKTable로 스트림즈DSL에만 국한.

  • KStream

    레코드의 흐름 표현으로 메시지 키와 값으로 이루어짐.

    컨슈머로 토픽을 구현하는것과 동일 선상.

  • KTable

    메시지 키를 기준으로 사용.

    조회 시, 가장 최신에 추가된 레코드 출력.

  • GlobalKTable

    메시지 키를 기준으로 사용.

    KTable의 선언된 토픽은 1개 파티션이 1개의 테스크 할당

    GlobalKTable은 모든 파티션 데이터가 각 테스크에 할당되어 사용.

    • 예시

      Kstream과 KTABLE이 데이터 조인할떄.

      코파티셔닝이 되어있어야함.

      • 코파티셔닝

        조인을 하려는 2개의 데이터의 파티션 개수가 동일하고 파티셔닝 전략을 맞추는 작업.

        → 동일한 메시지 키를 가진 데이터는 동일한 태스크로 들어가는 것 보장.

      • 리파티셔닝

        코파티셔닝이 되어 있지않다면, 리파티셔닝 과정을 거침.

        새로운 토픽에 새로운 케시지 키를 가지도록 재배열 과정.

    • 주의점

      스트림즈 애플리케이션의 모든 태스크에 동일하게 사용됨

      1. 로컬 스토리징 사용량이 증가

      2. 브로커 부하 증가

        → 작은 용량의 데이터만 사용, 데이터가 많으면 리파티셔닝을 통해 KTable사용 권장.

  • 스트림즈DSL 주요 옵션

    • 필수 옵션

      • bootstrap.servers
      • application.id : 스트림즈 구분을 위한 고유 아이디, 다른 로직을 가진 스트림즈 애플리케이션은 다른 아이디를 가져야함.
    • 선택 옵션

      • default.key.serde

        레코드 메시지 키 직렬/역직렬 클래스 지정.

        default : Serdes.ByteArray().getClass().getName()

      • default.value.serde

        레코드 메시지 값 직렬/역직렬 클래스

        default : Serdes.ByteArray().getClass().getName()

      • num.stream.threads

        스트림 프로세싱 실행 시 실행될 스레드 개수

        default :1

      • state.dir

        rocksDB 저장소가 위치할 디렉토리 지정

        rockDB
        페이북이 개발한 고성능 key-value DB
        스트림즈가 상태기반 데이터 처리를 할때 로컬 저장소로 이용

        default : tmp/kafka-streams

  • 스트림즈DSL - stream(), to()

    간단하게 토픽의 메세지를 다른토픽으로 전달하는것.

    • java - SimpleStreamApplication

      stream_log > stream_log_copy로 전달하는 소스

      소스프로세서(stream())와 싱크프로세서(to())만 존재.

    • 테스트

      • 토픽생성

          bin/kafka-topics.sh --create \
          --bootstrap-server my-kafka:9092 \
          --partitions 3 \
          --topic stream_log
      • 콘솔로 stream 확인

        기본적인것은 아래로 쉘 생성해서 토픽만입력하도록..

        • console.sh

          ```java
          #!/bin/bash

          사용자에게 토픽 이름 입력받기

          read -p "Enter the topic name: " topic_name

          입력값 검증 (빈 입력 방지)

          if [ -z "$topic_name" ]; then
          echo "Topic name cannot be empty."
          exit 1
          fi

          Kafka 소비자 실행

          bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \

        • -topic "$topic_name" \

        • -property print.key=true \

        • -from-beginning

      • 똑같이 Copy됨을 확인

  • 스트림즈DSL - filter()

    필터를 추가하여 데이터 처리 후, 토픽 전달

    stream_log > stream_log_filter

    • java - StreamsFilter

      stream_log > stream_log_filter로 전달하는 소스

    • 필터된 상황만 Copy됨을 확인

  • 스트림즈DSL - KTable과 KStream을 join()

    데이터베이스에 저장하지 않고도 실시간으로 스트리밍 처리 가능.

    • 자바 - KStreamJoinKTable

    • 토픽생성

      • KTable

          bin/kafka-topics.sh --create \
          --bootstrap-server my-kafka:9092 \
          --partitions 3 \
          --topic address
      • KStream

          bin/kafka-topics.sh --create \
          --bootstrap-server my-kafka:9092 \
          --partitions 3 \
          --topic order
      • 조인될 토픽

          bin/kafka-topics.sh --create \
          --bootstrap-server my-kafka:9092 \
          --partitions 3 \
          --topic order_join
        • console

            bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
            --topic order_join \
            --property print.key=true \
            --property key.separator=":"
            --from-beginning
    • 조인 확인

  • 스트림즈DSL - GlobalKTable과 KStream을 join()

    앞선 사례는 코파티셔닝이 된 상태여서 가능.

    • java - KStreamJoinGlobalKTable

    • 조인처리

      1. 리파티셔닝 이후, 코파티셔닝 상태에서 조인
      2. KTable로 사용하는 토픽을 GlobalKTable로 선언 사용
    • 예시

      address_v2라는 토픽을 새로 생성, 파티션 2개로 테스트

      → or

        bin/kafka-topics.sh --create \
        --bootstrap-server my-kafka:9092 \
        --partitions 2 \
        --topic address_v2
    • 결과 예시

      결과물을 보면 KTable과 별반 다르지 않음처럼 보임.

      • 다른점

        KStream의 키와 값에도 매칭가능

        GlobalKTable으로 선언한 토픽은 태스크마다 저장, 조인

3.5.2 프로세서 api

스트림즈DSL처럼 토폴리지 기준으로 데이터 처리.

추가적인 상세 로직 구현을 위해 사용.

KStream, KTable, GlobalKTable 개념 없음.

  • 자바 - SimpleKafkaProcessor, FilterProcessor

    길이가 5이상이면 필터링하여 다른 토픽 저장

  • 토픽 처리

    stream_log > 길이 5이상만 > stream_log_filter

3.6 카프카 커넥트

카프카 오픈소스 툴
데이터 파이프라인 생성 시, 반복 작업을 줄이고 효율적 전송을 위한 애플리케이션
특정 작업 형태를 템플릿으로 만들어 놓음

  • 종류

    소스 커텍터 : 프로듀서 역할

    싱크 커텍터 : 컨슈머 역할

  • 오픈소스 커넥터

    jar파일로 S2, JDBC 커넥터등 제공.

  • 컨버터

    데이터를 처리하기전 스키마를 변경을 도움

    • 예시

      Json컨버터, String컨버터..

    • 트랜스폼

      데이터 처리 시, 각 메시지 단위로 변환 가능

  • 커넥트 실행하는 방법

    • 실행

      • 단일 모드 커넥트

        1개 프로세스로만, SPOF 발생

      • 분산 모드 커넥트

        2개 이상의 커넥트가 클러스터.

    • Rest API

      실행중인 커넥트 정보나, 설정값, 변경요청 가능.

      connectors api 참고

  • 단일 모드 커넥트

    • 위치

      /config/connect-standalone.properties

    • 파일

        bootstrap.servers=localhost:9092 # 커넥트와 연동할 카프카 클러스터
      
        # 카프카 저장/가져올떄 변환에 사용, 스키마형태가 싫으면 false
        key.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=true
        value.converter.schemas.enable=true
      
        # 데이터 처리 시점, 
        offset.storage.file.filename=/tmp/connect.offsets
        offset.flush.interval.ms=10000 # 오프셋 커밋 주기
      
        # 플러그인 형태로 추가할 커넥터의 디렉토리 주소
        # 콤마로 구분, 오픈소스나 직접개발한 커넥터 jar
        #plugin.path=
      
    • 파일 소스 커넥터 (connect-file-source.properties)

      기본으로 제공하며, 특정 위치에 있는 파일을 읽어서 토픽으로 데이터 저장하는 커넥터.

        name=local-file-source
        connector.class=FileStreamSource # 사용할 커넥터
        tasks.max=1 # 태스크 개수, 병렬처리
        file=test.txt # 읽을 파일 위치
        topic=connect-test # 저장할 토픽 이름
      • 실행

        파라미터로 커텍트, 커넥터 설정파일을 차례로.

          bin/connect-standalone.sh config/connect-standalone.properties \
          config/connect-file-source.properties
  • 분산 모드 커텍트

    다른 커넥트가 이슈 발생하더라도, 살아있는 나머지가 커넥터를 이어 받아서 파이프라인을 지속적 실행.

    • 위치

      /config/connect-distributed.properties

    • 파일

      
        bootstrap.servers=localhost:9092
        group.id=connect-cluster # 다수의 커텍트 프로세스들을 묶을 그룹 이름 지정
      
        key.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=true
        value.converter.schemas.enable=true
      
        # 내부 토픽 오프셋 처리 시점, 복제개수는 3이상
        offset.storage.topic=connect-offsets
        offset.storage.replication.factor=1
        config.storage.topic=connect-configs
        config.storage.replication.factor=1
        status.storage.topic=connect-status
        status.storage.replication.factor=1
      
        offset.flush.interval.ms=10000
      
        #외부 플러그인.
        #plugin.path=
      
    • 실행

      2대 이상의 분리된 서버에서 서버마다 1개의 분산 모드 커넥트 실행 필요.

        bin/connect-distributed.sh config/connect-distributed.properties
    • 커넥트 상태 확인

      상태, 생성, 조회, 수정, 중단 명령 가능

      • 사용가능한 플러그인 조회.

          curl -X GET http://localhost:8083/connector-plugins

      • 커넥터 실행

          curl -X POST -H "Content-Type: application/json" \
          --data '{
          "name": "local-file-source",
          "config":
          {
              "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
              "file": "/tmp/test.txt",
              "tasks.max": "1",
              "topic": "connect-test"
          }    
          }' \
          http://localhost:8083/connectors
      • 커넥터 조회

        FileStreamSourceConnector 실행

          curl -X GET http://localhost:8083/connectors/local-file-source/status
          {
          "name": "local-file-source",
          "connector":
          {
              "state": "RUNNING",
              "worker_id": "127.0.0.1:8083"
          },
          "tasks":
          [{
              "id": 0,
              "state": "RUNNING",
              "worker_id": "127.0.0.1:8083"
          }],
          "type": "source"
          }

3.6.1 소스 커넥터

소스 애플리케이션 or 소스 파일로부터 데이터를 가져와 토픽으로 넣는 역할

오픈소스 소스 커넥터를 사용해도 되지만, 직접 개발해야하는 경우 SourceConnector, SourceTask 클래스를 사용해서 직접 구현.

  • SourceConnector

    커넥터 설정파일을 초기화하고 어떤 태스크 클래스를 사용할지 정의.

    데이터 처리부는 없음

      public class Test extends SourceConnector {
    
          @Override
          public String version() { // 커넥터 지속적 유지보수를 위함
          }
    
          @Override
          public void start(Map<String, String> props) {
          } // json, config형태로 입력한 값 초기화
    
          @Override
          public Class<? extends Task> taskClass() {
          } // 커넥터가 사용할 태스크 클래스 지정
    
          @Override
          public List<Map<String, String>> taskConfigs(int maxTasks) {
          } //태스크 개수가 2개이상일 경우, 태스크마다 다른 옵션
    
          @Override
          public ConfigDef config() {
          } // 커넥터 사용할 설정값에 대한 정보
    
          @Override
          public void stop() {
          } // 커넥터 종료 시 필요한 로직
      }
    
  • SourceTask

    소스애플리케이션, 소스 파일 > 토픽 보내는 역할.

    자체적은 오프셋을 쓰기에 중복 전송을 방지

      public class Test extends SourceTask {
    
          @Override
          public String version() {
          // 커넥터와 동일하게가 일반
          }
    
          @Override
          public void start(Map<String, String> props) {
          // 테스크 시작시 필요한 로직, 리소스 초기화
          // JDBC 소스 커넥터라면 JDBC와 커넥션을 맺음
          }
    
          @Override
          public List<SourceRecord> poll() {
          // 소스파일로 읽으면 SourcesRecord형태로 변환해서 리턴
          }
    
          @Override
          public void stop() {
          // 종료 필요 로직, JDBC 커넥션을 맺었다면 끊는 부분
          }
      }
    
  • 파일 소스 커넥터 구현

    카프카 커넥터를 구현할 때에는, 직접 작성한 클래스와 참조 라이브러리도 함꼐 빌드하여 jar로 압축.

    → 없으면 ClassNotFound

    • java - SingleFileSourceConnector,SingleFileSourceConnectorConfig,

      SingleFileSourceTask

3.6.2 싱크 커넥터

토픽 데이터를 타깃 애플리케이션 또는 타깃 파일로 저장하는 역할

  • SinkConnector

    사용자로 받은 입력으로 초기화.

    태스크 클래스를 사용할 것인지.

      public class TestSinkConnector extends SinkConnector {
      // 기본적으로 소스 커넥터와 동일
      ...
  • SinkTask

    실제로 데이터를 처리하는 부분.

      public class SingleFileSinkTask extends SinkTask {
    
          @Override
          public String version() {
          }
    
          @Override
          public void start(Map<String, String> props) {
          }
    
          @Override
          public void put(Collection<SinkRecord> records) {
          // 저장할 데이터를 주기적으로 가져오는 부분
          }
    
          @Override
          public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
          // put으로 가져온 데이터를 저장할떄 사용하는 로직.
          // ex) JDBC 커넥터는 put()에서 insert하고 여기서 commit
          }
    
          @Override
          public void stop() {
          }
      }
  • 파일 싱크 커넥터 구현

    • java - SingleFileSinkTask, SingleFileSinkConnectorConfig, SingleFileSinkConnector

3.7 카프카 미러메이커2

서로 다른 두개의 카프카 클러스터 간에 토픽을 복제하는 애플리케이션

  • 필요성

    프로듀서와 컨슈머로도 가능하지만, 토픽의 모든것을 복제할 필요가 있을때.

  • 미러메이커1

    복제하는 토픽이 달라지면, 미러메이커를 재시작해야했음.

    정확히 한번 전달을 보장하지 못하여 데이터 유실 및 중복 가능성.

    양방향 토픽 복제 지원 못함.

  • 미러메이커2를 활용한 단반향 토픽 복제

    • 위치

      config/connect-mirror-maker.properties

    • 파일

      카프카 클러스터 A, 카프카 클러스터 B가 있을 경우를 가정.

        clusters = A, B # 토픽이 복제될때 접두사로 붙게 됨, ex) A.chick_log
      
        A.bootstrap.servers = a-kafka:9092 # 접속할 클러스터 정보
        B.bootstrap.servers = b-kafka:9092
      
        A->B.enabled = true # A->B로 복제할지, 어떤토픽으로할지
        A->B.topics = .*
      
        B->A.enabled = true 
        B->A.topics = .*
      
        replication.factor=1 # 복제되어 신규 생성된 토픽의 복제 개수
      
        ## 토픽 복제에 필요한 데이터를 저장하는 내부 토픽 복제 개수
        checkpoints.topic.replication.factor=1
        heartbeats.topic.replication.factor=1
        offset-syncs.topic.replication.factor=1
      
        offset.storage.replication.factor=1
        status.storage.replication.factor=1
        config.storage.replication.factor=1
      
        # customize as needed
        # replication.policy.separator = _
        # sync.topic.acls.enabled = false
        # emit.heartbeats.interval.seconds = 5
      
    • 실행

      • 미러링

        bin/connect-mirror-maker.sh config/connect-mirror-maker.properties
      • a-kafka 토픽생성

        bin/kafka-console-producer.sh --bootstrap-server a-kafka:9092 --topic test
      • b-kafka 콘솔

        bin/kafka-console-consumer.sh --bootstrap-server b-kafka:9092 --topic A.test --from-beginning

3.7.1 미러메이커2를 활용할 지리적 복제

카프카 클러스터 단위의 활용도 가능

단방향, 양방향 복제, ACL복제, 새 토픽 자동감지의 기능은 클러스터가 2개 이상일때 빛남.

  • 액티브-스탠바이 클러스터 운영

    안정성을 위한 가장좋은 방법은 데이터 센터를 물리적 방식으로 분리하는것.

    액티브 > 스탠바이로 복제할때에 랙이 발생하여 스탠바이에 모든정보가 복제되지 않을 수 있음

    → 스탠바이로 전환될 때, 유실,중복 일어날수 있음.

  • 액티브-액티브 클러스터 운영

    글로벌 서비스의 경우, 통신지연을 최소화를 위해 2개의 클러스터로 미러링하며 운영할 수 있음.

    또한 필요한 데이터만 저장하고 사용할 수 있음.

  • 허브 앤 스포크 클러스터 운영

    팀마다 소규머 카프카 클러스터를 이용할때, 한개의 카프카 클러스터로 모아 데이터 레이크로 사용하고 싶을때 사용.