Basic Connect worker configuration:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
plugin.path=/usr/share/java,/etc/kafka-connect/jars
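# rest.port is deprecated and was removed in Kafka 3.0; newer workers use listeners instead.
listeners=http://:8083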
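
With value.converter.schemas.enable=false, JsonConverter reads and writes each message value as plain JSON instead of a schema/payload envelope. The sketch below only illustrates the two shapes; the order fields and the helper looks_enveloped are made up for this example and are not part of Kafka Connect.

```python
import json

# Hypothetical order record, used only for illustration.
order = {"order_id": 42, "amount": 19.99}

# schemas.enable=false: the message value is the bare JSON document.
plain_value = json.dumps(order)

# schemas.enable=true: JsonConverter expects a "schema"/"payload" envelope.
enveloped_value = json.dumps({
    "schema": {
        "type": "struct",
        "optional": False,
        "fields": [
            {"field": "order_id", "type": "int64", "optional": False},
            {"field": "amount", "type": "double", "optional": False},
        ],
    },
    "payload": order,
})


def looks_enveloped(raw: str) -> bool:
    """Return True if the JSON text has exactly the top-level keys 'schema' and 'payload'."""
    doc = json.loads(raw)
    return isinstance(doc, dict) and set(doc) == {"schema", "payload"}
```

If the producers writing to the topic emit bare JSON, as assumed here, the worker setting above is the matching choice; enveloped messages would call for schemas.enable=true instead.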

S3 sink connector configuration (s3-sink.json):

{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "2",
    "topics": "orders",
    "s3.bucket.name": "my-bucket",
    "s3.region": "us-east-1",
    "aws.access.key.id": "AKIA...",
    "aws.secret.access.key": "SECRET",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "flush.size": "1000",
    "rotate.interval.ms": "600000",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "3600000",
    "path.format": "yyyy/MM/dd/HH",
    "timezone": "UTC",
    "locale": "en",
    "timestamp.extractor": "Record",
    "schema.compatibility": "NONE"
  }
}
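
To see where records end up with this partitioner, the following rough sketch approximates how a record timestamp could map to an S3 object key. It assumes the connector's default topics.dir of "topics", a zero-padded start offset in the file name, and an hourly partition granularity so that no extra rounding is needed; the function object_key and the token table are hypothetical helpers, not connector code.

```python
from datetime import datetime, timezone

# Assumed translation of the path.format tokens used above into strftime codes.
_TOKEN_MAP = {"yyyy": "%Y", "MM": "%m", "dd": "%d", "HH": "%H"}


def object_key(topic, kafka_partition, start_offset, timestamp_ms,
               path_format="yyyy/MM/dd/HH", topics_dir="topics"):
    """Approximate the S3 key for a file whose first record has this timestamp."""
    # Convert "yyyy/MM/dd/HH" into "%Y/%m/%d/%H".
    strftime_format = "/".join(_TOKEN_MAP[tok] for tok in path_format.split("/"))
    # timezone=UTC in the connector config, so format the timestamp in UTC.
    ts = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    time_path = ts.strftime(strftime_format)
    # Assumed naming: <topics.dir>/<topic>/<time path>/<topic>+<partition>+<offset>.json
    return (f"{topics_dir}/{topic}/{time_path}/"
            f"{topic}+{kafka_partition}+{start_offset:010d}.json")
```

For example, object_key("orders", 0, 1000, 1705323600000) describes a file for partition 0 of the orders topic whose first record was produced at 13:00 UTC on 2024-01-15.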

Create the connector and check its status via REST:

curl -X POST -H "Content-Type: application/json" --data @s3-sink.json http://localhost:8083/connectors
curl http://localhost:8083/connectors/s3-sink/status
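
The status endpoint returns JSON with a connector state and one entry per task. A minimal sketch of checking that response from a script might look like the following; the helper names fetch_status, failed_tasks and is_healthy are illustrative, and the base URL assumes the worker from the configuration above.

```python
import json
import urllib.request


def fetch_status(name, base_url="http://localhost:8083"):
    """GET /connectors/<name>/status and return the parsed JSON body."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


def failed_tasks(status):
    """Return the ids of tasks whose state is FAILED."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "FAILED"]


def is_healthy(status):
    """True when the connector and at least one task exist and all are RUNNING."""
    tasks = status.get("tasks", [])
    return (status.get("connector", {}).get("state") == "RUNNING"
            and bool(tasks)
            and all(t.get("state") == "RUNNING" for t in tasks))
```

Calling is_healthy(fetch_status("s3-sink")) shortly after creating the connector gives a quick yes/no answer; a failed task also carries a trace field in the response that is worth printing when failed_tasks is non-empty.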
