# [Kafka Connect/Debezium] SMT 란?

 

SMT는 Single Message Transformation의 약자이다.

 

Kafka Connect에 대해서 복기해보자.

여러 데이터베이스로부터 데이터를 추출하여 Kafka topic에 넣고

Kafka topic에 있는 데이터를 다른 데이터 소스에 전달하기 위해 만들어졌다.

https://debezium.io/documentation/reference/1.7/architecture.html

 

즉, 다양한 Database로부터 메세지를 생산하여

메세지들이 Kafka topic에 저장하고

다양한 데이터 소스들에게 메시지를 소비하는 것이다.

 

근데 debezium으로 메세지를 만들어 본 사람은 알겠지만

row 한 줄 당 메세지 한 개이다.

그리고 그 메세지는 매우 많은 정보가 들어가 있으며 그만큼 크기가 크다.

예시로 PostgreSQL Debezium에서 생성된 기본 메세지 형식을 보자.

{
  "schema": {
    ...
  },
  "payload": {
    "before": null,
    "after": {
      "id": 2,
      "product": "candle",
      "price": 13000,
      "ins_timestamp": 1635084000213
    },
    "source": {
      "version": "1.3.1.Final",
      "connector": "postgresql",
      "name": "pg_test",
      "ts_ms": 1635052376362,
      "snapshot": "last",
      "db": "test",
      "schema": "public",
      "table": "t_market",
      "txId": 113669095,
      "lsn": 905751868160,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1635052376362,
    "transaction": null
  }
}

schema 부분은 너무 내용이 많아 생략했고

payload 부분을 보면 before과 after로 나뉘어져 있는 것을 볼 수 있다.

그리고 해당 소스에 대한 정보가 들어가 있다. 

row 한 줄마다 데이터의 before 정보와 소스 정보가 모두 들어간다고 보면 된다.

그리고 해당 정보에 대한 스키마도 모두 포함된다.

참고로 해당 메세지는 insert 동작으로 before는 null임을 알 수 있다.

 

그렇다면 SMT를 적용한 메세지 형식을 알아보자.

우선, SMT를 적용할 경우 다음 transform을 추가해주면 된다.
(참고로 mongodb는 다른 transform type을 사용해야 한다.)

        "transforms": "unwrap",
        "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones":false,
        "transforms.unwrap.delete.handling.mode":"rewrite",
        "transforms.unwrap.add.fields":"op,schema"

 

위의 설정들은

debezium 공식문서(https://debezium.io/documentation/reference/transformations/event-flattening.html)

에서 확인하면 되고...

이렇게 SMT로 변환하도록 추가한 뒤, 메세지 형식은 다음과 같이 변경된다.

# insert
{
  "schema": {
     ...
  },
  "payload": {
    "id": 3,
    "product": "mask",
    "price": 1500,
    "ins_timestamp": 1635086072124,
    "__op": "c",
    "__schema": "public",
    "__deleted": "false"
  }
}

# update
{
  "schema": {
    ...
  },
  "payload": {
    "id": 3,
    "product": "mask",
    "price": 1490,
    "ins_timestamp": 1635086072124,
    "__op": "u",
    "__schema": "public",
    "__deleted": "false"
  }
}

# delete
{
  "schema": {
    ...
  },
  "payload": {
    "id": 3,
    "product": null,
    "price": null,
    "ins_timestamp": null,
    "__op": "d",
    "__schema": "public",
    "__deleted": "true"
  }
}
null

메세지가 훨씬 간결해지고

before 값이 없어지고 after 값만 나오는 것을 확인할 수 있다.

그리고 소스 정보도 위의 설정에서 명시한 내용만 나오도록 변형했다.

https://www.confluent.io/blog/kafka-connect-single-message-transformation-tutorial-with-examples/

이처럼 kafka topic에 메세지를 넣기 전

transformation을 이용하여 메세지를 변형하여

필요한 정보만 kafka topic에 넣도록 하여

메세지 크기를 줄일 수 있다.

 

transform은 SMT 말고도 더 다른 기능들도 있다.

저장되고자 하는 kafka topic을 임의로 설정한다든지

특정 데이터를 필터링 한다든지

Debezium이 릴리즈 되면서

점점 다양한 기능들이 추가되고 있는것 같다.

그런 기능들을 조사 및 연구해서 사용하면

좀 더 편리하게 원하는 대로 데이터 파이프라인을 구축할 수 있을 것이다.

To be continued.........

 

 

Made by 꿩

 

'CDC > Kafka Connect' 카테고리의 다른 글

Kafka Connect 설정  (0) 2021.01.09
Kafka Connect란?  (0) 2021.01.09

# Kafka Connect 설정

 

필자가 설정하는 Kafka Connect 설정파일은 2개이다.
하나는 connect-distributed.properties 이고,
다른 하나는 connect-log4j.properties 이다.

 

connect-distributed.properties는 kafka connect 기본 설정 파일로 다음과 같다.

bootstrap.servers={kafka-ip1}:9092,{kafka-ip2}:9092,{kafka-ip3}:9092
 
producer.max.request.size=105000000
 
group.id=source-connect-cluster
 
#key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter
#key.converter.schemas.enable=true
#value.converter.schemas.enable=true

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://{schema-registry-ip1}:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://{schema-registry-ip2}:8081

offset.storage.topic=source-connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25
 
config.storage.topic=source-connect-configs
config.storage.replication.factor=3
 
status.storage.topic=source-connect-status
status.storage.replication.factor=3
status.storage.partitions=5
 
offset.flush.interval.ms=10000
 
rest.host.name={kafka-connect-ip1}
rest.port=8083
 
rest.advertised.host.name={kafka-connect-ip1}
rest.advertised.port=8083
 
plugin.path=/home/kafka/plugins 

 

1. bootstrap.servers

Kafka 서버 ip와 port를 명시하면 된다.

 

2. producer.max.request.size

producer(kafka connector)가 메세지를 생산할때 최대 메세지 크기를 설정한다.

디폴트 최대 데이터 크기는 약 1MB 정도이다. 이 크기를 더 증가할 필요가 있을 경우 설정한다.

buffer memory 디폴트 최대 크기(buffer.memory)는 약 33MB이며

max.request.size를 33MB이상 설정하면 buffer memory 제한이 걸린다.

buffer memory 제한을 더 높이면 되지만 그럴 경우는 없고 있으면 문제가 있다는 것이다.

 

3. group.id

Kafka connect도 kafka처럼 클러스터를 형성할 수 있다.

Kafka connect 클러스터의 그룹을 구별하는 id로 같은 클러스터는 동일하게 설정한다.

 

4. key.converter / value.converter

메세지의 key와 value의 변환 클래스를 입력한다.

JsonConverter가 디폴트이며, AvroConverter를 사용하는 경우도 많은 것 같다.

Json보다 Avro 형식이 압축률이 더 좋다고 한다.

 

5. key.converter.schema.registry.url / value.converter.schema.registry.url

schema registry를 사용할 경우 설정한다.

 

6. offset.storage.*

Kafka Connector offset과 관련된 설정이다.

 

7. config.storage.*

Kafka Connector config와 관련된 설정이다.

 

8. status.storage.*

Kafka Connector status과 관련된 설정이다.

 

9. offset.flush.interval.ms

task들의 Kafka Connector offset을 커밋하는 interval 값이다.

 

10. rest.host.name / rest.port

REST API의 hostname과 port를 설정한다.

 

11. rest.advertised.host.name / rest.advertised.port

다른 worker들이 연결하는데 사용될 hostname과 port를 설정한다.

 

12. plugin.path

connector plugin이 들어있는 디렉토리 경로를 설정한다.

connect-log4j.properties 파일은 kafka connect에서 생성되는 로그설정에 관한 파일이다.

log4j.rootLogger=INFO, stdout, connectAppender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 
log4j.appender.connectAppender=org.apache.log4j.RollingFileAppender
log4j.appender.connectAppender.File=${kafka.logs.dir}/connect.log
log4j.appender.connectAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.connectAppender.MaxFileSize=100MB
log4j.appender.connectAppender.MaxBackupIndex=50
log4j.appender.connectAppender.append=true
 
connect.log.pattern=[%d] %p %X{connector.context}%m (%c:%L)%n
 
log4j.appender.stdout.layout.ConversionPattern=${connect.log.pattern}
log4j.appender.connectAppender.layout.ConversionPattern=${connect.log.pattern}
 
log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.reflections=ERROR

 

1. log4j.appender.connectAppender

첫 설치시 DailyRollingFileAppender로 설정이 되어있을 것이다.

DailyRollingFileAppender는 매시간 또는 매일 마다 로그파일이 새로 생기지만

파일 최대크기와 파일 최대개수를 지정할 수 없다는 단점이 있다.

개인적으로는 DailyRollingFileAppender을 추천한다.

 

2. log4j.appender.connectAppender.File

로그 파일이 생성되는 경로와 로그파일 이름을 설정한다.

 

3. log4j.appender.connectAppender.MaxFileSize

로그 파일의 최대 크기를 설정한다.

 

4. log4j.appender.connectAppender.MaxBackupIndex

로그 파일의 최대 개수를 설정한다.

 

5. log4j.appender.connectAppender.append

kafka connect가 restart시 로그파일에 append할 것인지에 대한 설정이다.

 

5. connect.log.pattern

로그 출력시 어떤 형식으로 나올지에 대한 설정이다.

중간에 {connector.context}를 넣으면 해당 로그가 어떤 커넥터에서 나온건지 알 수 있다.

To be continued.........

 

 

 

Made by 꿩

'CDC > Kafka Connect' 카테고리의 다른 글

[Kafka Connect/Debezium] SMT 란?  (0) 2021.10.24
Kafka Connect란?  (0) 2021.01.09

# Kafka Connect란?

 

Kafka의 구성은 Producer, Consumer, broker로 되어있다.

 

Producer는 카프카 메세지를 생산하는 주체이고,
Consumer는 카프카 메세지를 소비하는 주체이며,
Broker는 카프카 메세지를 저장하는 서버이다.

 

각 Broker는 여러개 topic을 가지고 있으며 이 topic에 메세지가 저장된다.

 

Kafka Connect는 데이터소스와 Kafka를 연결해주는 매개체이다.
Kafka Connect에는 Connector를 이용하여 데이터소스와 연결한다.
다시말해서, Kafka Connector가 Producer와 Consumer 역할을 한다고 보면 된다.

 

Producer 역할을 하는 Kafka Connector를 Source Connector라 하고,
Consumer 역할을 하는 Kafka Connector를 Sink Connector라 한다.

출처: https://debezium.io/documentation/reference/1.3/architecture.html

 

위의 그림은 kafka Source Connector 오픈소스인 Debezium 공식문서에서 가져온 그림이다.
그림을 보면, MySQL과 PostgreSQL의 데이터 소스에서 데이터가 Debezium Source Connector에 의해 Kafka에 저장이 된다. 그리고 메세지들은 Sink Connector에 의해 다양한 데이터 소스로 전달할 수 있다.

 

다음은 Kafka Connector가 만들어 내는 메세지의 구조이다.

 

Message는 Key와 Value로 구성되어 있으며,
각 Key와 Value는 Schema와 Payload로 구성되어 있다.

 

여기서 Key는 PK와 같이 데이터를 식별할 수 있는 정보가 들어있고
Value는 데이터의 전체 값이 들어있다.

 

Payload는 데이터 값이 저장이 되며, 이 데이터 값의 데이터 타입이 Schema에 명시되어 있다.

 

동일한 메세지가 계속 들어오는 경우 Schema가 중복이 되어 불필요하게 데이터 용량을 잡아먹게 된다.
이를 위해 Kafka Connect는 Schema Registry를 활용한다.

출처: https://medium.com/@gaemi/kafka-%EC%99%80-confluent-schema-registry-%EB%A5%BC-%EC%82%AC%EC%9A%A9%ED%95%9C-%EC%8A%A4%ED%82%A4%EB%A7%88-%EA%B4%80%EB%A6%AC-2-bfa96622a974

 

Schema Registry는 Key와 Value에 명시된 Schema를 따로 저장하며,
Kafka Connector는 Schema 대신 Schema Registry의 Schema 번호를 명시하여 메세지 크기를 줄인다.

 

Kafka Connect는 REST API를 이용하여 Connector를 설정할 수 있다.

# kafka connect 실행
./bin/connect-distributed.sh -daemon ./config/connect-distributed.properties

# kafka connect 실행확인
curl -s "http://{ip}:8083"           
{"version":"2.4.1","commit":"c57222ae8cd7866b","kafka_cluster_id":"nZrPY3j5SKOUwqeGk68dsg"}

# 사용할 수 있는 플러그인
curl -X GET -s "http://{ip}:8083/connector-plugins"
[{"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type":"sink","version":"5.5.1"},{"class":"io.confluent.connect.jdbc.JdbcSinkConnector","type":"sink","version":"5.5.1"},{"class":"io.confluent.connect.jdbc.JdbcSourceConnector","type":"source","version":"5.5.1"},{"class":"io.confluent.connect.s3.S3SinkConnector","type":"sink","version":"5.5.1"},{"class":"io.confluent.connect.storage.tools.SchemaSourceConnector","type":"source","version":"2.4.1"},{"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.3.1.Final"},{"class":"io.debezium.connector.postgresql.PostgresConnector","type":"source","version":"1.2.1.Final"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.4.1"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.4.1"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]

# kafka connector 확인
curl -X GET -s "http://{ip}:8083/connectors"
["test-connector"]

# kafka connector 설정확인
curl -X GET -s "http://{ip}:8083/connectors/test-connector/config"

# kafka connector 상태확인
curl -X GET -s "http://{ip}:8083/connectors/test-connector/status"

# kafka connector 삭제
curl -X DELETE -s "http://{ip}:8083/connectors/test-connector"

 

Kafka Connect를 사용해보고자 한다면,

 

Source Connector 오픈소스인 Debezium을 추천한다.
Sink Connector는 Confluent사에서 개발한 Connector를 이용하자.

 

Confluent hub(https://www.confluent.io/hub/)에는 Confluent에서 개발한 다양한 Connector를 다운받을 수 있다.
각 Connector가 Source인지 Sink인지 명시가 되어 있으며 무료 라이센스일 것이다.

 

 

참고자료
debezium.io/documentation/reference/1.3/architecture.html
mongsil-jeong.tistory.com/35

 

To be continued.........

 

 

 

Made by 꿩

'CDC > Kafka Connect' 카테고리의 다른 글

[Kafka Connect/Debezium] SMT 란?  (0) 2021.10.24
Kafka Connect 설정  (0) 2021.01.09

+ Recent posts