GOOD4ME
[Kafka] Kafka connector로 실시간 MSSQL 테이블 변경 내용 조회 본문
반응형
저번 글에 이어
이번엔 MSSQL(SQL Server)에서 CDC 기능과
Kafka Connect를 연동하여 Kafka topic을 통해
실시간 데이터 변경 내용을 확인하는 방법에 대해 포스팅해보겠다.
MSSQL 설정
우선 테스트할 데이터베이스, 테이블을 생성하였고
생성한 DB와 테이블에 CDC를 적용하였다.
CREATE DATABASE test
GO
CREATE TABLE debezium_test
(
ID VARCHAR(5) NOT NULL PRIMARY KEY,
EMAIL VARCHAR(30) NOT NULL,
PHONE VARCHAR(15) NOT NULL
)
GO
use test
EXEC sys.sp_cdc_enable_db
GO
EXEC sys.sp_cdc_enable_table
@source_schemaa = N'dbo',
@source_name = N'debezium_test',
@role_name = NULL,
@supports_net_changes = 1
GO
그리고 CDC가 잘 적용되었는지 확인을 했다.
이건 무조건 하는 것을 추천한다.
필자는 이 단계를 뛰어넘었다가, 테이블 CDC 적용 구문을 커밋하지도 않고
작업을 진행하면서 불쑥불쑥 튀어나오는 에러 원인을 찾느라 삽질했었다...😂😂😂
SELECT name, is_cdc_enabeld FROM sys.databases
GO
name is_cdc_enabled
----------------------- --------------
master 0
tempdb 0
model 0
msdb 0
test 1
use test
GO
EXEC sys.sp_cdc_help_change_data_capture
GO
테이블에 CDC 설정이 제대로 되었다면
EXEC sys.sp_cdc_help_change_data_capture를 커밋하여
source_table이 debezium_test인 row를 확인할 수 있을 것이다.
이후, Kafka 컨테이너에서
하단의 구문을 쉘에서 입력하여
connector를 생성하고, 생성한 connector을 확인해본다.
$ curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "sql-server-connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max": "1",
"database.hostname": "[sql server 컨테이너 IP 또는 호스트네임]",
"database.port": "1433",
"database.user": "SA",
"database.password": "[비밀번호]",
"database.dbname": "test",
"table.include.list": "dbo.debezium_test",
"database.server.name": "testdb",
"database.history.kafka.bootstrap.servers": "kafka1:9092",
"database.history.kafka.topic": "schema-changes.testdb.history",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": false,
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "source.name",
"transforms.unwrap.add.fields.prefix": ""
}
}'
$ curl --location --request GET 'http://localhost:8083/connectors'
["sql-server-connector"]
그리고 Kafka topic 리스트를 확인하고,
Kafka 콘솔 컨슈머를 실행시켜놓은 후,
CDC 설정을 한 테이블에 데이터를 삽입해보았다.
$ /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
connect-configs
connect-offsets
connect-status
schema-changes.testdb.history
testdb
testdb.dbo.debezium_test 👉 사용할 Kafka Topic
$ /opt/kafka/bin/kafka-console-consumer.sh --topic testdb.dbo.debezium_test --bootstrap-server localhost:9092 --from-beginning
INSERT INTO debezium_test VALUES(1, "test@def.abc", "010-0000-0001")
GO
$ /opt/kafka/bin/kafka-console-consumer.sh --topic testdb.dbo.debezium_test --bootstrap-server localhost:9092 --from-beginning
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"NAME"},{"type":"int32","optional":false,"field":"AGE"},{"type":"string","optional":true,"field":"source_name"},{"type":"string","optional":true,"field":"__deleted"}],"optional":false,"name":"testdb.dbo.debezium_test.Value"},"payload":{"ID":"20","EMAIL":"test@def.abc","PHONE":"010-0000-0001","source_name":"testdb","__deleted":"false"}}
테이블 CDC 적용을 하지 않아 발생했던 에러를 제외하면
테스트는 순조롭게 이루어졌다.😄
한 가지 아쉬웠던 점은
데이터가 삽입되면 Kafka Topic에서 바로 확인할 수 없었고
약 2~3초 정도 기다려야 확인할 수 있었다.
다음은 상기의 latency가 발생한 원인을 분석하여
퍼포먼스를 강화하는 방법이나
PySpark를 추가 연동하여
실시간 데이터 처리에 대한 글을 포스팅해보겠다.
반응형
'개발.오류.정리 > ETC.' 카테고리의 다른 글
[Grafana] Grafana CSV plugin time 설정 방법 / 경로 설정 방법 (0) | 2023.01.05 |
---|---|
[Ubuntu] ssh permission denied 오류 (0) | 2023.01.03 |
[Kafka] Kafka connector로 실시간 파일 내용 조회 (0) | 2022.12.22 |
vscode server: Downloading with wget (stuck) (0) | 2022.11.17 |
Comments