    Connect to External Systems

    Flink’s Table API & SQL programs can be connected to other external systems for reading and writing both batch and streaming tables. A table source provides access to data which is stored in external systems (such as a database, key-value store, message queue, or file system). A table sink emits a table to an external storage system. Depending on the type of source and sink, they support different formats such as CSV, Parquet, or ORC.

    This page describes how to declare built-in table sources and/or table sinks and register them in Flink. After a source or sink has been registered, it can be accessed by Table API & SQL statements.

    Attention If you want to implement your own custom table source or sink, have a look at the user-defined sources & sinks page.

    • Dependencies
      • Connectors
      • Formats
    • Overview
    • Table Schema
      • Rowtime Attributes
      • Type Strings
    • Update Modes
    • Table Connectors
      • File System Connector
      • Kafka Connector
      • Elasticsearch Connector
    • Table Formats
      • CSV Format
      • JSON Format
      • Apache Avro Format
      • Old CSV Format
    • Further TableSources and TableSinks
      • OrcTableSource
      • CsvTableSink
      • JDBCAppendTableSink
      • CassandraAppendTableSink

    Dependencies

    The following tables list all available connectors and formats. Their mutual compatibility is tagged in the corresponding sections for table connectors and table formats. The tables also provide dependency information for projects using a build automation tool (such as Maven or SBT) as well as for the SQL Client with SQL JAR bundles.

    Connectors

    Name             Version             Maven dependency                  SQL Client JAR
    Filesystem                           Built-in                          Built-in
    Elasticsearch    6                   flink-connector-elasticsearch6    Download
    Apache Kafka     0.8                 flink-connector-kafka-0.8         Not available
    Apache Kafka     0.9                 flink-connector-kafka-0.9         Download
    Apache Kafka     0.10                flink-connector-kafka-0.10        Download
    Apache Kafka     0.11                flink-connector-kafka-0.11        Download
    Apache Kafka     0.11+ (universal)   flink-connector-kafka             Download

    Formats

    Name                  Maven dependency    SQL Client JAR
    Old CSV (for files)   Built-in            Built-in
    CSV (for Kafka)       flink-csv           Download
    JSON                  flink-json          Download
    Apache Avro           flink-avro          Download

    Overview

    Beginning with Flink 1.6, the declaration of a connection to an external system is separated from the actual implementation.

    Connections can be specified either

    • programmatically using a Descriptor under org.apache.flink.table.descriptors for Table & SQL API
    • or declaratively via YAML configuration files for the SQL Client.

    This allows not only for better unification of APIs and SQL Client but also for better extensibility in case of custom implementations without changing the actual declaration.

    Every declaration is similar to a SQL CREATE TABLE statement. One can define the name of the table, the schema of the table, a connector, and a data format upfront for connecting to an external system.

    The connector describes the external system that stores the data of a table. Storage systems such as Apache Kafka or a regular file system can be declared here. The connector might already provide a fixed format with fields and schema.

    Some systems support different data formats. For example, a table that is stored in Kafka or in files can encode its rows with CSV, JSON, or Avro. A database connector might need the table schema here. Whether or not a storage system requires the definition of a format, is documented for every connector. Different systems also require different types of formats (e.g., column-oriented formats vs. row-oriented formats). The documentation states which format types and connectors are compatible.

    The table schema defines the schema of a table that is exposed to SQL queries. It describes how a source maps the data format to the table schema and a sink vice versa. The schema has access to fields defined by the connector or format. It can use one or more fields for extracting or inserting time attributes. If input fields have no deterministic field order, the schema clearly defines column names, their order, and origin.

    The subsequent sections will cover each definition part (connector, format, and schema) in more detail. The following example shows how to pass them:

    Java/Scala:

      tableEnvironment
        .connect(...)
        .withFormat(...)
        .withSchema(...)
        .inAppendMode()
        .registerTableSource("MyTable")

    Python:

      table_environment \
          .connect(...) \
          .with_format(...) \
          .with_schema(...) \
          .in_append_mode() \
          .register_table_source("MyTable")

    YAML:

      name: MyTable
      type: source
      update-mode: append
      connector: ...
      format: ...
      schema: ...

    DDL:

      tableEnvironment.sqlUpdate("CREATE TABLE MyTable (...) WITH (...)")

    The table’s type (source, sink, or both) determines how a table is registered. In case of table type both, both a table source and table sink are registered under the same name. Logically, this means that we can both read and write to such a table similarly to a table in a regular DBMS.
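
    For example, a declaration of type both might be registered like this in the Java/Scala API (a minimal sketch; the connector, format, schema, and table name are only illustrative, and registerTableSourceAndSink is assumed to be available on the connect descriptor):

      tableEnvironment
        .connect(
          new Kafka()                              // example connector, see the Kafka section below
            .version("universal")
            .topic("test-io")                      // hypothetical topic name
            .property("bootstrap.servers", "localhost:9092"))
        .withFormat(new Json().deriveSchema())     // example format, see the JSON section below
        .withSchema(new Schema()
          .field("user", Types.LONG)
          .field("message", Types.STRING))
        .inAppendMode()
        // registers both a table source and a table sink under the same name
        .registerTableSourceAndSink("MyTable");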

    For streaming queries, an update mode declares how to communicate between a dynamic table and the storage system for continuous queries.

    The following code shows a full example of how to connect to Kafka for reading Avro records.

    Java/Scala:

      tableEnvironment
        // declare the external system to connect to
        .connect(
          new Kafka()
            .version("0.10")
            .topic("test-input")
            .startFromEarliest()
            .property("zookeeper.connect", "localhost:2181")
            .property("bootstrap.servers", "localhost:9092")
        )

        // declare a format for this system
        .withFormat(
          new Avro()
            .avroSchema(
              "{" +
              "  \"namespace\": \"org.myorganization\"," +
              "  \"type\": \"record\"," +
              "  \"name\": \"UserMessage\"," +
              "  \"fields\": [" +
              "    {\"name\": \"timestamp\", \"type\": \"string\"}," +
              "    {\"name\": \"user\", \"type\": \"long\"}," +
              "    {\"name\": \"message\", \"type\": [\"string\", \"null\"]}" +
              "  ]" +
              "}"
            )
        )

        // declare the schema of the table
        .withSchema(
          new Schema()
            .field("rowtime", Types.SQL_TIMESTAMP)
              .rowtime(new Rowtime()
                .timestampsFromField("timestamp")
                .watermarksPeriodicBounded(60000)
              )
            .field("user", Types.LONG)
            .field("message", Types.STRING)
        )

        // specify the update-mode for streaming tables
        .inAppendMode()

        // register as source, sink, or both and under a name
        .registerTableSource("MyUserTable");

    Python:

      table_environment \
          .connect(  # declare the external system to connect to
              Kafka()
                  .version("0.10")
                  .topic("test-input")
                  .start_from_earliest()
                  .property("zookeeper.connect", "localhost:2181")
                  .property("bootstrap.servers", "localhost:9092")
          ) \
          .with_format(  # declare a format for this system
              Avro()
                  .avro_schema(
                      "{"
                      "  \"namespace\": \"org.myorganization\","
                      "  \"type\": \"record\","
                      "  \"name\": \"UserMessage\","
                      "  \"fields\": ["
                      "    {\"name\": \"timestamp\", \"type\": \"string\"},"
                      "    {\"name\": \"user\", \"type\": \"long\"},"
                      "    {\"name\": \"message\", \"type\": [\"string\", \"null\"]}"
                      "  ]"
                      "}"
                  )
          ) \
          .with_schema(  # declare the schema of the table
              Schema()
                  .field("rowtime", DataTypes.TIMESTAMP())
                  .rowtime(
                      Rowtime()
                          .timestamps_from_field("timestamp")
                          .watermarks_periodic_bounded(60000)
                  )
                  .field("user", DataTypes.BIGINT())
                  .field("message", DataTypes.STRING())
          ) \
          .in_append_mode() \
          .register_table_source("MyUserTable")
          # specify the update-mode for streaming tables and
          # register as source, sink, or both and under a name

    YAML:

      tables:
        - name: MyUserTable     # name the new table
          type: source          # declare if the table should be "source", "sink", or "both"
          update-mode: append   # specify the update-mode for streaming tables

          # declare the external system to connect to
          connector:
            type: kafka
            version: "0.10"
            topic: test-input
            startup-mode: earliest-offset
            properties:
              - key: zookeeper.connect
                value: localhost:2181
              - key: bootstrap.servers
                value: localhost:9092

          # declare a format for this system
          format:
            type: avro
            avro-schema: >
              {
                "namespace": "org.myorganization",
                "type": "record",
                "name": "UserMessage",
                "fields": [
                  {"name": "ts", "type": "string"},
                  {"name": "user", "type": "long"},
                  {"name": "message", "type": ["string", "null"]}
                ]
              }

          # declare the schema of the table
          schema:
            - name: rowtime
              type: TIMESTAMP
              rowtime:
                timestamps:
                  type: from-field
                  from: ts
                watermarks:
                  type: periodic-bounded
                  delay: "60000"
            - name: user
              type: BIGINT
            - name: message
              type: VARCHAR

    DDL:

      CREATE TABLE MyUserTable (
        `user` BIGINT,
        message VARCHAR,
        ts VARCHAR
      ) WITH (
        -- declare the external system to connect to
        'connector.type' = 'kafka',
        'connector.version' = '0.10',
        'connector.topic' = 'topic_name',
        'connector.startup-mode' = 'earliest-offset',
        'connector.properties.0.key' = 'zookeeper.connect',
        'connector.properties.0.value' = 'localhost:2181',
        'connector.properties.1.key' = 'bootstrap.servers',
        'connector.properties.1.value' = 'localhost:9092',
        'update-mode' = 'append',

        -- declare a format for this system
        'format.type' = 'avro',
        'format.avro-schema' = '{
                                  "namespace": "org.myorganization",
                                  "type": "record",
                                  "name": "UserMessage",
                                  "fields": [
                                    {"name": "ts", "type": "string"},
                                    {"name": "user", "type": "long"},
                                    {"name": "message", "type": ["string", "null"]}
                                  ]
                                }'
      )

    In both cases, the desired connection properties are converted into normalized, string-based key-value pairs. So-called table factories create configured table sources, table sinks, and corresponding formats from the key-value pairs. All table factories that can be found via Java’s Service Provider Interfaces (SPI) are taken into account when searching for exactly one matching table factory.

    If no factory can be found or multiple factories match for the given properties, an exception will be thrown with additional information about considered factories and supported properties.

    Table Schema

    The table schema defines the names and types of columns similar to the column definitions of a SQL CREATE TABLE statement. In addition, one can specify how columns are mapped from and to fields of the format in which the table data is encoded. The origin of a field might be important if the name of the column should differ from the input/output format. For instance, a column user_name should reference the field $$-user-name from a JSON format. Additionally, the schema is needed to map types from an external system to Flink’s representation. In case of a table sink, it ensures that only data with a valid schema is written to an external system.
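
    For example, the user_name mapping mentioned above could be declared as follows (a minimal Java sketch based on the schema API described in this section):

      .withSchema(
        new Schema()
          .field("user_name", Types.STRING)
            .from("$$-user-name")    // the column "user_name" reads the format field "$$-user-name"
      )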

    The following example shows a simple schema without time attributes and one-to-one field mapping of input/output to table columns.

    Java/Scala:

      .withSchema(
        new Schema()
          .field("MyField1", Types.INT)     // required: specify the fields of the table (in this order)
          .field("MyField2", Types.STRING)
          .field("MyField3", Types.BOOLEAN)
      )

    Python:

      .with_schema(
          Schema()
              .field("MyField1", DataTypes.INT())      # required: specify the fields of the table (in this order)
              .field("MyField2", DataTypes.STRING())
              .field("MyField3", DataTypes.BOOLEAN())
      )

    YAML:

      schema:
        - name: MyField1    # required: specify the fields of the table (in this order)
          type: INT
        - name: MyField2
          type: VARCHAR
        - name: MyField3
          type: BOOLEAN

    For each field, the following properties can be declared in addition to the column’s name and type:

    Java/Scala:

      .withSchema(
        new Schema()
          .field("MyField1", Types.SQL_TIMESTAMP)
            .proctime()      // optional: declares this field as a processing-time attribute
          .field("MyField2", Types.SQL_TIMESTAMP)
            .rowtime(...)    // optional: declares this field as an event-time attribute
          .field("MyField3", Types.BOOLEAN)
            .from("mf3")     // optional: original field in the input that is referenced/aliased by this field
      )

    Python:

      .with_schema(
          Schema()
              .field("MyField1", DataTypes.TIMESTAMP())
                  .proctime()                  # optional: declares this field as a processing-time attribute
              .field("MyField2", DataTypes.TIMESTAMP())
                  .rowtime(...)                # optional: declares this field as an event-time attribute
              .field("MyField3", DataTypes.BOOLEAN())
                  .from_origin_field("mf3")    # optional: original field in the input that is referenced/aliased by this field
      )

    YAML:

      schema:
        - name: MyField1
          type: TIMESTAMP
          proctime: true    # optional: boolean flag whether this field should be a processing-time attribute
        - name: MyField2
          type: TIMESTAMP
          rowtime: ...      # optional: whether this field should be an event-time attribute
        - name: MyField3
          type: BOOLEAN
          from: mf3         # optional: original field in the input that is referenced/aliased by this field

    Time attributes are essential when working with unbounded streaming tables. Therefore both processing-time and event-time (also known as “rowtime”) attributes can be defined as part of the schema.

    For more information about time handling in Flink and especially event-time, we recommend the general event-time section.

    Rowtime Attributes

    In order to control the event-time behavior for tables, Flink provides predefined timestamp extractors and watermark strategies.

    The following timestamp extractors are supported:

    Java/Scala:

      // Converts an existing LONG or SQL_TIMESTAMP field in the input into the rowtime attribute.
      .rowtime(
        new Rowtime()
          .timestampsFromField("ts_field")    // required: original field name in the input
      )

      // Converts the assigned timestamps from a DataStream API record into the rowtime attribute
      // and thus preserves the assigned timestamps from the source.
      // This requires a source that assigns timestamps (e.g., Kafka 0.10+).
      .rowtime(
        new Rowtime()
          .timestampsFromSource()
      )

      // Sets a custom timestamp extractor to be used for the rowtime attribute.
      // The extractor must extend `org.apache.flink.table.sources.tsextractors.TimestampExtractor`.
      .rowtime(
        new Rowtime()
          .timestampsFromExtractor(...)
      )

    Python:

      # Converts an existing BIGINT or TIMESTAMP field in the input into the rowtime attribute.
      .rowtime(
          Rowtime()
              .timestamps_from_field("ts_field")    # required: original field name in the input
      )

      # Converts the assigned timestamps into the rowtime attribute
      # and thus preserves the assigned timestamps from the source.
      # This requires a source that assigns timestamps (e.g., Kafka 0.10+).
      .rowtime(
          Rowtime()
              .timestamps_from_source()
      )

      # Sets a custom timestamp extractor to be used for the rowtime attribute.
      # The extractor must extend `org.apache.flink.table.sources.tsextractors.TimestampExtractor`.
      # Since Python cannot pass Java objects, the fully qualified class name of the extractor is required here.
      .rowtime(
          Rowtime()
              .timestamps_from_extractor(...)
      )

    YAML:

      # Converts an existing BIGINT or TIMESTAMP field in the input into the rowtime attribute.
      rowtime:
        timestamps:
          type: from-field
          from: "ts_field"    # required: original field name in the input

      # Converts the assigned timestamps from a DataStream API record into the rowtime attribute
      # and thus preserves the assigned timestamps from the source.
      rowtime:
        timestamps:
          type: from-source

    The following watermark strategies are supported:

    Java/Scala:

      // Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
      // observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
      // are not late.
      .rowtime(
        new Rowtime()
          .watermarksPeriodicAscending()
      )

      // Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
      // Emits watermarks which are the maximum observed timestamp minus the specified delay.
      .rowtime(
        new Rowtime()
          .watermarksPeriodicBounded(2000)    // delay in milliseconds
      )

      // Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
      // underlying DataStream API and thus preserves the assigned watermarks from the source.
      .rowtime(
        new Rowtime()
          .watermarksFromSource()
      )

    Python:

      # Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
      # observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
      # are not late.
      .rowtime(
          Rowtime()
              .watermarks_periodic_ascending()
      )

      # Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
      # Emits watermarks which are the maximum observed timestamp minus the specified delay.
      .rowtime(
          Rowtime()
              .watermarks_periodic_bounded(2000)    # delay in milliseconds
      )

      # Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
      # underlying DataStream API and thus preserves the assigned watermarks from the source.
      .rowtime(
          Rowtime()
              .watermarks_from_source()
      )

    YAML:

      # Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
      # observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
      # are not late.
      rowtime:
        watermarks:
          type: periodic-ascending

      # Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
      # Emits watermarks which are the maximum observed timestamp minus the specified delay.
      rowtime:
        watermarks:
          type: periodic-bounded
          delay: ...    # required: delay in milliseconds

      # Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
      # underlying DataStream API and thus preserves the assigned watermarks from the source.
      rowtime:
        watermarks:
          type: from-source

    Make sure to always declare both timestamps and watermarks. Watermarks are required for triggering time-based operations.
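
    In other words, a rowtime declaration always combines one timestamp extractor with one watermark strategy, for example (Java, using the options listed above):

      .rowtime(
        new Rowtime()
          .timestampsFromField("ts_field")     // where the event-time timestamp is read from
          .watermarksPeriodicBounded(60000)    // how watermarks are generated (here: 60 s of out-of-orderness)
      )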

    Type Strings

    Because type information is only available in a programming language, the following type strings are supported for definitions in a YAML file:

      VARCHAR
      BOOLEAN
      TINYINT
      SMALLINT
      INT
      BIGINT
      FLOAT
      DOUBLE
      DECIMAL
      DATE
      TIME
      TIMESTAMP
      MAP<fieldtype, fieldtype>        # generic map; e.g. MAP<VARCHAR, INT> that is mapped to Flink's MapTypeInfo
      MULTISET<fieldtype>              # multiset; e.g. MULTISET<VARCHAR> that is mapped to Flink's MultisetTypeInfo
      PRIMITIVE_ARRAY<fieldtype>       # primitive array; e.g. PRIMITIVE_ARRAY<INT> that is mapped to Flink's PrimitiveArrayTypeInfo
      OBJECT_ARRAY<fieldtype>          # object array; e.g. OBJECT_ARRAY<POJO(org.mycompany.MyPojoClass)> that is mapped to
                                       #   Flink's ObjectArrayTypeInfo
      ROW<fieldtype, ...>              # unnamed row; e.g. ROW<VARCHAR, INT> that is mapped to Flink's RowTypeInfo
                                       #   with indexed fields names f0, f1, ...
      ROW<fieldname fieldtype, ...>    # named row; e.g., ROW<myField VARCHAR, myOtherField INT> that
                                       #   is mapped to Flink's RowTypeInfo
      POJO<class>                      # e.g., POJO<org.mycompany.MyPojoClass> that is mapped to Flink's PojoTypeInfo
      ANY<class>                       # e.g., ANY<org.mycompany.MyClass> that is mapped to Flink's GenericTypeInfo
      ANY<class, serialized>           # used for type information that is not supported by Flink's Table & SQL API

    Update Modes

    For streaming queries, it is required to declare how to perform the conversion between a dynamic table and an external connector. The update mode specifies which kind of messages should be exchanged with the external system:

    Append Mode: In append mode, a dynamic table and an external connector only exchange INSERT messages.

    Retract Mode: In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages. An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for the updating (new) row. In contrast to upsert mode, no key must be defined in this mode. However, every update consists of two messages, which is less efficient.

    Upsert Mode: In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages. This mode requires a (possibly composite) unique key by which updates can be propagated. The external connector needs to be aware of the unique key attribute in order to apply messages correctly. INSERT and UPDATE changes are encoded as UPSERT messages and DELETE changes as DELETE messages. The main difference to a retract stream is that UPDATE changes are encoded with a single message and are therefore more efficient.

    Attention The documentation of each connector states which update modes are supported.

    Java/Scala:

      .connect(...)
        .inAppendMode()    // otherwise: inUpsertMode() or inRetractMode()

    Python:

      .connect(...) \
          .in_append_mode()    # otherwise: in_upsert_mode() or in_retract_mode()

    YAML:

      tables:
        - name: ...
          update-mode: append    # otherwise: "retract" or "upsert"

    DDL:

      CREATE TABLE MyTable (
        ...
      ) WITH (
        'update-mode' = 'append'    -- otherwise: 'retract' or 'upsert'
      )

    See also the general streaming concepts documentation for more information.

    Table Connectors

    Flink provides a set of connectors for connecting to external systems.

    Please note that not all connectors are available in both batch and streaming yet. Furthermore, not every streaming connector supports every streaming mode. Therefore, each connector is tagged accordingly. A format tag indicates that the connector requires a certain type of format.

    File System Connector

    Source: Batch | Source: Streaming Append Mode | Sink: Batch | Sink: Streaming Append Mode | Format: CSV-only

    The file system connector allows for reading from and writing to a local or distributed filesystem. A filesystem can be defined as:

    Java/Scala:

      .connect(
        new FileSystem()
          .path("file:///path/to/whatever")    // required: path to a file or directory
      )

    Python:

      .connect(
          FileSystem()
              .path("file:///path/to/whatever")    # required: path to a file or directory
      )

    YAML:

      connector:
        type: filesystem
        path: "file:///path/to/whatever"    # required: path to a file or directory

    DDL:

      CREATE TABLE MyUserTable (
        ...
      ) WITH (
        'connector.type' = 'filesystem',               -- required: specify the connector type
        'connector.path' = 'file:///path/to/whatever'  -- required: path to a file or directory
      )

    The file system connector itself is included in Flink and does not require an additional dependency. A corresponding format needs to be specified for reading and writing rows from and to a file system.
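
    Putting the pieces together, a file system source might be declared like this in Java (a sketch only; it assumes the built-in Old CSV format listed in the formats table and uses an illustrative path and table name):

      tableEnvironment
        .connect(
          new FileSystem()
            .path("file:///path/to/users.csv"))    // hypothetical path
        .withFormat(
          new OldCsv()                             // built-in "Old CSV" format for files (assumed descriptor name)
            .field("user", Types.LONG)
            .field("message", Types.STRING))
        .withSchema(
          new Schema()
            .field("user", Types.LONG)
            .field("message", Types.STRING))
        .registerTableSource("UsersFromFile");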

    Attention Make sure to include Flink File System specific dependencies.

    Attention File system sources and sinks for streaming are only experimental. In the future, we will support actual streaming use cases, i.e., directory monitoring and bucket output.

    Kafka Connector

    Source: Streaming Append Mode | Sink: Streaming Append Mode | Format: Serialization Schema | Format: Deserialization Schema

    The Kafka connector allows for reading and writing from and to an Apache Kafka topic. It can be defined as follows:

    Java/Scala:

      .connect(
        new Kafka()
          .version("0.11")    // required: valid connector versions are
                              //   "0.8", "0.9", "0.10", "0.11", and "universal"
          .topic("...")       // required: topic name from which the table is read

          // optional: connector specific properties
          .property("zookeeper.connect", "localhost:2181")
          .property("bootstrap.servers", "localhost:9092")
          .property("group.id", "testGroup")

          // optional: select a startup mode for Kafka offsets
          .startFromEarliest()
          .startFromLatest()
          .startFromSpecificOffsets(...)

          // optional: output partitioning from Flink's partitions into Kafka's partitions
          .sinkPartitionerFixed()                   // each Flink partition ends up in at most one Kafka partition (default)
          .sinkPartitionerRoundRobin()              // a Flink partition is distributed to Kafka partitions round-robin
          .sinkPartitionerCustom(MyCustom.class)    // use a custom FlinkKafkaPartitioner subclass
      )

    Python:

      .connect(
          Kafka()
              .version("0.11")    # required: valid connector versions are
                                  #   "0.8", "0.9", "0.10", "0.11", and "universal"
              .topic("...")       # required: topic name from which the table is read

              # optional: connector specific properties
              .property("zookeeper.connect", "localhost:2181")
              .property("bootstrap.servers", "localhost:9092")
              .property("group.id", "testGroup")

              # optional: select a startup mode for Kafka offsets
              .start_from_earliest()
              .start_from_latest()
              .start_from_specific_offsets(...)

              # optional: output partitioning from Flink's partitions into Kafka's partitions
              .sink_partitioner_fixed()          # each Flink partition ends up in at most one Kafka partition (default)
              .sink_partitioner_round_robin()    # a Flink partition is distributed to Kafka partitions round-robin
              .sink_partitioner_custom("full.qualified.custom.class.name")    # use a custom FlinkKafkaPartitioner subclass
      )

    YAML:

      connector:
        type: kafka
        version: "0.11"        # required: valid connector versions are
                               #   "0.8", "0.9", "0.10", "0.11", and "universal"
        topic: ...             # required: topic name from which the table is read

        properties:            # optional: connector specific properties
          - key: zookeeper.connect
            value: localhost:2181
          - key: bootstrap.servers
            value: localhost:9092
          - key: group.id
            value: testGroup

        startup-mode: ...      # optional: valid modes are "earliest-offset", "latest-offset",
                               #   "group-offsets", or "specific-offsets"
        specific-offsets:      # optional: used in case of startup mode with specific offsets
          - partition: 0
            offset: 42
          - partition: 1
            offset: 300

        sink-partitioner: ...  # optional: output partitioning from Flink's partitions into Kafka's partitions;
                               #   valid are "fixed" (each Flink partition ends up in at most one Kafka partition),
                               #   "round-robin" (a Flink partition is distributed to Kafka partitions round-robin),
                               #   "custom" (use a custom FlinkKafkaPartitioner subclass)
        sink-partitioner-class: org.mycompany.MyPartitioner  # optional: used in case of sink partitioner custom

    DDL:

      CREATE TABLE MyUserTable (
        ...
      ) WITH (
        'connector.type' = 'kafka',

        'connector.version' = '0.11',      -- required: valid connector versions are
                                           --   "0.8", "0.9", "0.10", "0.11", and "universal"

        'connector.topic' = 'topic_name',  -- required: topic name from which the table is read

        'update-mode' = 'append',          -- required: update mode when used as table sink;
                                           --   only append mode is supported right now

        'connector.properties.0.key' = 'zookeeper.connect',    -- optional: connector specific properties
        'connector.properties.0.value' = 'localhost:2181',
        'connector.properties.1.key' = 'bootstrap.servers',
        'connector.properties.1.value' = 'localhost:9092',
        'connector.properties.2.key' = 'group.id',
        'connector.properties.2.value' = 'testGroup',

        'connector.startup-mode' = 'earliest-offset',    -- optional: valid modes are "earliest-offset",
                                                         --   "latest-offset", "group-offsets",
                                                         --   or "specific-offsets"

        -- optional: used in case of startup mode with specific offsets
        'connector.specific-offsets.0.partition' = '0',
        'connector.specific-offsets.0.offset' = '42',
        'connector.specific-offsets.1.partition' = '1',
        'connector.specific-offsets.1.offset' = '300',

        'connector.sink-partitioner' = '...',    -- optional: output partitioning from Flink's partitions
                                                 --   into Kafka's partitions; valid are "fixed"
                                                 --   (each Flink partition ends up in at most one Kafka partition),
                                                 --   "round-robin" (a Flink partition is distributed to
                                                 --   Kafka partitions round-robin),
                                                 --   "custom" (use a custom FlinkKafkaPartitioner subclass)

        -- optional: used in case of sink partitioner custom
        'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner'
      )

    Specify the start reading position: By default, the Kafka source will start reading data from the committed group offsets in Zookeeper or Kafka brokers. You can specify other start positions, which correspond to the configurations in section Kafka Consumers Start Position Configuration.
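
    For illustration, the elided startFromSpecificOffsets(...) call above could be spelled out like this in Java (a sketch; it assumes the descriptor accepts a Map<Integer, Long>, and the offsets mirror the YAML example above):

      // requires java.util.Map and java.util.HashMap
      Map<Integer, Long> specificOffsets = new HashMap<>();
      specificOffsets.put(0, 42L);     // partition 0 starts at offset 42
      specificOffsets.put(1, 300L);    // partition 1 starts at offset 300

      new Kafka()
        .version("universal")
        .topic("test-input")                           // hypothetical topic name
        .startFromSpecificOffsets(specificOffsets)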

    Flink-Kafka Sink Partitioning: By default, a Kafka sink writes to at most as many partitions as its own parallelism (each parallel instance of the sink writes to exactly one partition). In order to distribute the writes to more partitions or control the routing of rows into partitions, a custom sink partitioner can be provided. The round-robin partitioner is useful to avoid an unbalanced partitioning. However, it will cause a lot of network connections between all the Flink instances and all the Kafka brokers.
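
    A custom partitioner is a subclass of FlinkKafkaPartitioner. A hypothetical example (a sketch only; it routes rows by the hash of their first field and assumes the Table API's Row type):

      public class MyPartitioner extends FlinkKafkaPartitioner<Row> {
        @Override
        public int partition(Row record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
          // send all rows with the same first field to the same Kafka partition
          return (record.getField(0).hashCode() & Integer.MAX_VALUE) % partitions.length;
        }
      }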

    Consistency guarantees: By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with checkpointing enabled.

    Kafka 0.10+ Timestamps: Since Kafka 0.10, Kafka messages have a timestamp as metadata that specifies when the record was written into the Kafka topic. These timestamps can be used for a rowtime attribute by selecting timestamps: from-source in YAML and timestampsFromSource() in Java/Scala respectively.
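
    For example, a schema that uses the Kafka message timestamp as its event-time attribute might look like this (a Java sketch based on the schema API above; the bounded watermark delay is an example value):

      .withSchema(
        new Schema()
          .field("rowtime", Types.SQL_TIMESTAMP)
            .rowtime(new Rowtime()
              .timestampsFromSource()               // use the timestamp attached to the Kafka record
              .watermarksPeriodicBounded(60000))    // tolerate up to 60 s of out-of-orderness
          .field("user", Types.LONG)
          .field("message", Types.STRING)
      )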

    Kafka 0.11+ Versioning: Since Flink 1.7, the Kafka connector definition should be independent of a hard-coded Kafka version. Use the connector version universal as a wildcard for Flink’s Kafka connector that is compatible with all Kafka versions starting from 0.11.
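
    In the descriptor, this simply means declaring the version as "universal" instead of a concrete release, for example:

      new Kafka()
        .version("universal")    // compatible with Kafka 0.11 and newer brokers
        .topic("test-input")     // hypothetical topic name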

    Make sure to add the version-specific Kafka dependency. In addition, a corresponding format needs to be specified for reading and writing rows from and to Kafka.

    Elasticsearch Connector

    Sink: Streaming Append Mode | Sink: Streaming Upsert Mode | Format: JSON-only

    The Elasticsearch connector allows for writing into an index of the Elasticsearch search engine.

    The connector can operate in upsert mode for exchanging UPSERT/DELETE messages with the external system using a key defined by the query.

    For append-only queries, the connector can also operate in append mode for exchanging only INSERT messages with the external system. If no key is defined by the query, a key is automatically generated by Elasticsearch.

    The connector can be defined as follows:

    Java/Scala:

      .connect(
        new Elasticsearch()
          .version("6")                       // required: valid connector versions are "6"
          .host("localhost", 9200, "http")    // required: one or more Elasticsearch hosts to connect to
          .index("MyUsers")                   // required: Elasticsearch index
          .documentType("user")               // required: Elasticsearch document type

          .keyDelimiter("$")        // optional: delimiter for composite keys ("_" by default)
                                    //   e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
          .keyNullLiteral("n/a")    // optional: representation for null fields in keys ("null" by default)

          // optional: failure handling strategy in case a request to Elasticsearch fails (fail by default)
          .failureHandlerFail()             // optional: throws an exception if a request fails and causes a job failure
          .failureHandlerIgnore()           //   or ignores failures and drops the request
          .failureHandlerRetryRejected()    //   or re-adds requests that have failed due to queue capacity saturation
          .failureHandlerCustom(...)        //   or custom failure handling with a ActionRequestFailureHandler subclass

          // optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
          .disableFlushOnCheckpoint()       // optional: disables flushing on checkpoint (see notes below!)
          .bulkFlushMaxActions(42)          // optional: maximum number of actions to buffer for each bulk request
          .bulkFlushMaxSize("42 mb")        // optional: maximum size of buffered actions in bytes per bulk request
                                            //   (only MB granularity is supported)
          .bulkFlushInterval(60000L)        // optional: bulk flush interval (in milliseconds)

          .bulkFlushBackoffConstant()       // optional: use a constant backoff type
          .bulkFlushBackoffExponential()    //   or use an exponential backoff type
          .bulkFlushBackoffMaxRetries(3)    // optional: maximum number of retries
          .bulkFlushBackoffDelay(30000L)    // optional: delay between each backoff attempt (in milliseconds)

          // optional: connection properties to be used during REST communication to Elasticsearch
          .connectionMaxRetryTimeout(3)     // optional: maximum timeout (in milliseconds) between retries
          .connectionPathPrefix("/v1")      // optional: prefix string to be added to every REST communication
      )

    Python:

      .connect(
          Elasticsearch()
              .version("6")                       # required: valid connector versions are "6"
              .host("localhost", 9200, "http")    # required: one or more Elasticsearch hosts to connect to
              .index("MyUsers")                   # required: Elasticsearch index
              .document_type("user")              # required: Elasticsearch document type

              .key_delimiter("$")         # optional: delimiter for composite keys ("_" by default)
                                          #   e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
              .key_null_literal("n/a")    # optional: representation for null fields in keys ("null" by default)

              # optional: failure handling strategy in case a request to Elasticsearch fails (fail by default)
              .failure_handler_fail()              # optional: throws an exception if a request fails and causes a job failure
              .failure_handler_ignore()            #   or ignores failures and drops the request
              .failure_handler_retry_rejected()    #   or re-adds requests that have failed due to queue capacity saturation
              .failure_handler_custom(...)         #   or custom failure handling with a ActionRequestFailureHandler subclass

              # optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
              .disable_flush_on_checkpoint()    # optional: disables flushing on checkpoint (see notes below!)
              .bulk_flush_max_actions(42)       # optional: maximum number of actions to buffer for each bulk request
              .bulk_flush_max_size("42 mb")     # optional: maximum size of buffered actions in bytes per bulk request
                                                #   (only MB granularity is supported)
              .bulk_flush_interval(60000)       # optional: bulk flush interval (in milliseconds)

              .bulk_flush_backoff_constant()        # optional: use a constant backoff type
              .bulk_flush_backoff_exponential()     #   or use an exponential backoff type
              .bulk_flush_backoff_max_retries(3)    # optional: maximum number of retries
              .bulk_flush_backoff_delay(30000)      # optional: delay between each backoff attempt (in milliseconds)

              # optional: connection properties to be used during REST communication to Elasticsearch
              .connection_max_retry_timeout(3)    # optional: maximum timeout (in milliseconds) between retries
              .connection_path_prefix("/v1")      # optional: prefix string to be added to every REST communication
      )

    YAML:

      connector:
        type: elasticsearch
        version: 6                  # required: valid connector versions are "6"
        hosts:                      # required: one or more Elasticsearch hosts to connect to
          - hostname: "localhost"
            port: 9200
            protocol: "http"
        index: "MyUsers"            # required: Elasticsearch index
        document-type: "user"       # required: Elasticsearch document type

        key-delimiter: "$"          # optional: delimiter for composite keys ("_" by default)
                                    #   e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
        key-null-literal: "n/a"     # optional: representation for null fields in keys ("null" by default)

        # optional: failure handling strategy in case a request to Elasticsearch fails ("fail" by default)
        failure-handler: ...        # valid strategies are "fail" (throws an exception if a request fails and
                                    #   thus causes a job failure), "ignore" (ignores failures and drops the request),
                                    #   "retry-rejected" (re-adds requests that have failed due to queue capacity
                                    #   saturation), or "custom" for failure handling with a
                                    #   ActionRequestFailureHandler subclass

        # optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
        flush-on-checkpoint: true   # optional: disables flushing on checkpoint (see notes below!) ("true" by default)
        bulk-flush:
          max-actions: 42           # optional: maximum number of actions to buffer for each bulk request
          max-size: 42 mb           # optional: maximum size of buffered actions in bytes per bulk request
                                    #   (only MB granularity is supported)
          interval: 60000           # optional: bulk flush interval (in milliseconds)
          back-off:                 # optional: backoff strategy ("disabled" by default)
            type: ...               #   valid strategies are "disabled", "constant", or "exponential"
            max-retries: 3          # optional: maximum number of retries
            delay: 30000            # optional: delay between each backoff attempt (in milliseconds)

        # optional: connection properties to be used during REST communication to Elasticsearch
        connection-max-retry-timeout: 3    # optional: maximum timeout (in milliseconds) between retries
        connection-path-prefix: "/v1"      # optional: prefix string to be added to every REST communication

    DDL:

      CREATE TABLE MyUserTable (
        ...
      ) WITH (
        'connector.type' = 'elasticsearch',          -- required: specify this table type is elasticsearch
        'connector.version' = '6',                   -- required: valid connector versions are "6"
        'connector.hosts.0.hostname' = 'host_name',  -- required: one or more Elasticsearch hosts to connect to
        'connector.hosts.0.port' = '9200',
        'connector.hosts.0.protocol' = 'http',
        'connector.index' = 'MyUsers',               -- required: Elasticsearch index
        'connector.document-type' = 'user',          -- required: Elasticsearch document type

        'update-mode' = 'append',                    -- optional: update mode when used as table sink

        'connector.key-delimiter' = '$',             -- optional: delimiter for composite keys ("_" by default)
                                                     --   e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
        'connector.key-null-literal' = 'n/a',        -- optional: representation for null fields in keys ("null" by default)

        'connector.failure-handler' = '...',         -- optional: failure handling strategy in case a request to
                                                     --   Elasticsearch fails ("fail" by default);
                                                     --   valid strategies are
                                                     --   "fail" (throws an exception if a request fails and
                                                     --   thus causes a job failure),
                                                     --   "ignore" (ignores failures and drops the request),
                                                     --   "retry-rejected" (re-adds requests that have failed due
                                                     --   to queue capacity saturation),
                                                     --   or "custom" for failure handling with a
                                                     --   ActionRequestFailureHandler subclass

        -- optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
        'connector.flush-on-checkpoint' = 'true',    -- optional: disables flushing on checkpoint (see notes below!)
                                                     --   ("true" by default)
        'connector.bulk-flush.max-actions' = '42',   -- optional: maximum number of actions to buffer
                                                     --   for each bulk request
        'connector.bulk-flush.max-size' = '42 mb',   -- optional: maximum size of buffered actions in bytes
                                                     --   per bulk request
                                                     --   (only MB granularity is supported)
        'connector.bulk-flush.interval' = '60000',   -- optional: bulk flush interval (in milliseconds)
        'connector.bulk-flush.back-off.type' = '...',        -- optional: backoff strategy ("disabled" by default);
                                                             --   valid strategies are "disabled", "constant",
                                                             --   or "exponential"
        'connector.bulk-flush.back-off.max-retries' = '3',   -- optional: maximum number of retries
        'connector.bulk-flush.back-off.delay' = '30000',     -- optional: delay between each backoff attempt
                                                             --   (in milliseconds)

        -- optional: connection properties to be used during REST communication to Elasticsearch
        'connector.connection-max-retry-timeout' = '3',      -- optional: maximum timeout (in milliseconds)
                                                             --   between retries
        'connector.connection-path-prefix' = '/v1'           -- optional: prefix string to be added to every
                                                             --   REST communication
      )

    Bulk flushing: For more information about characteristics of the optional flushing parameters see the corresponding low-level documentation.

    Disabling flushing on checkpoint: When disabled, a sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong guarantees for at-least-once delivery of action requests.

    Key extraction: Flink automatically extracts valid keys from a query. For example, a query SELECT a, b, c FROM t GROUP BY a, b defines a composite key of the fields a and b. The Elasticsearch connector generates a document ID string for every row by concatenating all key fields in the order defined in the query using a key delimiter. A custom representation of null literals for key fields can be defined.
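
    For instance, an upsert query such as the one above could be submitted like this (a hypothetical Java sketch; the sink and source table names are only illustrative):

      // With "$" as the key delimiter, the document ID for each result row is "valueOfA$valueOfB".
      tableEnvironment.sqlUpdate(
        "INSERT INTO MyEsSink SELECT a, b, COUNT(c) AS cnt FROM t GROUP BY a, b");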

    Attention A JSON format defines how to encode documents for the external system, therefore, it must be added as a dependency.

    Table Formats

    Flink provides a set of table formats that can be used with table connectors.

    A format tag indicates the format type for matching with a connector.

    CSV Format

    Format: Serialization Schema | Format: Deserialization Schema

    The CSV format aims to comply with RFC-4180 (“Common Format and MIME Type for Comma-Separated Values (CSV) Files”) proposed by the Internet Engineering Task Force (IETF).

    The format allows reading and writing CSV data that corresponds to a given format schema. The format schema can be defined either as a Flink type or derived from the desired table schema.

    If the format schema is equal to the table schema, the schema can also be automatically derived. This allows for defining schema information only once. The names, types, and order of fields in the format are determined by the table’s schema. Time attributes are ignored if their origin is not a field. A from definition in the table schema is interpreted as a field renaming in the format.

    The CSV format can be used as follows:

    Java/Scala:

      .withFormat(
        new Csv()

          // required: define the schema either by using type information
          .schema(Types.ROW(...))

          // or use the table's schema
          .deriveSchema()

          .fieldDelimiter(';')           // optional: field delimiter character (',' by default)
          .lineDelimiter("\r\n")         // optional: line delimiter ("\n" by default;
                                         //   otherwise "\r" or "\r\n" are allowed)
          .quoteCharacter('\'')          // optional: quote character for enclosing field values ('"' by default)
          .allowComments()               // optional: ignores comment lines that start with '#' (disabled by default);
                                         //   if enabled, make sure to also ignore parse errors to allow empty rows
          .ignoreParseErrors()           // optional: skip fields and rows with parse errors instead of failing;
                                         //   fields are set to null in case of errors
          .arrayElementDelimiter("|")    // optional: the array element delimiter string for separating
                                         //   array and row element values (";" by default)
          .escapeCharacter('\\')         // optional: escape character for escaping values (disabled by default)
          .nullLiteral("n/a")            // optional: null literal string that is interpreted as a
                                         //   null value (disabled by default)
      )

    Python:

      .with_format(
          Csv()

              # required: define the schema either by using type information
              .schema(DataTypes.ROW(...))

              # or use the table's schema
              .derive_schema()

              .field_delimiter(';')            # optional: field delimiter character (',' by default)
              .line_delimiter("\r\n")          # optional: line delimiter ("\n" by default;
                                               #   otherwise "\r" or "\r\n" are allowed)
              .quote_character('\'')           # optional: quote character for enclosing field values ('"' by default)
              .allow_comments()                # optional: ignores comment lines that start with '#' (disabled by default);
                                               #   if enabled, make sure to also ignore parse errors to allow empty rows
              .ignore_parse_errors()           # optional: skip fields and rows with parse errors instead of failing;
                                               #   fields are set to null in case of errors
              .array_element_delimiter("|")    # optional: the array element delimiter string for separating
                                               #   array and row element values (";" by default)
              .escape_character('\\')          # optional: escape character for escaping values (disabled by default)
              .null_literal("n/a")             # optional: null literal string that is interpreted as a
                                               #   null value (disabled by default)
      )

    YAML:

      format:
        type: csv

        # required: define the schema either by using type information
        schema: "ROW(lon FLOAT, rideTime TIMESTAMP)"

        # or use the table's schema
        derive-schema: true

        field-delimiter: ";"            # optional: field delimiter character (',' by default)
        line-delimiter: "\r\n"          # optional: line delimiter ("\n" by default; otherwise "\r" or "\r\n" are allowed)
        quote-character: "'"            # optional: quote character for enclosing field values ('"' by default)
        allow-comments: true            # optional: ignores comment lines that start with "#" (disabled by default);
                                        #   if enabled, make sure to also ignore parse errors to allow empty rows
        ignore-parse-errors: true       # optional: skip fields and rows with parse errors instead of failing;
                                        #   fields are set to null in case of errors
        array-element-delimiter: "|"    # optional: the array element delimiter string for separating
                                        #   array and row element values (";" by default)
        escape-character: "\\"          # optional: escape character for escaping values (disabled by default)
        null-literal: "n/a"             # optional: null literal string that is interpreted as a
                                        #   null value (disabled by default)

    DDL:

      CREATE TABLE MyUserTable (
        ...
      ) WITH (
        'format.type' = 'csv',                   -- required: specify the format type

        'format.fields.0.name' = 'lon',          -- required: define the schema either by using type information
        'format.fields.0.type' = 'FLOAT',
        'format.fields.1.name' = 'rideTime',
        'format.fields.1.type' = 'TIMESTAMP',

        'format.derive-schema' = 'true',         -- or use the table's schema

        'format.field-delimiter' = ';',          -- optional: field delimiter character (',' by default)
        'format.line-delimiter' = '\r\n',        -- optional: line delimiter ("\n" by default; otherwise
                                                 --   "\r" or "\r\n" are allowed)
        'format.quote-character' = '''',         -- optional: quote character for enclosing field values ('"' by default)
        'format.allow-comments' = 'true',        -- optional: ignores comment lines that start with "#"
                                                 --   (disabled by default);
                                                 --   if enabled, make sure to also ignore parse errors to allow empty rows
        'format.ignore-parse-errors' = 'true',   -- optional: skip fields and rows with parse errors instead of failing;
                                                 --   fields are set to null in case of errors
        'format.array-element-delimiter' = '|',  -- optional: the array element delimiter string for separating
                                                 --   array and row element values (";" by default)
        'format.escape-character' = '\\',        -- optional: escape character for escaping values (disabled by default)
        'format.null-literal' = 'n/a'            -- optional: null literal string that is interpreted as a
                                                 --   null value (disabled by default)
      )

    The following table lists supported types that can be read and written:

    Supported Flink SQL Types
    ROW
    VARCHAR
    ARRAY[_]
    INT
    BIGINT
    FLOAT
    DOUBLE
    BOOLEAN
    DATE
    TIME
    TIMESTAMP
    DECIMAL
    NULL (unsupported yet)

    Numeric types: Value should be a number but the literal "null" can also be understood. An empty string is considered null. Values are also trimmed (leading/trailing white space). Numbers are parsed using Java’s valueOf semantics. Other non-numeric strings may cause a parsing exception.

    String and time types: Value is not trimmed. The literal "null" can also be understood. Time types must be formatted according to the Java SQL time format with millisecond precision. For example: 2018-01-01 for date, 20:43:59 for time, and 2018-01-01 20:43:59.999 for timestamp.

    Boolean type: Value is expected to be a boolean ("true", "false") string or "null". Empty strings are interpreted as false. Values are trimmed (leading/trailing white space). Other values result in an exception.

    Nested types: Array and row types are supported for one level of nesting using the array element delimiter.

    Primitive byte arrays: Primitive byte arrays are handled in Base64-encoded representation.

    Line endings: Line endings need to be considered even for row-based connectors (such as Kafka) to be ignored for unquoted string fields at the end of a row.

    Escaping and quoting: The following table shows examples of how escaping and quoting affect the parsing of a string using * for escaping and ' for quoting:

    CSV Field       Parsed String
    123*'4**        123'4*
    '123''4**'      123'4*
    'a;b*'c'        a;b'c
    'a;b''c'        a;b'c

    Make sure to add the CSV format as a dependency.

    JSON Format

    Format: Serialization Schema | Format: Deserialization Schema

    The JSON format allows reading and writing JSON data that corresponds to a given format schema. The format schema can be defined either as a Flink type, as a JSON schema, or derived from the desired table schema. A Flink type enables a more SQL-like definition and mapping to the corresponding SQL data types. The JSON schema allows for more complex and nested structures.

    If the format schema is equal to the table schema, the schema can also be automatically derived. This allows for defining schema information only once. The names, types, and fields’ order of the format are determined by the table’s schema. Time attributes are ignored if their origin is not a field. A from definition in the table schema is interpreted as a field renaming in the format.

    The JSON format can be used as follows:

    Java/Scala:

      .withFormat(
        new Json()
          .failOnMissingField(true)    // optional: flag whether to fail if a field is missing or not, false by default

          // required: define the schema either by using type information which parses numbers to corresponding types
          .schema(Types.ROW(...))

          // or by using a JSON schema which parses to DECIMAL and TIMESTAMP
          .jsonSchema(
            "{" +
            "  type: 'object'," +
            "  properties: {" +
            "    lon: {" +
            "      type: 'number'" +
            "    }," +
            "    rideTime: {" +
            "      type: 'string'," +
            "      format: 'date-time'" +
            "    }" +
            "  }" +
            "}"
          )

          // or use the table's schema
          .deriveSchema()
      )

    Python:

      .with_format(
          Json()
              .fail_on_missing_field(True)    # optional: flag whether to fail if a field is missing or not, False by default

              # required: define the schema either by using type information which parses numbers to corresponding types
              .schema(DataTypes.ROW(...))

              # or by using a JSON schema which parses to DECIMAL and TIMESTAMP
              .json_schema(
                  "{"
                  "  type: 'object',"
                  "  properties: {"
                  "    lon: {"
                  "      type: 'number'"
                  "    },"
                  "    rideTime: {"
                  "      type: 'string',"
                  "      format: 'date-time'"
                  "    }"
                  "  }"
                  "}"
              )

              # or use the table's schema
              .derive_schema()
      )

    YAML:

      format:
        type: json
        fail-on-missing-field: true    # optional: flag whether to fail if a field is missing or not, false by default

        # required: define the schema either by using a type string which parses numbers to corresponding types
        schema: "ROW(lon FLOAT, rideTime TIMESTAMP)"

        # or by using a JSON schema which parses to DECIMAL and TIMESTAMP
        json-schema: >
          {
            type: 'object',
            properties: {
              lon: {
                type: 'number'
              },
              rideTime: {
                type: 'string',
                format: 'date-time'
              }
            }
          }

        # or use the table's schema
        derive-schema: true

    DDL:

      CREATE TABLE MyUserTable (
        ...
      ) WITH (
        'format.type' = 'json',                    -- required: specify the format type
        'format.fail-on-missing-field' = 'true',   -- optional: flag whether to fail if a field is missing or not,
                                                   --   false by default

        'format.fields.0.name' = 'lon',            -- required: define the schema either by using a type string
                                                   --   which parses numbers to corresponding types
        'format.fields.0.type' = 'FLOAT',
        'format.fields.1.name' = 'rideTime',
        'format.fields.1.type' = 'TIMESTAMP',

        'format.json-schema' =                     -- or by using a JSON schema which parses to DECIMAL and TIMESTAMP
          '{
            "type": "object",
            "properties": {
              "lon": {
                "type": "number"
              },
              "rideTime": {
                "type": "string",
                "format": "date-time"
              }
            }
          }',

        'format.derive-schema' = 'true'            -- or use the table's schema
      )

    The following table shows the mapping of JSON schema types to Flink SQL types:

    JSON schema                        Flink SQL
    object                             ROW
    boolean                            BOOLEAN
    array                              ARRAY[_]
    number                             DECIMAL
    integer                            DECIMAL
    string                             VARCHAR
    string with format: date-time      TIMESTAMP
    string with format: date           DATE
    string with format: time           TIME
    string with encoding: base64       ARRAY[TINYINT]
    null                               NULL (unsupported yet)

    Currently, Flink supports only a subset of the JSON schema specification draft-07. Union types (as well as allOf, anyOf, not) are not supported yet. oneOf and arrays of types are only supported for specifying nullability.

    Simple references that link to a common definition in the document are supported as shown in the more complex example below:

      {
        "definitions": {
          "address": {
            "type": "object",
            "properties": {
              "street_address": {
                "type": "string"
              },
              "city": {
                "type": "string"
              },
              "state": {
                "type": "string"
              }
            },
            "required": [
              "street_address",
              "city",
              "state"
            ]
          }
        },
        "type": "object",
        "properties": {
          "billing_address": {
            "$ref": "#/definitions/address"
          },
          "shipping_address": {
            "$ref": "#/definitions/address"
          },
          "optional_address": {
            "oneOf": [
              {
                "type": "null"
              },
              {
                "$ref": "#/definitions/address"
              }
            ]
          }
        }
      }

    Missing Field Handling: By default, a missing JSON field is set to null. You can enable strict JSON parsing that will cancel the source (and query) if a field is missing.
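    Strict parsing is enabled through the flag shown in the examples above, e.g. as a minimal sketch:

    .withFormat(
      new Json()
        .failOnMissingField(true)  // fail (and cancel the query) instead of inserting null for missing fields
        .deriveSchema()
    )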

    Make sure to add the JSON format as a dependency.

    Apache Avro Format

    Format: Serialization Schema    Format: Deserialization Schema

    The Apache Avro format allows reading and writing Avro data that corresponds to a given format schema. The format schema can be defined either as a fully qualified class name of an Avro specific record or as an Avro schema string. If a class name is used, the class must be available in the classpath at runtime.

    The Avro format can be used as follows:

    .withFormat(
      new Avro()
        // required: define the schema either by using an Avro specific record class
        .recordClass(User.class)
        // or by using an Avro schema
        .avroSchema(
          "{" +
          "  \"type\": \"record\"," +
          "  \"name\": \"test\"," +
          "  \"fields\" : [" +
          "    {\"name\": \"a\", \"type\": \"long\"}," +
          "    {\"name\": \"b\", \"type\": \"string\"}" +
          "  ]" +
          "}"
        )
    )
    .with_format(
      Avro()
      # required: define the schema either by using an Avro specific record class
      .record_class("full.qualified.user.class.name")
      # or by using an Avro schema
      .avro_schema(
        "{"
        "  \"type\": \"record\","
        "  \"name\": \"test\","
        "  \"fields\" : ["
        "    {\"name\": \"a\", \"type\": \"long\"},"
        "    {\"name\": \"b\", \"type\": \"string\"}"
        "  ]"
        "}"
      )
    )
    format:
      type: avro
      # required: define the schema either by using an Avro specific record class
      record-class: "org.organization.types.User"
      # or by using an Avro schema
      avro-schema: >
        {
          "type": "record",
          "name": "test",
          "fields" : [
            {"name": "a", "type": "long"},
            {"name": "b", "type": "string"}
          ]
        }
    CREATE TABLE MyUserTable (
      ...
    ) WITH (
      'format.type' = 'avro',                                 -- required: specify the format type
      'format.record-class' = 'org.organization.types.User', -- required: define the schema either by using an Avro specific record class
      'format.avro-schema' =                                  -- or by using an Avro schema
        '{
          "type": "record",
          "name": "test",
          "fields" : [
            {"name": "a", "type": "long"},
            {"name": "b", "type": "string"}
          ]
        }'
    )

    Avro types are mapped to the corresponding SQL data types. Union types are only supported for specifying nullability; otherwise, they are converted to an ANY type. The following table shows the mapping:

    Avro schema                              Flink SQL
    record                                   ROW
    enum                                     VARCHAR
    array                                    ARRAY[_]
    map                                      MAP[VARCHAR, _]
    union                                    non-null type or ANY
    fixed                                    ARRAY[TINYINT]
    string                                   VARCHAR
    bytes                                    ARRAY[TINYINT]
    int                                      INT
    long                                     BIGINT
    float                                    FLOAT
    double                                   DOUBLE
    boolean                                  BOOLEAN
    int with logicalType: date               DATE
    int with logicalType: time-millis        TIME
    int with logicalType: time-micros        INT
    long with logicalType: timestamp-millis  TIMESTAMP
    long with logicalType: timestamp-micros  BIGINT
    bytes with logicalType: decimal          DECIMAL
    fixed with logicalType: decimal          DECIMAL
    null                                     NULL (unsupported yet)
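    For example, a union of null and string is mapped to a nullable VARCHAR, whereas a union of several non-null types falls back to ANY. A minimal sketch in the Avro schema-string style used above (the field name is illustrative):

    .withFormat(
      new Avro()
        .avroSchema(
          "{" +
          "  \"type\": \"record\"," +
          "  \"name\": \"test\"," +
          "  \"fields\" : [" +
          // union with null -> nullable VARCHAR
          "    {\"name\": \"nickname\", \"type\": [\"null\", \"string\"]}" +
          "  ]" +
          "}"
        )
    )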

    Avro uses Joda-Time for representing logical date and time types in specific record classes. The Joda-Time dependency is not part of Flink’s distribution. Therefore, make sure that Joda-Time is in your classpath together with your specific record class during runtime. Avro formats specified via a schema string do not require Joda-Time to be present.

    Make sure to add the Apache Avro dependency.

    Old CSV Format

    Attention For prototyping purposes only!

    The old CSV format allows reading and writing comma-separated rows using the filesystem connector.

    This format describes Flink’s non-standard CSV table source/sink. In the future, the format will be replaced by a proper RFC-compliant version. Use the RFC-compliant CSV format when writing to Kafka. Use the old one for stream/batch filesystem operations for now.

    .withFormat(
      new OldCsv()
        .field("field1", Types.STRING)     // required: ordered format fields
        .field("field2", Types.TIMESTAMP)
        .fieldDelimiter(",")               // optional: string delimiter "," by default
        .lineDelimiter("\n")               // optional: string delimiter "\n" by default
        .quoteCharacter('"')               // optional: single character for string values, empty by default
        .commentPrefix("#")                // optional: string to indicate comments, empty by default
        .ignoreFirstLine()                 // optional: ignore the first line, by default it is not skipped
        .ignoreParseErrors()               // optional: skip records with parse error instead of failing by default
    )
    .with_format(
      OldCsv()
      .field("field1", DataTypes.STRING())    # required: ordered format fields
      .field("field2", DataTypes.TIMESTAMP())
      .field_delimiter(",")                   # optional: string delimiter "," by default
      .line_delimiter("\n")                   # optional: string delimiter "\n" by default
      .quote_character('"')                   # optional: single character for string values, empty by default
      .comment_prefix('#')                    # optional: string to indicate comments, empty by default
      .ignore_first_line()                    # optional: ignore the first line, by default it is not skipped
      .ignore_parse_errors()                  # optional: skip records with parse error instead of failing by default
    )
    format:
      type: csv
      fields:                    # required: ordered format fields
        - name: field1
          type: VARCHAR
        - name: field2
          type: TIMESTAMP
      field-delimiter: ","       # optional: string delimiter "," by default
      line-delimiter: "\n"       # optional: string delimiter "\n" by default
      quote-character: '"'       # optional: single character for string values, empty by default
      comment-prefix: '#'        # optional: string to indicate comments, empty by default
      ignore-first-line: false   # optional: boolean flag to ignore the first line, by default it is not skipped
      ignore-parse-errors: true  # optional: skip records with parse error instead of failing by default
    CREATE TABLE MyUserTable (
      ...
    ) WITH (
      'format.type' = 'csv',                  -- required: specify the format type
      'format.fields.0.name' = 'lon',         -- required: define the schema either by using type information
      'format.fields.0.type' = 'FLOAT',
      'format.fields.1.name' = 'rideTime',
      'format.fields.1.type' = 'TIMESTAMP',
      'format.field-delimiter' = ',',         -- optional: string delimiter "," by default
      'format.line-delimiter' = '\n',         -- optional: string delimiter "\n" by default
      'format.quote-character' = '"',         -- optional: single character for string values, empty by default
      'format.comment-prefix' = '#',          -- optional: string to indicate comments, empty by default
      'format.ignore-first-line' = 'false',   -- optional: boolean flag to ignore the first line, by default it is not skipped
      'format.ignore-parse-errors' = 'true'   -- optional: skip records with parse error instead of failing by default
    )

    The old CSV format is included in Flink and does not require additional dependencies.

    Attention The old CSV format for writing rows is limited at the moment. Only a custom field delimiter is supported as an optional parameter.

    Further TableSources and TableSinks

    The following table sources and sinks have not yet been migrated (or have not been migrated entirely) to the new unified interfaces.

    These are the additional TableSources which are provided with Flink:

    Class name      Maven dependency  Batch?  Streaming?  Description
    OrcTableSource  flink-orc         Y       N           A TableSource for ORC files.

    These are the additional TableSinks which are provided with Flink:

    Class name                Maven dependency            Batch?  Streaming?  Description
    CsvTableSink              flink-table                 Y       Append      A simple sink for CSV files.
    JDBCAppendTableSink       flink-jdbc                  Y       Append      Writes a Table to a JDBC table.
    CassandraAppendTableSink  flink-connector-cassandra   N       Append      Writes a Table to a Cassandra table.

    OrcTableSource

    The OrcTableSource reads ORC files. ORC is a file format for structured data and stores the data in a compressed, columnar representation. ORC is very storage efficient and supports projection and filter push-down.

    An OrcTableSource is created as shown below:

    // create Hadoop Configuration
    Configuration config = new Configuration();
    OrcTableSource orcTableSource = OrcTableSource.builder()
      // path to ORC file(s). NOTE: By default, directories are recursively scanned.
      .path("file:///path/to/data")
      // schema of ORC files
      .forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
      // Hadoop configuration
      .withConfiguration(config)
      // build OrcTableSource
      .build();
    // create Hadoop Configuration
    val config = new Configuration()
    val orcTableSource = OrcTableSource.builder()
      // path to ORC file(s). NOTE: By default, directories are recursively scanned.
      .path("file:///path/to/data")
      // schema of ORC files
      .forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
      // Hadoop configuration
      .withConfiguration(config)
      // build OrcTableSource
      .build()

    Note: The OrcTableSource does not support ORC’s Union type yet.
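    Once built, the table source can be registered in a table environment and queried like any other table. A minimal sketch, assuming a BatchTableEnvironment named tableEnv; the table name is illustrative:

    // register the source under a table name
    tableEnv.registerTableSource("OrcTable", orcTableSource);
    // query it with SQL or the Table API
    Table result = tableEnv.sqlQuery("SELECT name FROM OrcTable");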

    CsvTableSink

    The CsvTableSink emits a Table to one or more CSV files.

    The sink only supports append-only streaming tables. It cannot be used to emit a Table that is continuously updated. See the documentation on Table to Stream conversions for details. When emitting a streaming table, rows are written at least once (if checkpointing is enabled) and the CsvTableSink does not split output files into bucket files but continuously writes to the same files.

    CsvTableSink sink = new CsvTableSink(
        path,                  // output path
        "|",                   // optional: delimit files by '|'
        1,                     // optional: write to a single file
        WriteMode.OVERWRITE);  // optional: override existing files
    tableEnv.registerTableSink(
      "csvOutputTable",
      // specify table schema
      new String[]{"f0", "f1"},
      new TypeInformation[]{Types.STRING, Types.INT},
      sink);
    Table table = ...
    table.insertInto("csvOutputTable");
    val sink: CsvTableSink = new CsvTableSink(
        path,                             // output path
        fieldDelim = "|",                 // optional: delimit files by '|'
        numFiles = 1,                     // optional: write to a single file
        writeMode = WriteMode.OVERWRITE)  // optional: override existing files
    tableEnv.registerTableSink(
      "csvOutputTable",
      // specify table schema
      Array[String]("f0", "f1"),
      Array[TypeInformation[_]](Types.STRING, Types.INT),
      sink)
    val table: Table = ???
    table.insertInto("csvOutputTable")
    field_names = ["f0", "f1"]
    field_types = [DataTypes.STRING(), DataTypes.INT()]
    sink = CsvTableSink(
        field_names,
        field_types,
        path,                 # output path
        "|",                  # optional: delimit files by '|'
        1,                    # optional: write to a single file
        WriteMode.OVERWRITE   # optional: override existing files
    )
    table_env.register_table_sink(
        "csvOutputTable",
        sink
    )
    table = ...
    table.insert_into("csvOutputTable")

    JDBCAppendTableSink

    The JDBCAppendTableSink emits a Table to a JDBC connection. The sink only supports append-only streaming tables. It cannot be used to emit a Table that is continuously updated. See the documentation on Table to Stream conversions for details.

    The JDBCAppendTableSink inserts each Table row at least once into the database table (if checkpointing is enabled). However, you can specify the insertion query using REPLACE or INSERT OVERWRITE to perform upsert writes to the database.

    To use the JDBC sink, you have to add the JDBC connector dependency (flink-jdbc) to your project. Then you can create the sink using JDBCAppendSinkBuilder:

    JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
      .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
      .setDBUrl("jdbc:derby:memory:ebookshop")
      .setQuery("INSERT INTO books (id) VALUES (?)")
      .setParameterTypes(INT_TYPE_INFO)
      .build();
    tableEnv.registerTableSink(
      "jdbcOutputTable",
      // specify table schema
      new String[]{"id"},
      new TypeInformation[]{Types.INT},
      sink);
    Table table = ...
    table.insertInto("jdbcOutputTable");
    val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
      .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
      .setDBUrl("jdbc:derby:memory:ebookshop")
      .setQuery("INSERT INTO books (id) VALUES (?)")
      .setParameterTypes(INT_TYPE_INFO)
      .build()
    tableEnv.registerTableSink(
      "jdbcOutputTable",
      // specify table schema
      Array[String]("id"),
      Array[TypeInformation[_]](Types.INT),
      sink)
    val table: Table = ???
    table.insertInto("jdbcOutputTable")

    Similar to using JDBCOutputFormat, you have to explicitly specify the name of the JDBC driver, the JDBC URL, the query to be executed, and the field types of the JDBC table.
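    As mentioned above, the insertion query can also be phrased as an upsert. A minimal sketch for a MySQL-compatible database (REPLACE is MySQL syntax; the driver, URL, and table are illustrative, and other databases use different statements). INT_TYPE_INFO and STRING_TYPE_INFO come from BasicTypeInfo:

    JDBCAppendTableSink upsertSink = JDBCAppendTableSink.builder()
      .setDrivername("com.mysql.jdbc.Driver")
      .setDBUrl("jdbc:mysql://localhost:3306/bookshop")
      // REPLACE overwrites an existing row with the same primary key, i.e. performs an upsert
      .setQuery("REPLACE INTO books (id, title) VALUES (?, ?)")
      .setParameterTypes(INT_TYPE_INFO, STRING_TYPE_INFO)
      .build();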

    CassandraAppendTableSink

    The CassandraAppendTableSink emits a Table to a Cassandra table. The sink only supports append-only streaming tables. It cannot be used to emit a Table that is continuously updated. See the documentation on Table to Stream conversions for details.

    The CassandraAppendTableSink inserts all rows at least once into the Cassandra table if checkpointing is enabled. However, you can specify the query as an upsert query.

    To use the CassandraAppendTableSink, you have to add the Cassandra connector dependency (flink-connector-cassandra) to your project. The example below shows how to use the CassandraAppendTableSink.

    ClusterBuilder builder = ... // configure Cassandra cluster connection
    CassandraAppendTableSink sink = new CassandraAppendTableSink(
      builder,
      // the query must match the schema of the table
      "INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)");
    tableEnv.registerTableSink(
      "cassandraOutputTable",
      // specify table schema
      new String[]{"id", "name", "value"},
      new TypeInformation[]{Types.INT, Types.STRING, Types.DOUBLE},
      sink);
    Table table = ...
    table.insertInto("cassandraOutputTable");
    val builder: ClusterBuilder = ... // configure Cassandra cluster connection
    val sink: CassandraAppendTableSink = new CassandraAppendTableSink(
      builder,
      // the query must match the schema of the table
      "INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)")
    tableEnv.registerTableSink(
      "cassandraOutputTable",
      // specify table schema
      Array[String]("id", "name", "value"),
      Array[TypeInformation[_]](Types.INT, Types.STRING, Types.DOUBLE),
      sink)
    val table: Table = ???
    table.insertInto("cassandraOutputTable")