Kafka가 K8s환경에 설치되어있다는 가정하에 진행한다.
Kafka Connect
Kafka Connect Cluster가 있고 Kafka Connector를 등록해야 sink나 source작업이 시작된다.
1. Kafka connector 플러그인 준비하기
우선, Connector 이미지에는 connector 플러그인(JDBC connector 등)이 설치되어있지않아 수동으로 설치해야한다.
플러그인 다운은 confulent hub에서 할수있음. https://www.confluent.io/hub/
Connector 플러그인을 Connect에 등록하는 두가지방법이 있다. 2번을 추천함.
- 설치한 plugin폴더를 도커이미지에 포함시키기 - 플러그인 설치마다 도커이미지 새로말아야하는번거로움.
# Dockerfile
# FROM quay.io/strimzi/kafka:0.27.0-kafka-3.0.0
FROM quay.io/strimzi/kafka:0.29.0-0-kafka-3.1.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/debezium
COPY ./plugins/ /opt/kafka/plugins/debezium/
USER 1001
2. deploy시 build 옵션으로 image말기.
build 로 다운받을 url과 type 적어주면 다운받아서 이미지말아줌
build pod가 따로 떠서 빌드하고 connect 클러스터가 생성됨.
참고: https://strimzi.io/docs/operators/latest/overview.html#configuration-points-connect_str
깃헙: https://github.com/strimzi/strimzi-kafka-operator/tree/0.30.0/examples/connect
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: hs-kafka-connect
namespace: kafka
annotations:
strimzi.io/use-connector-resources: "true"
spec:
replicas: 1
bootstrapServers: azure-cluster-kafka-bootstrap:9093
tls:
trustedCertificates:
- secretName: azure-cluster-cluster-ca-cert
certificate: ca.crt
config:
group.id: hs-kafka-connect
offset.storage.topic: hs-kafka-connect-offsets
config.storage.topic: hs-kafka-connect-configs
status.storage.topic: hs-kafka-connect-status
key.converter: org.apache.kafka.connect.json.JsonConverter
value.converter: org.apache.kafka.connect.json.JsonConverter
config.storage.replication.factor: 2 # Kafka의 min.insync.replicas 설정 수 보다 커야함.
offset.storage.replication.factor: 2
status.storage.replication.factor: 2
config.providers: file
config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
key.converter.schema.registry.url: kafka-schema-registry-cp-schema-registry:8081
value.converter.schema.registry.url: kafka-schema-registry-cp-schema-registry:8081
key.converter.schemas.enable: false
value.converter.schemas.enable: false
build:
output:
type: docker
# This image will last only for 24 hours and might be overwritten by other users
# Strimzi will use this tag to push the image. But it will use the digest to pull
# the container image to make sure it pulls exactly the image we just built. So
# it should not happen that you pull someone else's container image. However, we
# recommend changing this to your own container registry or using a different
# image name for any other than demo purposes.
image: ttl.sh/strimzi-connect-example-3.2.0:24h
plugins:
- name: s3-source-connector
artifacts:
- type: zip
url: https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-s3-source/versions/2.3.0/confluentinc-kafka-connect-s3-source-2.3.0.zip
- name: s3-sink-connector
artifacts:
- type: zip
url: https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-s3/versions/10.1.0/confluentinc-kafka-connect-s3-10.1.0.zip
- name: jdbc-connector
artifacts:
- type: zip
url: https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.5.2/confluentinc-kafka-connect-jdbc-10.5.2.zip
2. Kafka-connect 메니페스트 작성
Kafka-connect 와 connector 두종류를 작성하여 deploy한다.
Kafka-connect.yaml 은 위에 yaml 그대로 사용
- spec.config 란에는 아래 설정을 참고하여 설정할수있다.
- https://docs.confluent.io/platform/current/connect/references/allconfigs.html#override-the-worker-configuration
3. Kafka-connector 메니페스트 작성
connect하고싶은 대상에 대해 sink, source작업을 할수있도록 connector를 작성한다.
각 플러그인마다 설정값이 다르므로 확인하여 설정한다.
Mysql source Ex
source config: https://docs.confluent.io/kafka-connect-jdbc/current/source-connector/source_config_options.html
sink config: https://docs.confluent.io/kafka-connect-jdbc/current/sink-connector/sink_config_options.html
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: mysql-source
namespace: kafka
labels:
strimzi.io/cluster: hs-kafka-connect
spec:
class: io.confluent.connect.jdbc.JdbcSourceConnector
tasksMax: 1
config:
connection.url: "jdbc:mysql://mysql.kafka.svc.cluster.local:3306/kafka"
connection.user: "test"
connection.password: "test"
mode: " incrementing"
incrementing.column.name: "customerNumber"
mysql sink 블로그 참고: https://wecandev.tistory.com/110
config
- auto.create: 자동 테이블 생성 (절대 하면 안됨)
- auto.evolve : 자동 컬럼 생성
- delete.enabled: null record를 삭제로 처리, pk.mode가 record_key 여야 한다.
- insert.mode : upsert, insert, update
- topics.regex : topics 대신 정규식으로 topic 받아올 수 있음
토픽 이름 RegrexRouter
https://docs.confluent.io/platform/current/connect/transforms/regexrouter.html
https://debezium.io/blog/2017/09/25/streaming-to-another-database/
Before : logical-name.database-name.table-name
After : table-name
결과
mysql source로 connector배포시 table이름으로 Kafka 토픽이 생성되고
Mysql DB에 insert시 해당 row가 Kafka Topic에 저장된 모습이다.
'BigData > Kafka' 카테고리의 다른 글
Kafka Kubernetes Helm chart설치 및 테스트 (0) | 2021.10.28 |
---|
댓글