• 并行执行
    • 设置并行度
      • 算子层次
      • 执行环境层次
      • 客户端层次
      • 系统层次
    • 设置最大并行度

    并行执行

    本节描述了在 Flink 中配置程序的并行执行。一个 Flink 程序由多个任务 task 组成(转换/算子、数据源和数据接收器)。一个 task 包括多个并行执行的实例,且每一个实例都处理 task 输入数据的一个子集。一个 task 的并行实例数被称为该 task 的 并行度 (parallelism)。

    使用 savepoints 时,应该考虑设置最大并行度。当作业从一个 savepoint 恢复时,你可以改变特定算子或着整个程序的并行度,并且此设置会限定整个程序的并行度的上限。由于在 Flink 内部将状态划分为了 key-groups,且性能所限不能无限制地增加 key-groups,因此设定最大并行度是有必要的。

    • 设置并行度
      • 算子层次
      • 执行环境层次
      • 客户端层次
      • 系统层次
    • 设置最大并行度

    设置并行度

    一个 task 的并行度可以从多个层次指定:

    算子层次

    单个算子、数据源和数据接收器的并行度可以通过调用 setParallelism()方法来指定。如下所示:

    1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    2. DataStream<String> text = [...]
    3. DataStream<Tuple2<String, Integer>> wordCounts = text
    4. .flatMap(new LineSplitter())
    5. .keyBy(0)
    6. .timeWindow(Time.seconds(5))
    7. .sum(1).setParallelism(5);
    8. wordCounts.print();
    9. env.execute("Word Count Example");
    1. val env = StreamExecutionEnvironment.getExecutionEnvironment
    2. val text = [...]
    3. val wordCounts = text
    4. .flatMap{ _.split(" ") map { (_, 1) } }
    5. .keyBy(0)
    6. .timeWindow(Time.seconds(5))
    7. .sum(1).setParallelism(5)
    8. wordCounts.print()
    9. env.execute("Word Count Example")

    执行环境层次

    如此节所描述,Flink 程序运行在执行环境的上下文中。执行环境为所有执行的算子、数据源、数据接收器 (data sink) 定义了一个默认的并行度。可以显式配置算子层次的并行度去覆盖执行环境的并行度。

    可以通过调用 setParallelism() 方法指定执行环境的默认并行度。如果想以并行度3来执行所有的算子、数据源和数据接收器。可以在执行环境上设置默认并行度,如下所示:

    1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    2. env.setParallelism(3);
    3. DataStream<String> text = [...]
    4. DataStream<Tuple2<String, Integer>> wordCounts = [...]
    5. wordCounts.print();
    6. env.execute("Word Count Example");
    1. val env = StreamExecutionEnvironment.getExecutionEnvironment
    2. env.setParallelism(3)
    3. val text = [...]
    4. val wordCounts = text
    5. .flatMap{ _.split(" ") map { (_, 1) } }
    6. .keyBy(0)
    7. .timeWindow(Time.seconds(5))
    8. .sum(1)
    9. wordCounts.print()
    10. env.execute("Word Count Example")

    客户端层次

    将作业提交到 Flink 时可在客户端设定其并行度。客户端可以是 Java 或 Scala 程序,Flink 的命令行接口(CLI)就是一种典型的客户端。

    在 CLI 客户端中,可以通过 -p 参数指定并行度,例如:

    1. ./bin/flink run -p 10 ../examples/*WordCount-java*.jar

    在 Java/Scala 程序中,可以通过如下方式指定并行度:

    1. try {
    2. PackagedProgram program = new PackagedProgram(file, args);
    3. InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
    4. Configuration config = new Configuration();
    5. Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());
    6. // set the parallelism to 10 here
    7. client.run(program, 10, true);
    8. } catch (ProgramInvocationException e) {
    9. e.printStackTrace();
    10. }
    1. try {
    2. PackagedProgram program = new PackagedProgram(file, args)
    3. InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123")
    4. Configuration config = new Configuration()
    5. Client client = new Client(jobManagerAddress, new Configuration(), program.getUserCodeClassLoader())
    6. // set the parallelism to 10 here
    7. client.run(program, 10, true)
    8. } catch {
    9. case e: Exception => e.printStackTrace
    10. }

    系统层次

    可以通过设置 ./conf/flink-conf.yaml 文件中的 parallelism.default 参数,在系统层次来指定所有执行环境的默认并行度。你可以通过查阅配置文档获取更多细节。

    设置最大并行度

    最大并行度可以在所有设置并行度的地方进行设定(客户端和系统层次除外)。与调用 setParallelism() 方法修改并行度相似,你可以通过调用 setMaxParallelism() 方法来设定最大并行度。

    默认的最大并行度等于将 operatorParallelism + (operatorParallelism / 2) 值四舍五入到大于等于该值的一个整型值,并且这个整型值是 2 的幂次方,注意默认最大并行度下限为 128,上限为 32768

    注意 为最大并行度设置一个非常大的值将会降低性能,因为一些 state backends 需要维持内部的数据结构,而这些数据结构将会随着 key-groups 的数目而扩张(key-group 是状态重新分配的最小单元)。