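Application properties and topology:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Exactly-once v2 requires brokers on version 2.5 or newer.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// Replication factor for internal topics; the cluster needs at least 3 brokers.
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
props.put(StreamsConfig.producerPrefix("transaction.timeout.ms"), 60000);

StreamsBuilder builder = new StreamsBuilder();
// Explicit serdes, since no default serdes are configured above.
KStream<String, String> src = builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()));
KStream<String, Long> agg = src.groupByKey()
    .count(Materialized.as("orders-count"))
    .toStream();
agg.to("orders_count", Produced.with(Serdes.String(), Serdes.Long()));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
// Close the client cleanly when the JVM shuts down.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();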
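To make it easier to see what the topology writes to orders_count, here is a minimal plain-Java sketch of the per-key running count that groupByKey().count().toStream() computes. It does not use Kafka at all. The class name CountSketch, the method runningCounts, and the sample keys are made up for illustration, and the sketch assumes every input record produces one downstream update (in a real application, record caching and the commit interval may collapse several updates for the same key into one).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CountSketch {
    // Returns the sequence of (key, count) updates produced for the given record keys.
    static List<Map.Entry<String, Long>> runningCounts(List<String> keys) {
        // Hypothetical stand-in for the "orders-count" state store.
        Map<String, Long> store = new HashMap<>();
        List<Map.Entry<String, Long>> updates = new ArrayList<>();
        for (String key : keys) {
            // Increment the count for this key, starting at 1 for a new key.
            long count = store.merge(key, 1L, Long::sum);
            // Each update corresponds to one record in the output stream.
            updates.add(Map.entry(key, count));
        }
        return updates;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("order-1", "order-2", "order-1");
        for (Map.Entry<String, Long> update : runningCounts(keys)) {
            System.out.println(update.getKey() + " -> " + update.getValue());
        }
        // prints:
        // order-1 -> 1
        // order-2 -> 1
        // order-1 -> 2
    }
}
```

The output is a changelog: a later record for the same key replaces the earlier count, which is why the sink is written with a Long value serde.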
