# Kafka Connect란?

 

Kafka의 구성은 Producer, Consumer, broker로 되어있다.

 

Producer는 카프카 메세지를 생산하는 주체이고,
Consumer는 카프카 메세지를 소비하는 주체이며,
Broker는 카프카 메세지를 저장하는 서버이다.

 

각 Broker는 여러개 topic을 가지고 있으며 이 topic에 메세지가 저장된다.

 

Kafka Connect는 데이터소스와 Kafka를 연결해주는 매개체이다.
Kafka Connect에는 Connector를 이용하여 데이터소스와 연결한다.
다시말해서, Kafka Connector가 Producer와 Consumer 역할을 한다고 보면 된다.

 

Producer 역할을 하는 Kafka Connector를 Source Connector라 하고,
Consumer 역할을 하는 Kafka Connector를 Sink Connector라 한다.

출처: https://debezium.io/documentation/reference/1.3/architecture.html

 

위의 그림은 kafka Source Connector 오픈소스인 Debezium 공식문서에서 가져온 그림이다.
그림을 보면, MySQL과 PostgreSQL의 데이터 소스에서 데이터가 Debezium Source Connector에 의해 Kafka에 저장이 된다. 그리고 메세지들은 Sink Connector에 의해 다양한 데이터 소스로 전달할 수 있다.

 

다음은 Kafka Connector가 만들어 내는 메세지의 구조이다.

 

Message는 Key와 Value로 구성되어 있으며,
각 Key와 Value는 Schema와 Payload로 구성되어 있다.

 

여기서 Key는 PK와 같이 데이터를 식별할 수 있는 정보가 들어있고
Value는 데이터의 전체 값이 들어있다.

 

Payload는 데이터 값이 저장이 되며, 이 데이터 값의 데이터 타입이 Schema에 명시되어 있다.

 

동일한 메세지가 계속 들어오는 경우 Schema가 중복이 되어 불필요하게 데이터 용량을 잡아먹게 된다.
이를 위해 Kafka Connect는 Schema Registry를 활용한다.

출처: https://medium.com/@gaemi/kafka-%EC%99%80-confluent-schema-registry-%EB%A5%BC-%EC%82%AC%EC%9A%A9%ED%95%9C-%EC%8A%A4%ED%82%A4%EB%A7%88-%EA%B4%80%EB%A6%AC-2-bfa96622a974

 

Schema Registry는 Key와 Value에 명시된 Schema를 따로 저장하며,
Kafka Connector는 Schema 대신 Schema Registry의 Schema 번호를 명시하여 메세지 크기를 줄인다.

 

Kafka Connect는 REST API를 이용하여 Connector를 설정할 수 있다.

# kafka connect 실행
./bin/connect-distributed.sh -daemon ./config/connect-distributed.properties

# kafka connect 실행확인
curl -s "http://{ip}:8083"           
{"version":"2.4.1","commit":"c57222ae8cd7866b","kafka_cluster_id":"nZrPY3j5SKOUwqeGk68dsg"}

# 사용할 수 있는 플러그인
curl -X GET -s "http://{ip}:8083/connector-plugins"
[{"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type":"sink","version":"5.5.1"},{"class":"io.confluent.connect.jdbc.JdbcSinkConnector","type":"sink","version":"5.5.1"},{"class":"io.confluent.connect.jdbc.JdbcSourceConnector","type":"source","version":"5.5.1"},{"class":"io.confluent.connect.s3.S3SinkConnector","type":"sink","version":"5.5.1"},{"class":"io.confluent.connect.storage.tools.SchemaSourceConnector","type":"source","version":"2.4.1"},{"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.3.1.Final"},{"class":"io.debezium.connector.postgresql.PostgresConnector","type":"source","version":"1.2.1.Final"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.4.1"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.4.1"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]

# kafka connector 확인
curl -X GET -s "http://{ip}:8083/connectors"
["test-connector"]

# kafka connector 설정확인
curl -X GET -s "http://{ip}:8083/connectors/test-connector/config"

# kafka connector 상태확인
curl -X GET -s "http://{ip}:8083/connectors/test-connector/status"

# kafka connector 삭제
curl -X DELETE -s "http://{ip}:8083/connectors/test-connector"

 

Kafka Connect를 사용해보고자 한다면,

 

Source Connector 오픈소스인 Debezium을 추천한다.
Sink Connector는 Confluent사에서 개발한 Connector를 이용하자.

 

Confluent hub(https://www.confluent.io/hub/)에는 Confluent에서 개발한 다양한 Connector를 다운받을 수 있다.
각 Connector가 Source인지 Sink인지 명시가 되어 있으며 무료 라이센스일 것이다.

 

 

참고자료
debezium.io/documentation/reference/1.3/architecture.html
mongsil-jeong.tistory.com/35

 

To be continued.........

 

 

 

Made by 꿩

'CDC > Kafka Connect' 카테고리의 다른 글

[Kafka Connect/Debezium] SMT 란?  (0) 2021.10.24
Kafka Connect 설정  (0) 2021.01.09

+ Recent posts