Kafka 消费者通过 Group ID 来将来自一个主题的消息分发给多个消费者
-
使用命令行工具创建消费者组:
你可以使用
kafka-consumer-groups.sh
工具来查看已存在的消费者组,或者使用--new-consumer-group
参数创建一个新的消费者组。例如:bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --create --group my-new-group
这将创建一个名为
my-new-group
的新消费者组。 -
使用 Kafka 管理工具(如 Confluent Control Center、Kafka Manager 等)创建消费者组:
这些工具通常提供了一个 Web 界面,你可以在其中管理 Kafka 集群、主题和消费者组。在这些工具中,你可以找到创建消费者组的功能,并按照提示操作即可。
-
使用编程语言(如 Java、Python、Go 等)创建消费者组:
在你的应用程序中,你需要使用 Kafka 客户端库来创建一个消费者,并指定消费者组 ID。以下是一个使用 Python 的
confluent_kafka
库创建消费者组的示例:from confluent_kafka import Consumer, KafkaError conf = { 'bootstrap.servers': 'localhost:9092', 'group.id': 'my-new-group' } consumer = Consumer(conf) consumer.subscribe(['my-topic']) try: while True: msg = consumer.poll(timeout=1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: print(f"End of partition reached {msg.topic()} [{msg.partition()}] at offset {msg.offset()}") else: raise KafkaException(msg.error()) else: print(f"Received message: {msg.value().decode('utf-8')}") except KeyboardInterrupt: pass finally: consumer.close()
这段代码创建了一个名为
my-new-group
的消费者组,并订阅了名为my-topic
的主题。当运行此代码时,消费者将从该主题中读取消息。