• Flink Operations Playground
    • Anatomy of this Playground
    • Starting the Playground
    • Entering the Playground
      • Flink WebUI
      • Logs
      • Flink CLI
      • Flink REST API
      • Kafka Topics
    • Time to Play!
      • Listing Running Jobs
      • Observing Failure & Recovery
        • Step 1: Observing the Output
        • Step 2: Introducing a Fault
        • Step 3: Recovery
      • Upgrading & Rescaling a Job
        • Step 1: Stopping the Job
        • Step 2a: Restart Job without Changes
        • Step 2b: Restart Job with a Different Parallelism (Rescaling)
      • Querying the Metrics of a Job
    • Variants

    Flink Operations Playground

    There are many ways to deploy and operate Apache Flink in various environments. Regardless of this variety, the fundamental building blocks of a Flink Cluster remain the same, and similar operational principles apply.

    In this playground, you will learn how to manage and run Flink Jobs. You will see how to deploy and monitor an application, experience how Flink recovers from Job failure, and perform everyday operational tasks like upgrades and rescaling.

    Anatomy of this Playground

    This playground consists of a long-living Flink Session Cluster and a Kafka Cluster.

    A Flink Cluster always consists of a Flink Master and one or more Flink TaskManagers. The Flink Master is responsible for handling Job submissions, supervising Jobs, and managing resources. The Flink TaskManagers are the worker processes and are responsible for the execution of the actual Tasks which make up a Flink Job. In this playground you will start with a single TaskManager, but scale out to more TaskManagers later. Additionally, this playground comes with a dedicated client container, which we use to submit the Flink Job initially and to perform various operational tasks later on. The client container is not needed by the Flink Cluster itself but is only included for ease of use.

    The Kafka Cluster consists of a Zookeeper server and a Kafka Broker.

    [Figure: Flink Docker Playground]

    When the playground is started, a Flink Job called Click Event Count will be submitted to the Flink Master. Additionally, two Kafka Topics, input and output, are created.

    [Figure: Click Event Count Example]

    The Job consumes ClickEvents from the input topic, each with a timestamp and a page. The events are then keyed by page and counted in 15-second windows. The results are written to the output topic.

    There are six different pages, and we generate 1000 click events per page in every 15-second window. Hence, the output of the Flink Job should show 1000 views per page and window.
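
    To make the expected output concrete, the following sketch models the counting logic in plain Python rather than Flink. It assigns each ClickEvent to a 15-second tumbling window based on its timestamp and counts events per (window, page) key. Representing events as `(timestamp_ms, page)` tuples, and the page names in the usage lines, are assumptions made only for illustration.

```python
from collections import Counter
from typing import Iterable, Tuple

WINDOW_SIZE_MS = 15_000  # 15-second tumbling windows


def window_start(timestamp_ms: int, size_ms: int = WINDOW_SIZE_MS) -> int:
    """Start (inclusive) of the tumbling window that contains timestamp_ms."""
    return timestamp_ms - (timestamp_ms % size_ms)


def count_per_page_and_window(
    events: Iterable[Tuple[int, str]], size_ms: int = WINDOW_SIZE_MS
) -> Counter:
    """Count (timestamp_ms, page) events per (window start, page) key."""
    counts: Counter = Counter()
    for timestamp_ms, page in events:
        counts[(window_start(timestamp_ms, size_ms), page)] += 1
    return counts


# Usage: six hypothetical pages, 1000 evenly spaced events each inside the first window.
pages = ["/help", "/index", "/shop", "/jobs", "/about", "/news"]
events = [(i * 15, page) for page in pages for i in range(1000)]
counts = count_per_page_and_window(events)
print(len(counts), set(counts.values()))  # 6 {1000}
```

    Every page receives 1000 events whose timestamps fall into the same window, so each key ends up with a count of 1000. That is the invariant you will check in the output topic later.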

    Starting the Playground

    The playground environment is set up in just a few steps. We will walk you through the necessary commands and show how to validate that everything is running correctly.

    We assume that you have docker (1.12+) and docker-compose (2.1+) installed on your machine.

    The required configuration files are available in the flink-playgrounds repository. Check it out and spin up the environment:

        git clone --branch release-1.9 https://github.com/apache/flink-playgrounds.git
        cd flink-playgrounds/operations-playground
        docker-compose build
        docker-compose up -d

    Afterwards, you can inspect the running Docker containers with the following command:

        docker-compose ps

        Name                                           Command                          State    Ports
        -----------------------------------------------------------------------------------------------------------------------------
        operations-playground_clickevent-generator_1   /docker-entrypoint.sh java ...   Up       6123/tcp, 8081/tcp
        operations-playground_client_1                 /docker-entrypoint.sh flin ...   Exit 0
        operations-playground_jobmanager_1             /docker-entrypoint.sh jobm ...   Up       6123/tcp, 0.0.0.0:8081->8081/tcp
        operations-playground_kafka_1                  start-kafka.sh                   Up       0.0.0.0:9094->9094/tcp
        operations-playground_taskmanager_1            /docker-entrypoint.sh task ...   Up       6123/tcp, 8081/tcp
        operations-playground_zookeeper_1              /bin/sh -c /usr/sbin/sshd ...    Up       2181/tcp, 22/tcp, 2888/tcp, 3888/tcp

    This indicates that the client container has successfully submitted the Flink Job (Exit 0) and all cluster components as well as the data generator are running (Up).
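
    If you want to script this check, a minimal sketch could parse the text of `docker-compose ps` and compare each container's State column against the expectation described above. It assumes the column layout shown in the listing. Different docker-compose versions format this table differently, so treat the parsing as a best-effort heuristic. The helper names are hypothetical.

```python
import re
from typing import Dict

# Matches the State column: "Up" or "Exit <code>", surrounded by whitespace.
STATE_RE = re.compile(r"\s(Up|Exit \d+)(?:\s|$)")


def container_states(ps_output: str) -> Dict[str, str]:
    """Map container name -> state for each data row of `docker-compose ps` output."""
    states: Dict[str, str] = {}
    for line in ps_output.splitlines():
        match = STATE_RE.search(line)
        if not line.strip() or line.startswith("-") or match is None:
            continue  # skip blank, separator and header lines
        name = line.split()[0]
        states[name] = match.group(1)
    return states


def playground_is_healthy(states: Dict[str, str]) -> bool:
    """True if the client container exited with 0 and every other container is Up."""
    if not states:
        return False
    for name, state in states.items():
        expected = "Exit 0" if "_client_" in name else "Up"
        if state != expected:
            return False
    return True
```

    You could feed it the output of `subprocess.run(["docker-compose", "ps"], capture_output=True, text=True).stdout`, run from the operations-playground directory.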

    You can stop the playground environment by calling:

        docker-compose down -v

    Entering the Playground

    There are many things you can try and check out in this playground. In the following two sections we will show you how to interact with the Flink Cluster and demonstrate some of Flink’s key features.

    Flink WebUI

    The most natural starting point to observe your Flink Cluster is the WebUI exposed under http://localhost:8081. If everything went well, you’ll see that the cluster initially consists of one TaskManager and executes a Job called Click Event Count.

    [Figure: Playground Flink WebUI]

    The Flink WebUI contains a lot of useful and interesting information about your Flink Cluster and its Jobs (JobGraph, Metrics, Checkpointing Statistics, TaskManager Status,…).

    Logs

    JobManager

    The JobManager logs can be tailed via docker-compose.

        docker-compose logs -f jobmanager

    After the initial startup you should mainly see log messages for every checkpoint completion.

    TaskManager

    The TaskManager log can be tailed in the same way.

        docker-compose logs -f taskmanager

    After the initial startup you should mainly see log messages for every checkpoint completion.

    Flink CLI

    The Flink CLI can be used from within the client container. For example, to print the help message of the Flink CLI you can run:

        docker-compose run --no-deps client flink --help

    Flink REST API

    The Flink REST API is exposed via localhost:8081 on the host or via jobmanager:8081 from the client container. For example, to list all currently running jobs, you can run:

        curl localhost:8081/jobs
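
    The same request can be issued from a program. The sketch below uses only the Python standard library and assumes the REST endpoint is reachable at http://localhost:8081, as in this playground. `parse_jobs` and `list_jobs` are illustrative names, not part of any Flink client library.

```python
import json
import urllib.request
from typing import List, Tuple

REST_BASE = "http://localhost:8081"  # REST endpoint as exposed on the host


def parse_jobs(payload: str) -> List[Tuple[str, str]]:
    """Turn the JSON body of GET /jobs into (job id, status) pairs."""
    body = json.loads(payload)
    return [(job["id"], job["status"]) for job in body.get("jobs", [])]


def list_jobs(base_url: str = REST_BASE) -> List[Tuple[str, str]]:
    """Query GET /jobs on the Flink REST API and return (job id, status) pairs."""
    with urllib.request.urlopen(f"{base_url}/jobs", timeout=10) as response:
        return parse_jobs(response.read().decode("utf-8"))
```

    With the playground running, `list_jobs()` should return a single pair for the Click Event Count Job with the status `RUNNING`.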

    Kafka Topics

    You can look at the records that are written to the Kafka Topics by running

        # input topic (1000 records/s)
        docker-compose exec kafka kafka-console-consumer.sh \
          --bootstrap-server localhost:9092 --topic input

        # output topic (24 records/min)
        docker-compose exec kafka kafka-console-consumer.sh \
          --bootstrap-server localhost:9092 --topic output
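
    The output rate follows directly from the setup. The Job emits one record per page and window, and with 15-second windows a minute contains four windows. Here is a short worked calculation using the six pages and the window size from the description above:

```python
PAGES = 6            # distinct pages in the generated click stream
WINDOW_SECONDS = 15  # size of the tumbling windows


def output_records_per_minute(pages: int = PAGES, window_seconds: int = WINDOW_SECONDS) -> float:
    """Each window emits one record per page; a minute holds 60 / window_seconds windows."""
    return pages * 60 / window_seconds


print(output_records_per_minute())  # 24.0
```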

    Time to Play!

    Now that you have learned how to interact with Flink and the Docker containers, let’s have a look at some common operational tasks that you can try out on our playground. All of these tasks are independent of each other, i.e., you can perform them in any order. Most tasks can be executed via the CLI and the REST API.

    Listing Running Jobs

    Command

        docker-compose run --no-deps client flink list

    Expected Output

        Waiting for response...
        ------------------ Running/Restarting Jobs -------------------
        16.07.2019 16:37:55 : <job-id> : Click Event Count (RUNNING)
        --------------------------------------------------------------
        No scheduled jobs.

    Request

        curl localhost:8081/jobs

    Expected Response (pretty-printed)

        {
          "jobs": [
            {
              "id": "<job-id>",
              "status": "RUNNING"
            }
          ]
        }

    The JobID is assigned to a Job upon submission and is needed to perform actions on the Job via the CLI or REST API.
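
    If you script any of the following tasks, you will need the JobID programmatically. One option is to extract it from the `flink list` output. The sketch below assumes the line format shown in the expected output above (`<date> <time> : <job-id> : <job name> (<state>)`), which may differ between Flink versions. `find_job_id` is a hypothetical helper.

```python
import re
from typing import Optional

# One job line of `flink list` output, e.g.
# "16.07.2019 16:37:55 : <job-id> : Click Event Count (RUNNING)"
LIST_LINE_RE = re.compile(r"^\S+ \S+ : (?P<job_id>\S+) : (?P<name>.+) \((?P<state>[A-Z]+)\)$")


def find_job_id(cli_output: str, job_name: str = "Click Event Count") -> Optional[str]:
    """Return the JobID of the first listed job with the given name, or None."""
    for line in cli_output.splitlines():
        match = LIST_LINE_RE.match(line.strip())
        if match and match.group("name") == job_name:
            return match.group("job_id")
    return None
```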

    Observing Failure & Recovery

    Flink provides exactly-once processing guarantees under (partial) failure. In this playground you can observe and, to some extent, verify this behavior.

    Step 1: Observing the Output

    As described above, the events in this playground are generated such that each window contains exactly one thousand records. So, in order to verify that Flink successfully recovers from a TaskManager failure without data loss or duplication, you can tail the output topic and check that, after recovery, all windows are present and the count is correct.

    For this, start reading from the output topic and leave this command running until after recovery (Step 3).

        docker-compose exec kafka kafka-console-consumer.sh \
          --bootstrap-server localhost:9092 --topic output
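
    Checking every window by eye is tedious, so here is a sketch of a checker you could run over a captured sample of the output topic. It assumes each output record is one JSON object per line with `windowStart`, `page` and `count` fields. These field names are an assumption, so adapt them to what your console consumer actually prints. The sink may produce duplicates (see Step 3), so duplicates are reported separately from wrong counts.

```python
import json
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

EXPECTED_COUNT = 1000  # events per page and window in this playground


def check_output(lines: Iterable[str], expected: int = EXPECTED_COUNT) -> Dict[str, List]:
    """Group output records by (window, page) and report wrong counts and duplicates.

    Assumes each non-empty line is a JSON object with "windowStart", "page" and "count".
    """
    seen: Dict[Tuple[str, str], List[int]] = defaultdict(list)
    for line in lines:
        line = line.strip()
        if not line:
            continue
        record = json.loads(line)
        seen[(record["windowStart"], record["page"])].append(record["count"])

    wrong = [key for key, counts in seen.items() if any(c != expected for c in counts)]
    duplicates = [key for key, counts in seen.items() if len(counts) > 1]
    return {"wrong_counts": wrong, "duplicates": duplicates}
```

    For example, you could add `--from-beginning --max-messages 240` to the consumer command above to read roughly ten minutes of output (24 records/min), save it to a file, and pass the file's lines to `check_output`.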

    Step 2: Introducing a Fault

    In order to simulate a partial failure you can kill a TaskManager. In a production setup, this could correspond to a loss of the TaskManager process, the TaskManager machine or simply a transient exception being thrown from the framework or user code (e.g. due to the temporary unavailability of an external resource).

        docker-compose kill taskmanager

    After a few seconds, Flink will notice the loss of the TaskManager, cancel the affected Job, and immediately resubmit it for recovery. When the Job gets restarted, its tasks remain in the SCHEDULED state, which is indicated by the purple colored squares (see screenshot below).

    [Figure: Playground Flink WebUI]

    Note: Even though the tasks of the job are in SCHEDULED state and not RUNNING yet, the overall status of a Job is shown as RUNNING.

    At this point, the tasks of the Job cannot move from the SCHEDULED state to RUNNING because there are no resources (TaskSlots provided by TaskManagers) to run the tasks. Until a new TaskManager becomes available, the Job will go through a cycle of cancellations and resubmissions.
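
    The overall Job status already reads RUNNING while its tasks are still SCHEDULED, so the Job-level state alone does not tell you whether the Job is actually processing data. A minimal sketch, assuming you have fetched the body of `GET /jobs/<job-id>` (its structure is shown in the section on querying metrics), sums the per-vertex task counts instead. The function names are hypothetical.

```python
import json
from collections import Counter


def task_state_totals(job_details: str) -> Counter:
    """Sum the per-vertex "tasks" state counts of a GET /jobs/<job-id> response body."""
    totals: Counter = Counter()
    for vertex in json.loads(job_details).get("vertices", []):
        totals.update(vertex.get("tasks", {}))
    return totals


def all_tasks_running(job_details: str) -> bool:
    """True only if at least one task exists and every task is RUNNING.

    The Job-level "state" can already read RUNNING while tasks are still
    SCHEDULED, so this looks at the task counts instead.
    """
    totals = task_state_totals(job_details)
    total = sum(totals.values())
    return total > 0 and totals["RUNNING"] == total
```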

    In the meantime, the data generator keeps pushing ClickEvents into the input topic. This is similar to a real production setup where data is produced while the Job to process it is down.

    Step 3: Recovery

    Once you restart the TaskManager, it reconnects to the Master.

        docker-compose up -d taskmanager

    When the Master is notified about the new TaskManager, it schedules the tasks of the recovering Job to the newly available TaskSlots. Upon restart, the tasks recover their state from the last successful checkpoint that was taken before the failure and switch to the RUNNING state.

    The Job will quickly process the full backlog of input events (accumulated during the outage) from Kafka and produce output at a much higher rate (> 24 records/minute) until it reaches the head of the stream. In the output you will see that all keys (pages) are present for all time windows and that every count is exactly one thousand. Since we are using the FlinkKafkaProducer in its “at-least-once” mode, there is a chance that you will see some duplicate output records.

    Note: Most production setups rely on a resource manager (Kubernetes, Yarn, Mesos) to automatically restart failed processes.

    Upgrading & Rescaling a Job

    Upgrading a Flink Job always involves two steps: First, the Flink Job is gracefully stopped with a Savepoint. A Savepoint is a consistent snapshot of the complete application state at a well-defined, globally consistent point in time (similar to a checkpoint). Second, the upgraded Flink Job is started from the Savepoint. In this context “upgrade” can mean different things including the following:

    • An upgrade to the configuration (incl. the parallelism of the Job)
    • An upgrade to the topology of the Job (added/removed Operators)
    • An upgrade to the user-defined functions of the Job

    Before starting with the upgrade you might want to start tailing the output topic, in order to observe that no data is lost or corrupted in the course of the upgrade.

        docker-compose exec kafka kafka-console-consumer.sh \
          --bootstrap-server localhost:9092 --topic output

    Step 1: Stopping the Job

    To gracefully stop the Job, you need to use the “stop” command of either the CLI or the REST API. For this you will need the JobID of the Job, which you can obtain by listing all running Jobs or from the WebUI. With the JobID you can proceed to stopping the Job:

    Command

        docker-compose run --no-deps client flink stop <job-id>

    Expected Output

        Suspending job "<job-id>" with a savepoint.
        Suspended job "<job-id>" with a savepoint.

    The Savepoint has been stored to the state.savepoints.dir configured in the flink-conf.yaml, which is mounted under /tmp/flink-savepoints-directory/ on your local machine. You will need the path to this Savepoint in the next step. When you use the REST API, this path is already part of the response; when you use the CLI, you will need to look at the filesystem directly.

    Command

        ls -lia /tmp/flink-savepoints-directory

    Expected Output

        total 0
          17 drwxr-xr-x   3 root root   60 17 jul 17:05 .
           2 drwxrwxrwt 135 root root 3420 17 jul 17:09 ..
        1002 drwxr-xr-x   2 root root  140 17 jul 17:05 savepoint-<short-job-id>-<uuid>
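
    When you use the CLI, you can pick the newest savepoint directory by hand. The sketch below automates that step, assuming the host directory /tmp/flink-savepoints-directory described above. The path you later pass to `-s` must be valid inside the containers, so check the volume mount in docker-compose.yaml before reusing the returned path as-is. `latest_savepoint` is a hypothetical helper.

```python
from pathlib import Path
from typing import Optional

SAVEPOINT_DIR = Path("/tmp/flink-savepoints-directory")  # host mount used by the playground


def latest_savepoint(directory: Path = SAVEPOINT_DIR) -> Optional[Path]:
    """Return the most recently modified savepoint-* directory, or None if there is none."""
    if not directory.is_dir():
        return None
    candidates = [p for p in directory.iterdir() if p.is_dir() and p.name.startswith("savepoint-")]
    if not candidates:
        return None
    return max(candidates, key=lambda p: p.stat().st_mtime)
```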

    Request

        # triggering stop
        curl -X POST localhost:8081/jobs/<job-id>/stop -d '{"drain": false}'

    Expected Response (pretty-printed)

        {
          "request-id": "<trigger-id>"
        }

    Request

        # check status of stop action and retrieve savepoint path
        curl localhost:8081/jobs/<job-id>/savepoints/<trigger-id>

    Expected Response (pretty-printed)

        {
          "status": {
            "id": "COMPLETED"
          },
          "operation": {
            "location": "<savepoint-path>"
          }
        }
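
    The two REST calls above, triggering the stop and polling the trigger until its status is COMPLETED, can be combined into one function. This sketch uses only the Python standard library. The endpoint paths and JSON fields are the ones shown above; the function names, the polling interval and the retry limit are assumptions. The HTTP helpers are passed in as parameters, so the polling logic can be exercised without a running cluster.

```python
import json
import time
import urllib.request
from typing import Callable, Optional

REST_BASE = "http://localhost:8081"


def http_post_json(url: str, body: dict) -> dict:
    """POST a JSON body and decode the JSON response."""
    request = urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.loads(response.read().decode("utf-8"))


def http_get_json(url: str) -> dict:
    """GET a URL and decode the JSON response."""
    with urllib.request.urlopen(url, timeout=10) as response:
        return json.loads(response.read().decode("utf-8"))


def stop_with_savepoint(
    job_id: str,
    base_url: str = REST_BASE,
    post: Callable[[str, dict], dict] = http_post_json,
    get: Callable[[str], dict] = http_get_json,
    poll_seconds: float = 1.0,
    max_polls: int = 60,
) -> Optional[str]:
    """Trigger POST /jobs/<job-id>/stop and poll the trigger until it is COMPLETED.

    Returns the savepoint location, or None if there is none (e.g. the savepoint
    failed) or the operation did not complete within max_polls.
    """
    trigger_id = post(f"{base_url}/jobs/{job_id}/stop", {"drain": False})["request-id"]
    for _ in range(max_polls):
        result = get(f"{base_url}/jobs/{job_id}/savepoints/{trigger_id}")
        if result["status"]["id"] == "COMPLETED":
            return result.get("operation", {}).get("location")
        time.sleep(poll_seconds)
    return None
```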

    Step 2a: Restart Job without Changes

    You can now restart the upgraded Job from this Savepoint. For simplicity, you can start by restarting it without any changes.

    Command

        docker-compose run --no-deps client flink run -s <savepoint-path> \
          -d /opt/ClickCountJob.jar \
          --bootstrap.servers kafka:9092 --checkpointing --event-time

    Expected Output

        Starting execution of program
        Job has been submitted with JobID <job-id>

    Request

        # Uploading the JAR from the Client container
        docker-compose run --no-deps client curl -X POST -H "Expect:" \
          -F "jarfile=@/opt/ClickCountJob.jar" http://jobmanager:8081/jars/upload

    Expected Response (pretty-printed)

        {
          "filename": "/tmp/flink-web-<uuid>/flink-web-upload/<jar-id>",
          "status": "success"
        }

    Request

        # Submitting the Job
        curl -X POST http://localhost:8081/jars/<jar-id>/run \
          -d '{"programArgs": "--bootstrap.servers kafka:9092 --checkpointing --event-time", "savepointPath": "<savepoint-path>"}'

    Expected Response (pretty-printed)

        {
          "jobid": "<job-id>"
        }

    Once the Job is RUNNING again, you will see in the output Topic that records are produced at a higher rate while the Job is processing the backlog accumulated during the outage. Additionally, you will see that no data was lost during the upgrade: all windows are present with a count of exactly one thousand.
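
    If you script the REST variant, you need two small pieces of glue: extracting `<jar-id>` from the upload response and building the body of the run request. Judging from the expected upload response above, the `<jar-id>` is the last path segment of `filename`. The sketch below relies on that, and its function names are hypothetical. The optional `parallelism` field corresponds to the rescaling variant in Step 2b.

```python
import json
from pathlib import PurePosixPath
from typing import Optional


def jar_id_from_upload(upload_response: str) -> str:
    """Return the last path segment of "filename" in the /jars/upload response."""
    return PurePosixPath(json.loads(upload_response)["filename"]).name


def run_request_body(savepoint_path: str, parallelism: Optional[int] = None) -> str:
    """Build the JSON body for POST /jars/<jar-id>/run as used in this playground."""
    body = {
        "programArgs": "--bootstrap.servers kafka:9092 --checkpointing --event-time",
        "savepointPath": savepoint_path,
    }
    if parallelism is not None:
        body["parallelism"] = parallelism  # only set when rescaling
    return json.dumps(body)
```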

    Step 2b: Restart Job with a Different Parallelism (Rescaling)

    Alternatively, you could also rescale the Job from this Savepoint by passing a different parallelism during resubmission.

    Command

        docker-compose run --no-deps client flink run -p 3 -s <savepoint-path> \
          -d /opt/ClickCountJob.jar \
          --bootstrap.servers kafka:9092 --checkpointing --event-time

    Expected Output

        Starting execution of program
        Job has been submitted with JobID <job-id>

    Request

        # Uploading the JAR from the Client container
        docker-compose run --no-deps client curl -X POST -H "Expect:" \
          -F "jarfile=@/opt/ClickCountJob.jar" http://jobmanager:8081/jars/upload

    Expected Response (pretty-printed)

        {
          "filename": "/tmp/flink-web-<uuid>/flink-web-upload/<jar-id>",
          "status": "success"
        }

    Request

        # Submitting the Job
        curl -X POST http://localhost:8081/jars/<jar-id>/run \
          -d '{"parallelism": 3, "programArgs": "--bootstrap.servers kafka:9092 --checkpointing --event-time", "savepointPath": "<savepoint-path>"}'

    Expected Response (pretty-printed)

        {
          "jobid": "<job-id>"
        }

    Now, the Job has been resubmitted, but it will not start as there are not enough TaskSlots to execute it with the increased parallelism (2 available, 3 needed). With

        docker-compose scale taskmanager=2

    you can add a second TaskManager with two TaskSlots to the Flink Cluster, which will automatically register with the Flink Master. Shortly after adding the TaskManager the Job should start running again.

    Once the Job is “RUNNING” again, you will see in the output Topic that no data was lost during rescaling: all windows are present with a count of exactly one thousand.
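
    The slot arithmetic behind this step can be written down explicitly. The sketch assumes Flink's default slot sharing, under which a Job needs as many TaskSlots as the highest parallelism of its operators, and two TaskSlots per TaskManager as in this playground. `task_managers_needed` is a hypothetical helper.

```python
import math


def task_managers_needed(max_parallelism: int, slots_per_task_manager: int) -> int:
    """TaskManagers required if the Job needs max_parallelism TaskSlots (default slot sharing)."""
    return math.ceil(max_parallelism / slots_per_task_manager)


print(task_managers_needed(3, 2))  # 2
```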

    Querying the Metrics of a Job

    The Flink Master exposes system and user metrics via its REST API.

    The endpoint depends on the scope of these metrics. Metrics scoped to a Job can be listed via jobs/<job-id>/metrics. The actual value of a metric can be queried via the get query parameter.

    Request

        curl "localhost:8081/jobs/<job-id>/metrics?get=lastCheckpointSize"

    Expected Response (pretty-printed; no placeholders)

        [
          {
            "id": "lastCheckpointSize",
            "value": "9378"
          }
        ]
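
    The metrics endpoint is just as easy to call from a script. In the sketch below, the `get` parameter carries a comma-separated list of metric names, which the REST API accepts for querying several metrics at once. Note that values come back as strings. The helper names and the base URL constant are assumptions.

```python
import json
import urllib.parse
import urllib.request
from typing import Dict, Iterable

REST_BASE = "http://localhost:8081"


def metrics_url(job_id: str, names: Iterable[str], base_url: str = REST_BASE) -> str:
    """Build GET /jobs/<job-id>/metrics?get=<name>[,<name>...]."""
    query = urllib.parse.urlencode({"get": ",".join(names)}, safe=",")
    return f"{base_url}/jobs/{job_id}/metrics?{query}"


def parse_metrics(payload: str) -> Dict[str, str]:
    """Turn the [{"id": ..., "value": ...}] response into a dict; values stay strings."""
    return {entry["id"]: entry["value"] for entry in json.loads(payload)}


def fetch_job_metrics(job_id: str, names: Iterable[str], base_url: str = REST_BASE) -> Dict[str, str]:
    """Query the given job-scoped metrics from a running cluster."""
    with urllib.request.urlopen(metrics_url(job_id, names, base_url), timeout=10) as response:
        return parse_metrics(response.read().decode("utf-8"))
```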

    Besides querying metrics, you can also use the REST API to retrieve detailed information about the status of a running Job.

    Request

        # find the vertex-id of the vertex of interest
        curl localhost:8081/jobs/<job-id>

    Expected Response (pretty-printed)

        {
          "jid": "<job-id>",
          "name": "Click Event Count",
          "isStoppable": false,
          "state": "RUNNING",
          "start-time": 1564467066026,
          "end-time": -1,
          "duration": 374793,
          "now": 1564467440819,
          "timestamps": {
            "CREATED": 1564467066026,
            "FINISHED": 0,
            "SUSPENDED": 0,
            "FAILING": 0,
            "CANCELLING": 0,
            "CANCELED": 0,
            "RECONCILING": 0,
            "RUNNING": 1564467066126,
            "FAILED": 0,
            "RESTARTING": 0
          },
          "vertices": [
            {
              "id": "<vertex-id>",
              "name": "ClickEvent Source",
              "parallelism": 2,
              "status": "RUNNING",
              "start-time": 1564467066423,
              "end-time": -1,
              "duration": 374396,
              "tasks": {
                "CREATED": 0,
                "FINISHED": 0,
                "DEPLOYING": 0,
                "RUNNING": 2,
                "CANCELING": 0,
                "FAILED": 0,
                "CANCELED": 0,
                "RECONCILING": 0,
                "SCHEDULED": 0
              },
              "metrics": {
                "read-bytes": 0,
                "read-bytes-complete": true,
                "write-bytes": 5033461,
                "write-bytes-complete": true,
                "read-records": 0,
                "read-records-complete": true,
                "write-records": 166351,
                "write-records-complete": true
              }
            },
            {
              "id": "<vertex-id>",
              "name": "Timestamps/Watermarks",
              "parallelism": 2,
              "status": "RUNNING",
              "start-time": 1564467066441,
              "end-time": -1,
              "duration": 374378,
              "tasks": {
                "CREATED": 0,
                "FINISHED": 0,
                "DEPLOYING": 0,
                "RUNNING": 2,
                "CANCELING": 0,
                "FAILED": 0,
                "CANCELED": 0,
                "RECONCILING": 0,
                "SCHEDULED": 0
              },
              "metrics": {
                "read-bytes": 5066280,
                "read-bytes-complete": true,
                "write-bytes": 5033496,
                "write-bytes-complete": true,
                "read-records": 166349,
                "read-records-complete": true,
                "write-records": 166349,
                "write-records-complete": true
              }
            },
            {
              "id": "<vertex-id>",
              "name": "ClickEvent Counter",
              "parallelism": 2,
              "status": "RUNNING",
              "start-time": 1564467066469,
              "end-time": -1,
              "duration": 374350,
              "tasks": {
                "CREATED": 0,
                "FINISHED": 0,
                "DEPLOYING": 0,
                "RUNNING": 2,
                "CANCELING": 0,
                "FAILED": 0,
                "CANCELED": 0,
                "RECONCILING": 0,
                "SCHEDULED": 0
              },
              "metrics": {
                "read-bytes": 5085332,
                "read-bytes-complete": true,
                "write-bytes": 316,
                "write-bytes-complete": true,
                "read-records": 166305,
                "read-records-complete": true,
                "write-records": 6,
                "write-records-complete": true
              }
            },
            {
              "id": "<vertex-id>",
              "name": "ClickEventStatistics Sink",
              "parallelism": 2,
              "status": "RUNNING",
              "start-time": 1564467066476,
              "end-time": -1,
              "duration": 374343,
              "tasks": {
                "CREATED": 0,
                "FINISHED": 0,
                "DEPLOYING": 0,
                "RUNNING": 2,
                "CANCELING": 0,
                "FAILED": 0,
                "CANCELED": 0,
                "RECONCILING": 0,
                "SCHEDULED": 0
              },
              "metrics": {
                "read-bytes": 20668,
                "read-bytes-complete": true,
                "write-bytes": 0,
                "write-bytes-complete": true,
                "read-records": 6,
                "read-records-complete": true,
                "write-records": 0,
                "write-records-complete": true
              }
            }
          ],
          "status-counts": {
            "CREATED": 0,
            "FINISHED": 0,
            "DEPLOYING": 0,
            "RUNNING": 4,
            "CANCELING": 0,
            "FAILED": 0,
            "CANCELED": 0,
            "RECONCILING": 0,
            "SCHEDULED": 0
          },
          "plan": {
            "jid": "<job-id>",
            "name": "Click Event Count",
            "nodes": [
              {
                "id": "<vertex-id>",
                "parallelism": 2,
                "operator": "",
                "operator_strategy": "",
                "description": "ClickEventStatistics Sink",
                "inputs": [
                  {
                    "num": 0,
                    "id": "<vertex-id>",
                    "ship_strategy": "FORWARD",
                    "exchange": "pipelined_bounded"
                  }
                ],
                "optimizer_properties": {}
              },
              {
                "id": "<vertex-id>",
                "parallelism": 2,
                "operator": "",
                "operator_strategy": "",
                "description": "ClickEvent Counter",
                "inputs": [
                  {
                    "num": 0,
                    "id": "<vertex-id>",
                    "ship_strategy": "HASH",
                    "exchange": "pipelined_bounded"
                  }
                ],
                "optimizer_properties": {}
              },
              {
                "id": "<vertex-id>",
                "parallelism": 2,
                "operator": "",
                "operator_strategy": "",
                "description": "Timestamps/Watermarks",
                "inputs": [
                  {
                    "num": 0,
                    "id": "<vertex-id>",
                    "ship_strategy": "FORWARD",
                    "exchange": "pipelined_bounded"
                  }
                ],
                "optimizer_properties": {}
              },
              {
                "id": "<vertex-id>",
                "parallelism": 2,
                "operator": "",
                "operator_strategy": "",
                "description": "ClickEvent Source",
                "optimizer_properties": {}
              }
            ]
          }
        }
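
    To find the `<vertex-id>` of the vertex you are interested in, you can index the `vertices` array of this response by name. Here is a minimal sketch, assuming the response body shown above; `vertices_by_name` is an illustrative helper. The returned ids can then be used with vertex-scoped endpoints such as `/jobs/<job-id>/vertices/<vertex-id>/metrics`.

```python
import json
from typing import Dict


def vertices_by_name(job_details: str) -> Dict[str, Dict]:
    """Map each vertex name in a GET /jobs/<job-id> response to its id, parallelism and record counts."""
    body = json.loads(job_details)
    return {
        vertex["name"]: {
            "id": vertex["id"],
            "parallelism": vertex["parallelism"],
            "read-records": vertex["metrics"]["read-records"],
            "write-records": vertex["metrics"]["write-records"],
        }
        for vertex in body.get("vertices", [])
    }
```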

    Please consult the REST API reference for a complete list of possible queries, including how to query metrics of different scopes (e.g. TaskManager metrics).

    Variants

    You might have noticed that the Click Event Count application was always started with --checkpointing and --event-time program arguments. By omitting these in the command of the client container in the docker-compose.yaml, you can change the behavior of the Job.

    • --checkpointing enables checkpointing, which is Flink’s fault-tolerance mechanism. If you run without it and go through failure and recovery, you will see that data is actually lost.

    • --event-time enables event time semantics for your Job. When disabled, the Job will assign events to windows based on the wall-clock time instead of the timestamp of the ClickEvent. Consequently, the number of events per window will not be exactly one thousand anymore.

    The Click Event Count application also has another option, turned off by default, that you can enable to explore the behavior of this job under backpressure. You can add this option in the command of the client container in docker-compose.yaml.

    • --backpressure adds an additional operator into the middle of the job that causes severe backpressure during even-numbered minutes (e.g., during 10:12, but not during 10:13). This can be observed by inspecting various network metrics such as outputQueueLength and outPoolUsage, and/or by using the backpressure monitoring available in the WebUI.
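
    Based only on the rule stated above, you can predict whether backpressure should currently be visible. The sketch assumes the clock you compare against is close to the clocks of the TaskManagers running the Job. `in_backpressure_phase` is a hypothetical helper.

```python
from datetime import datetime


def in_backpressure_phase(now: datetime) -> bool:
    """True during even-numbered minutes (e.g. 10:12), when --backpressure throttles the Job."""
    return now.minute % 2 == 0


print(in_backpressure_phase(datetime(2019, 7, 16, 10, 12)))  # True
```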