在Ubuntu系统下提交Flink任务,需要遵循以下步骤:
-
安装Java环境:
Flink依赖于Java运行环境,因此首先需要确保你的系统中已经安装了Java。可以使用以下命令检查Java是否已安装:
java -version
如果没有安装Java,可以使用以下命令安装OpenJDK:
sudo apt update sudo apt install openjdk-11-jdk
-
下载并解压Flink:
从Flink官方网站(https://flink.apache.org/downloads.html)下载所需版本的Flink,然后在Ubuntu系统上解压:
wget https://downloads.apache.org/flink/flink-1.14.0/flink-1.14.0-bin-scala_2.11.tgz tar xzf flink-1.14.0-bin-scala_2.11.tgz cd flink-1.14.0
-
配置Flink:
根据实际需求,修改Flink的配置文件(位于
conf
目录下),例如flink-conf.yaml
、masters
和workers
等。 -
启动Flink集群:
在完成配置后,可以使用以下命令启动Flink集群:
bin/start-cluster.sh
你可以通过访问Web UI(默认地址为http://localhost:8081)来查看集群状态。
-
编写Flink任务:
使用Java、Scala或Python编写Flink任务。这里以Java为例,创建一个简单的WordCount任务:
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class WordCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream
text = env.fromElements("Hello Flink", "Hello World"); DataStream > counts = text .flatMap(new Tokenizer()) .keyBy(0) .sum(1); counts.print(); env.execute("WordCount Example"); } public static class Tokenizer implements FlatMapFunction > { @Override public void flatMap(String value, Collector > out) { String[] tokens = value.toLowerCase().split("\\W+"); for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<>(token, 1)); } } } } } -
编译和打包Flink任务:
将编写好的Flink任务编译并打包成JAR文件。例如,如果你使用Maven或Gradle构建项目,可以使用以下命令生成JAR文件:
mvn clean package
-
提交Flink任务:
使用Flink的命令行工具提交任务到集群。假设你的任务JAR文件名为
wordcount.jar
,可以使用以下命令提交任务:bin/flink run -c com.example.WordCount wordcount.jar
其中
-c
选项指定了任务的主类名。
完成以上步骤后,Flink任务将在Ubuntu系统下的集群上运行。