본문 바로가기
BigData/Kafka

Kafka Connect 설치 (on K8s)

by 푸푸망나뇽 2023. 3. 18.
반응형

Kafka가 K8s환경에 설치되어있다는 가정하에 진행한다.

Kafka Connect

Kafka Connect Cluster가 있고 Kafka Connector를 등록해야 sink나 source작업이 시작된다.

1. Kafka  connector 플러그인 준비하기

우선, Connector 이미지에는 connector 플러그인(JDBC connector 등)이 설치되어있지않아 수동으로 설치해야한다.

플러그인 다운은 confulent hub에서 할수있음. https://www.confluent.io/hub/

 

Home

Confluent, founded by the original creators of Apache Kafka®, delivers a complete execution of Kafka for the Enterprise, to help you run your business in real-time.

www.confluent.io

 

Connector 플러그인을  Connect에 등록하는 두가지방법이 있다. 2번을 추천함.

  1. 설치한 plugin폴더를 도커이미지에 포함시키기 - 플러그인 설치마다 도커이미지 새로말아야하는번거로움.
    1. S3 connecto 다운
    2. JDBC connector 다운

 

# Dockerfile
# FROM quay.io/strimzi/kafka:0.27.0-kafka-3.0.0
FROM quay.io/strimzi/kafka:0.29.0-0-kafka-3.1.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/debezium
COPY ./plugins/ /opt/kafka/plugins/debezium/ 
USER 1001 

2. deploy시 build 옵션으로 image말기.

 

build 로 다운받을 url과 type 적어주면 다운받아서 이미지말아줌

build pod가 따로 떠서 빌드하고 connect 클러스터가 생성됨.

 

참고: https://strimzi.io/docs/operators/latest/overview.html#configuration-points-connect_str

깃헙: https://github.com/strimzi/strimzi-kafka-operator/tree/0.30.0/examples/connect

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: hs-kafka-connect
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  bootstrapServers: azure-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
    - secretName: azure-cluster-cluster-ca-cert
      certificate: ca.crt
  config:
    group.id: hs-kafka-connect
    offset.storage.topic:  hs-kafka-connect-offsets
    config.storage.topic: hs-kafka-connect-configs
    status.storage.topic: hs-kafka-connect-status
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    config.storage.replication.factor: 2     # Kafka의 min.insync.replicas 설정 수 보다 커야함.
    offset.storage.replication.factor: 2      
    status.storage.replication.factor: 2        
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
    key.converter.schema.registry.url: kafka-schema-registry-cp-schema-registry:8081
    value.converter.schema.registry.url: kafka-schema-registry-cp-schema-registry:8081
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false

  build:
    output:
      type: docker
      # This image will last only for 24 hours and might be overwritten by other users
      # Strimzi will use this tag to push the image. But it will use the digest to pull
      # the container image to make sure it pulls exactly the image we just built. So
      # it should not happen that you pull someone else's container image. However, we
      # recommend changing this to your own container registry or using a different
      # image name for any other than demo purposes.
      image: ttl.sh/strimzi-connect-example-3.2.0:24h
    plugins:
      - name: s3-source-connector
        artifacts:
          - type: zip
            url: https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-s3-source/versions/2.3.0/confluentinc-kafka-connect-s3-source-2.3.0.zip
      - name: s3-sink-connector
        artifacts:
          - type: zip
            url: https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-s3/versions/10.1.0/confluentinc-kafka-connect-s3-10.1.0.zip
      - name: jdbc-connector
        artifacts:
          - type: zip
            url: https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.5.2/confluentinc-kafka-connect-jdbc-10.5.2.zip
              
 

2. Kafka-connect 메니페스트 작성

Kafka-connect 와 connector 두종류를 작성하여 deploy한다.

Kafka-connect.yaml 은 위에 yaml 그대로 사용

 
 

3. Kafka-connector 메니페스트 작성

connect하고싶은 대상에 대해 sink, source작업을 할수있도록 connector를 작성한다.

각 플러그인마다 설정값이 다르므로 확인하여 설정한다.

 

Mysql source Ex

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mysql-source
  namespace: kafka
  labels:
    strimzi.io/cluster: hs-kafka-connect
spec:
  class:  io.confluent.connect.jdbc.JdbcSourceConnector
  tasksMax: 1
  config:
    connection.url: "jdbc:mysql://mysql.kafka.svc.cluster.local:3306/kafka"
    connection.user: "test"
    connection.password: "test"
    mode: " incrementing"
    incrementing.column.name: "customerNumber"

mysql sink 블로그 참고: https://wecandev.tistory.com/110

config

  • auto.create: 자동 테이블 생성 (절대 하면 안됨)
  • auto.evolve : 자동 컬럼 생성
  • delete.enabled: null record를 삭제로 처리, pk.mode가 record_key 여야 한다.
  • insert.mode : upsert, insert, update
  • topics.regex : topics 대신 정규식으로 topic 받아올 수 있음

Before : logical-name.database-name.table-name
After : table-name

 

결과

mysql source로 connector배포시 table이름으로 Kafka 토픽이 생성되고

Mysql DB에 insert시 해당 row가 Kafka Topic에 저장된 모습이다.

 

 

반응형

'BigData > Kafka' 카테고리의 다른 글

Kafka Kubernetes Helm chart설치 및 테스트  (0) 2021.10.28

댓글