• Python REPL
    • 使用
      • Table API
    • 启动
      • Local
      • Remote
      • Yarn Python Shell cluster
      • Yarn Session
    • 完整的参考

    Python REPL

    Flink附带了一个集成的交互式Python Shell。它既能够运行在本地启动的local模式,也能够运行在集群启动的cluster模式下。

    为了使用Flink的Python Shell,你只需要在Flink的binary目录下执行:

    1. bin/pyflink-shell.sh local

    关于如何在一个Cluster集群上运行Python shell,可以参考启动章节介绍。

    使用

    当前Python shell支持Table API的功能。在启动之后,Table Environment的相关内容将会被自动加载。可以通过变量”bt_env”来使用BatchTableEnvironment,通过变量”st_env”来使用StreamTableEnvironment。

    Table API

    下面是一个通过Python Shell 运行的简单示例:

    1. >>> import tempfile
    2. >>> import os
    3. >>> import shutil
    4. >>> sink_path = tempfile.gettempdir() + '/streaming.csv'
    5. >>> if os.path.exists(sink_path):
    6. ... if os.path.isfile(sink_path):
    7. ... os.remove(sink_path)
    8. ... else:
    9. ... shutil.rmtree(sink_path)
    10. >>> s_env.set_parallelism(1)
    11. >>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
    12. >>> st_env.connect(FileSystem().path(sink_path))\
    13. ... .with_format(OldCsv()
    14. ... .field_delimiter(',')
    15. ... .field("a", DataTypes.BIGINT())
    16. ... .field("b", DataTypes.STRING())
    17. ... .field("c", DataTypes.STRING()))\
    18. ... .with_schema(Schema()
    19. ... .field("a", DataTypes.BIGINT())
    20. ... .field("b", DataTypes.STRING())
    21. ... .field("c", DataTypes.STRING()))\
    22. ... .register_table_sink("stream_sink")
    23. >>> t.select("a + 1, b, c")\
    24. ... .insert_into("stream_sink")
    25. >>> st_env.execute("stream_job")
    26. >>> # 如果作业运行在local模式, 你可以执行以下代码查看结果:
    27. >>> with open(sink_path, 'r') as f:
    28. ... print(f.read())
    1. >>> import tempfile
    2. >>> import os
    3. >>> import shutil
    4. >>> sink_path = tempfile.gettempdir() + '/batch.csv'
    5. >>> if os.path.exists(sink_path):
    6. ... if os.path.isfile(sink_path):
    7. ... os.remove(sink_path)
    8. ... else:
    9. ... shutil.rmtree(sink_path)
    10. >>> b_env.set_parallelism(1)
    11. >>> t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
    12. >>> bt_env.connect(FileSystem().path(sink_path))\
    13. ... .with_format(OldCsv()
    14. ... .field_delimiter(',')
    15. ... .field("a", DataTypes.BIGINT())
    16. ... .field("b", DataTypes.STRING())
    17. ... .field("c", DataTypes.STRING()))\
    18. ... .with_schema(Schema()
    19. ... .field("a", DataTypes.BIGINT())
    20. ... .field("b", DataTypes.STRING())
    21. ... .field("c", DataTypes.STRING()))\
    22. ... .register_table_sink("batch_sink")
    23. >>> t.select("a + 1, b, c")\
    24. ... .insert_into("batch_sink")
    25. >>> bt_env.execute("batch_job")
    26. >>> # 如果作业运行在local模式, 你可以执行以下代码查看结果:
    27. >>> with open(sink_path, 'r') as f:
    28. ... print(f.read())

    启动

    查看Python Shell提供的可选参数,可以使用:

    1. bin/pyflink-shell.sh --help

    Local

    Python Shell运行在local模式下,只需要执行:

    1. bin/pyflink-shell.sh local

    Remote

    Python Shell运行在一个指定的JobManager上,通过关键字remote和对应的JobManager的地址和端口号来进行指定:

    1. bin/pyflink-shell.sh remote <hostname> <portnumber>

    Yarn Python Shell cluster

    Python Shell可以运行在YARN集群之上。YARN的container的数量可以通过参数-n <arg>进行指定。Python shell在Yarn上部署一个新的Flink集群,并进行连接。除了指定container数量,你也可以指定JobManager的内存,YARN应用的名字等参数。例如,在一个部署了两个TaskManager的Yarn集群上运行Python Shell:

    1. bin/pyflink-shell.sh yarn -n 2

    关于所有可选的参数,可以查看本页面底部的完整说明。

    Yarn Session

    如果你已经通过Flink Yarn Session部署了一个Flink集群,能够通过以下的命令连接到这个集群:

    1. bin/pyflink-shell.sh yarn

    完整的参考

    1. Flink Python Shell
    2. 使用: pyflink-shell.sh [local|remote|yarn] [options] <args>...
    3. 命令: local [选项]
    4. 启动一个部署在localFlink Python shell
    5. 使用:
    6. -h,--help 查看所有可选的参数
    7. 命令: remote [选项] <host> <port>
    8. 启动一个部署在remote集群的Flink Python shell
    9. <host>
    10. JobManager的主机名
    11. <port>
    12. JobManager的端口号
    13. 使用:
    14. -h,--help 查看所有可选的参数
    15. 命令: yarn [选项]
    16. 启动一个部署在Yarn集群的Flink Python Shell
    17. 使用:
    18. -h,--help 查看所有可选的参数
    19. -jm,--jobManagerMemory <arg> 具有可选单元的JobManager
    20. container的内存(默认值:MB)
    21. -n,--container <arg> 需要分配的YARN container
    22. 数量 (=TaskManager的数量)
    23. -nm,--name <arg> 自定义YARN Application的名字
    24. -qu,--queue <arg> 指定YARNqueue
    25. -s,--slots <arg> 每个TaskManagerslots的数量
    26. -tm,--taskManagerMemory <arg> 具有可选单元的每个TaskManager
    27. container的内存(默认值:MB
    28. -h | --help
    29. 打印输出使用文档