To use Apache Flink in a Spring Boot application, you first need to add the Flink dependencies to your project. The following is a simple example showing how to configure a Flink data source (Source) and data sink (Sink) inside a Spring Boot application.
- First, add the Flink dependencies to your `pom.xml` file (a note on the version properties follows the snippet):
```xml
<!-- ... -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
```
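The snippet above references `${flink.version}` and `${scala.binary.version}` without defining them. A minimal sketch of the matching `<properties>` block follows; the version numbers are assumptions (the Scala-suffixed artifact names above imply Flink 1.14 or earlier), so match them to your cluster:

```xml
<properties>
    <!-- Assumed versions for illustration; use the versions of your Flink deployment -->
    <flink.version>1.14.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>
```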
- Next, create a Flink configuration class that defines the data source and data sink (an alternative for newer Flink versions follows the code):
```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

@Configuration
public class FlinkConfiguration {

    @Value("${kafka.bootstrap.servers}")
    private String kafkaBootstrapServers;

    @Value("${kafka.input.topic}")
    private String inputTopic;

    @Value("${kafka.output.topic}")
    private String outputTopic;

    @Bean
    public StreamExecutionEnvironment streamExecutionEnvironment() {
        return StreamExecutionEnvironment.getExecutionEnvironment();
    }

    @Bean
    public FlinkKafkaConsumer<String> kafkaConsumer() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaBootstrapServers);
        properties.setProperty("group.id", "flink-spring-boot");
        return new FlinkKafkaConsumer<>(inputTopic, new SimpleStringSchema(), properties);
    }

    @Bean
    public FlinkKafkaProducer<String> kafkaProducer() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaBootstrapServers);
        return new FlinkKafkaProducer<>(outputTopic, new SimpleStringSchema(), properties);
    }

    @Bean
    public DataStream<String> dataStream(StreamExecutionEnvironment env,
                                         FlinkKafkaConsumer<String> consumer) {
        // Register the Kafka consumer as the source of the stream
        return env.addSource(consumer);
    }
}
```
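Note that `FlinkKafkaConsumer` and `FlinkKafkaProducer` are deprecated as of Flink 1.14 in favor of the unified `KafkaSource`/`KafkaSink` API. If you are on Flink 1.14+, a sketch of the equivalent beans (reusing the same `@Value` fields as the class above) might look like this:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// Sketch: source/sink beans using the newer unified Kafka connector API (Flink 1.14+),
// intended to live in the same @Configuration class as above.
@Bean
public KafkaSource<String> kafkaSource() {
    return KafkaSource.<String>builder()
            .setBootstrapServers(kafkaBootstrapServers)
            .setTopics(inputTopic)
            .setGroupId("flink-spring-boot")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();
}

@Bean
public KafkaSink<String> kafkaSink() {
    return KafkaSink.<String>builder()
            .setBootstrapServers(kafkaBootstrapServers)
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic(outputTopic)
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build())
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();
}

// Usage: env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source")
// for the source, and stream.sinkTo(kafkaSink) instead of addSink(...).
```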
- Configure the Kafka-related parameters in your `application.properties` file:
```properties
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input-topic
kafka.output.topic=output-topic
```
- Finally, use the Flink data source and sink in your Spring Boot application (a non-blocking variant follows the code):
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class FlinkSpringBootApplication implements CommandLineRunner {

    @Autowired
    private StreamExecutionEnvironment env;

    @Autowired
    private DataStream<String> dataStream;

    @Autowired
    private FlinkKafkaProducer<String> kafkaProducer;

    public static void main(String[] args) {
        SpringApplication.run(FlinkSpringBootApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Process the data stream as needed: here, uppercase each record
        dataStream.map(String::toUpperCase).addSink(kafkaProducer);

        // Execute the Flink job; execute() blocks until the job terminates
        env.execute("Flink Spring Boot Example");
    }
}
```
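One caveat: `env.execute()` blocks the calling thread until the job terminates, so for a long-running streaming job `run()` never returns. If you prefer to submit the job without blocking, Flink 1.10+ offers `executeAsync()`, which returns a `JobClient` handle. A minimal sketch of an alternative `run()` body:

```java
import org.apache.flink.core.execution.JobClient;

@Override
public void run(String... args) throws Exception {
    dataStream.map(String::toUpperCase).addSink(kafkaProducer);

    // Submit the job without blocking the startup thread
    JobClient jobClient = env.executeAsync("Flink Spring Boot Example");

    // The JobClient can be used to query status or cancel the job later
    jobClient.getJobStatus().thenAccept(status -> System.out.println("Job status: " + status));
}
```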
This example shows how to configure a Flink data source (reading from Kafka) and a data sink (writing the processed data back to Kafka) in a Spring Boot application. You can adapt the data-processing logic to your own needs, as sketched below.
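For instance, if you wanted something richer than the uppercase map, the processing step inside `run()` could be swapped for a filter plus a windowed count. The pipeline below is an illustrative sketch only (the 10-second window and the `key=count` output format are arbitrary choices):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Sketch: drop blank records, count occurrences of each value per
// 10-second processing-time window, and format the counts as strings
dataStream
        .filter(value -> value != null && !value.trim().isEmpty())
        .map(value -> Tuple2.of(value, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT)) // type hint needed for tuple lambdas
        .keyBy(t -> t.f0)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))
        .map(t -> t.f0 + "=" + t.f1)
        .addSink(kafkaProducer);
```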