# Kafka Connect 설정

 

필자가 설정하는 Kafka Connect 설정파일은 2개이다.
하나는 connect-distributed.properties 이고,
다른 하나는 connect-log4j.properties 이다.

 

connect-distributed.properties는 kafka connect 기본 설정 파일로 다음과 같다.

bootstrap.servers={kafka-ip1}:9092,{kafka-ip2}:9092,{kafka-ip3}:9092
 
producer.max.request.size=105000000
 
group.id=source-connect-cluster
 
#key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter
#key.converter.schemas.enable=true
#value.converter.schemas.enable=true

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://{schema-registry-ip1}:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://{schema-registry-ip2}:8081

offset.storage.topic=source-connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25
 
config.storage.topic=source-connect-configs
config.storage.replication.factor=3
 
status.storage.topic=source-connect-status
status.storage.replication.factor=3
status.storage.partitions=5
 
offset.flush.interval.ms=10000
 
rest.host.name={kafka-connect-ip1}
rest.port=8083
 
rest.advertised.host.name={kafka-connect-ip1}
rest.advertised.port=8083
 
plugin.path=/home/kafka/plugins 

 

1. bootstrap.servers

Kafka 서버 ip와 port를 명시하면 된다.

 

2. producer.max.request.size

producer(kafka connector)가 메세지를 생산할때 최대 메세지 크기를 설정한다.

디폴트 최대 데이터 크기는 약 1MB 정도이다. 이 크기를 더 증가할 필요가 있을 경우 설정한다.

buffer memory 디폴트 최대 크기(buffer.memory)는 약 33MB이며

max.request.size를 33MB이상 설정하면 buffer memory 제한이 걸린다.

buffer memory 제한을 더 높이면 되지만 그럴 경우는 없고 있으면 문제가 있다는 것이다.

 

3. group.id

Kafka connect도 kafka처럼 클러스터를 형성할 수 있다.

Kafka connect 클러스터의 그룹을 구별하는 id로 같은 클러스터는 동일하게 설정한다.

 

4. key.converter / value.converter

메세지의 key와 value의 변환 클래스를 입력한다.

JsonConverter가 디폴트이며, AvroConverter를 사용하는 경우도 많은 것 같다.

Json보다 Avro 형식이 압축률이 더 좋다고 한다.

 

5. key.converter.schema.registry.url / value.converter.schema.registry.url

schema registry를 사용할 경우 설정한다.

 

6. offset.storage.*

Kafka Connector offset과 관련된 설정이다.

 

7. config.storage.*

Kafka Connector config와 관련된 설정이다.

 

8. status.storage.*

Kafka Connector status과 관련된 설정이다.

 

9. offset.flush.interval.ms

task들의 Kafka Connector offset을 커밋하는 interval 값이다.

 

10. rest.host.name / rest.port

REST API의 hostname과 port를 설정한다.

 

11. rest.advertised.host.name / rest.advertised.port

다른 worker들이 연결하는데 사용될 hostname과 port를 설정한다.

 

12. plugin.path

connector plugin이 들어있는 디렉토리 경로를 설정한다.

connect-log4j.properties 파일은 kafka connect에서 생성되는 로그설정에 관한 파일이다.

log4j.rootLogger=INFO, stdout, connectAppender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 
log4j.appender.connectAppender=org.apache.log4j.RollingFileAppender
log4j.appender.connectAppender.File=${kafka.logs.dir}/connect.log
log4j.appender.connectAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.connectAppender.MaxFileSize=100MB
log4j.appender.connectAppender.MaxBackupIndex=50
log4j.appender.connectAppender.append=true
 
connect.log.pattern=[%d] %p %X{connector.context}%m (%c:%L)%n
 
log4j.appender.stdout.layout.ConversionPattern=${connect.log.pattern}
log4j.appender.connectAppender.layout.ConversionPattern=${connect.log.pattern}
 
log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.reflections=ERROR

 

1. log4j.appender.connectAppender

첫 설치시 DailyRollingFileAppender로 설정이 되어있을 것이다.

DailyRollingFileAppender는 매시간 또는 매일 마다 로그파일이 새로 생기지만

파일 최대크기와 파일 최대개수를 지정할 수 없다는 단점이 있다.

개인적으로는 DailyRollingFileAppender을 추천한다.

 

2. log4j.appender.connectAppender.File

로그 파일이 생성되는 경로와 로그파일 이름을 설정한다.

 

3. log4j.appender.connectAppender.MaxFileSize

로그 파일의 최대 크기를 설정한다.

 

4. log4j.appender.connectAppender.MaxBackupIndex

로그 파일의 최대 개수를 설정한다.

 

5. log4j.appender.connectAppender.append

kafka connect가 restart시 로그파일에 append할 것인지에 대한 설정이다.

 

5. connect.log.pattern

로그 출력시 어떤 형식으로 나올지에 대한 설정이다.

중간에 {connector.context}를 넣으면 해당 로그가 어떤 커넥터에서 나온건지 알 수 있다.

To be continued.........

 

 

 

Made by 꿩

'CDC > Kafka Connect' 카테고리의 다른 글

[Kafka Connect/Debezium] SMT 란?  (0) 2021.10.24
Kafka Connect란?  (0) 2021.01.09

+ Recent posts