要正确配置Kafka依赖,请按照以下步骤操作:
- 添加Kafka客户端依赖
在Maven项目的pom.xml文件中,添加Kafka客户端的依赖项。将以下代码添加到
标签内:
org.apache.kafka kafka-clients 2.8.0
对于Gradle项目,在build.gradle文件的dependencies部分添加以下代码:
implementation 'org.apache.kafka:kafka-clients:2.8.0' // 请使用适合您项目的版本
- 配置Kafka属性
在项目的配置文件(如application.properties或application.yml)中,添加Kafka相关配置。以下是一些常用配置示例:
application.properties:
spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=my-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
application.yml:
spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: my-group auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
请根据您的实际需求修改这些配置,例如Kafka服务器地址、消费者组ID、序列化/反序列化器等。
- 在代码中使用Kafka
在您的项目中创建Kafka生产者(Producer)和消费者(Consumer)类,并使用上面配置的属性进行实例化。以下是一个简单的示例:
Kafka生产者示例(Producer):
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class MyKafkaProducer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducerproducer = new KafkaProducer<>(props); producer.send(new ProducerRecord<>("my-topic", "key", "value")); producer.close(); } }
Kafka消费者示例(Consumer):
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class MyKafkaConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "my-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumerconsumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("my-topic")); while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { System.out.printf("offset = %d, key = %s, value = https://www.yisu.com/ask/%s%n", record.offset(), record.key(), record.value()); } } } }
确保Kafka服务器正在运行,并根据需要修改生产者和消费者代码中的主题(topic)名称。现在,您已经正确配置了Kafka依赖并在项目中使用了Kafka生产者与消费者。