• Windows
    • Window Lifecycle
    • Keyed vs Non-Keyed Windows
    • Window Assigners
      • Tumbling Windows
      • Sliding Windows
      • Session Windows
      • Global Windows
    • Window Functions
      • ReduceFunction
      • AggregateFunction
      • FoldFunction
      • ProcessWindowFunction
      • ProcessWindowFunction with Incremental Aggregation
        • Incremental Window Aggregation with ReduceFunction
        • Incremental Window Aggregation with AggregateFunction
        • Incremental Window Aggregation with FoldFunction
      • Using per-window state in ProcessWindowFunction
      • WindowFunction (Legacy)
    • Triggers
      • Fire and Purge
      • Default Triggers of WindowAssigners
      • Built-in and Custom Triggers
    • Evictors
    • Allowed Lateness
      • Getting late data as a side output
      • Late elements considerations
    • Working with window results
      • Interaction of watermarks and windows
      • Consecutive windowed operations
    • Useful state size considerations

    Windows

    Windows are at the heart of processing infinite streams. Windows split the stream into “buckets” of finite size, over which we can apply computations. This document focuses on how windowing is performed in Flink and how programmers can make the most of the functionality it offers.

    The general structure of a windowed Flink program is presented below. The first snippet refers to keyed streams, while the second refers to non-keyed ones. As one can see, the only differences are the keyBy(…) call for keyed streams and the window(…) call, which becomes windowAll(…) for non-keyed streams. This structure will also serve as a roadmap for the rest of the page.

    Keyed Windows

    stream
           .keyBy(...)                    <- keyed versus non-keyed windows
           .window(...)                   <- required: "assigner"
          [.trigger(...)]                 <- optional: "trigger" (else default trigger)
          [.evictor(...)]                 <- optional: "evictor" (else no evictor)
          [.allowedLateness(...)]         <- optional: "lateness" (else zero)
          [.sideOutputLateData(...)]      <- optional: "output tag" (else no side output for late data)
           .reduce/aggregate/fold/apply() <- required: "function"
          [.getSideOutput(...)]           <- optional: "output tag"

    Non-Keyed Windows

    stream
           .windowAll(...)                <- required: "assigner"
          [.trigger(...)]                 <- optional: "trigger" (else default trigger)
          [.evictor(...)]                 <- optional: "evictor" (else no evictor)
          [.allowedLateness(...)]         <- optional: "lateness" (else zero)
          [.sideOutputLateData(...)]      <- optional: "output tag" (else no side output for late data)
           .reduce/aggregate/fold/apply() <- required: "function"
          [.getSideOutput(...)]           <- optional: "output tag"

    In the above, the commands in square brackets ([…]) are optional. This shows that Flink allows you to customize your windowing logic in many different ways so that it best fits your needs.


    Window Lifecycle

    In a nutshell, a window is created as soon as the first element that should belong to this window arrives, and the window is completely removed when the time (event or processing time) passes its end timestamp plus the user-specified allowed lateness (see Allowed Lateness). Flink guarantees removal only for time-based windows and not for other types, e.g. global windows (see Window Assigners). For example, with an event-time-based windowing strategy that creates non-overlapping (or tumbling) windows every 5 minutes and has an allowed lateness of 1 min, Flink will create a new window for the interval between 12:00 and 12:05 when the first element with a timestamp that falls into this interval arrives, and it will remove the window when the watermark passes the 12:06 timestamp.
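    The removal point in this example can be computed mechanically: a window's largest allowed timestamp is one millisecond before its exclusive end, and the window is removed once the watermark passes that timestamp plus the allowed lateness. The helper below is a minimal sketch of this arithmetic only, not Flink's actual cleanup code; the class and method names are made up for illustration.

```java
// Sketch: when a time-based window becomes eligible for removal.
// A window covers [start, end) in epoch milliseconds; its largest
// allowed timestamp is end - 1, and it is removed once the watermark
// passes maxTimestamp + allowedLateness.
public class WindowCleanup {
    public static long maxTimestamp(long endExclusive) {
        return endExclusive - 1;
    }

    public static long cleanupTime(long endExclusive, long allowedLateness) {
        return maxTimestamp(endExclusive) + allowedLateness;
    }

    public static boolean isRemovable(long watermark, long endExclusive, long allowedLateness) {
        return watermark > cleanupTime(endExclusive, allowedLateness);
    }
}
```

    For the window [12:00, 12:05) with 1 minute of allowed lateness, the cleanup time is 12:05:59.999, so the window is removed once the watermark passes the 12:06 timestamp.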

    In addition, each window will have a Trigger (see Triggers) and a function (ProcessWindowFunction, ReduceFunction, AggregateFunction or FoldFunction) (see Window Functions) attached to it. The function contains the computation to be applied to the contents of the window, while the Trigger specifies the conditions under which the window is considered ready for the function to be applied. A triggering policy might be something like “when the number of elements in the window is more than 4”, or “when the watermark passes the end of the window”. A trigger can also decide to purge a window’s contents at any time between its creation and removal. Purging in this case refers only to the elements in the window, not the window metadata. This means that new data can still be added to that window.

    Apart from the above, you can specify an Evictor (see Evictors), which is able to remove elements from the window after the trigger fires and before and/or after the function is applied.

    In the following we go into more detail for each of the components above. We start with the required parts in the above snippet (see Keyed vs Non-Keyed Windows, Window Assigner, and Window Function) before moving on to the optional ones.

    Keyed vs Non-Keyed Windows

    The first thing to specify is whether your stream should be keyed or not. This has to be done before defining the window. Using keyBy(…) splits your infinite stream into logical keyed streams. If keyBy(…) is not called, your stream is not keyed.

    In the case of keyed streams, any attribute of your incoming events can be used as a key (more details here). Having a keyed stream allows your windowed computation to be performed in parallel by multiple tasks, as each logical keyed stream can be processed independently from the rest. All elements referring to the same key will be sent to the same parallel task.

    In the case of non-keyed streams, your original stream will not be split into multiple logical streams and all the windowing logic will be performed by a single task, i.e. with a parallelism of 1.

    Window Assigners

    After specifying whether your stream is keyed or not, the next step is to define a window assigner. The window assigner defines how elements are assigned to windows. This is done by specifying the WindowAssigner of your choice in the window(…) (for keyed streams) or the windowAll(…) (for non-keyed streams) call.

    A WindowAssigner is responsible for assigning each incoming element to one or more windows. Flink comes with pre-defined window assigners for the most common use cases, namely tumbling windows, sliding windows, session windows and global windows. You can also implement a custom window assigner by extending the WindowAssigner class. All built-in window assigners (except the global windows) assign elements to windows based on time, which can either be processing time or event time. Please take a look at our section on event time to learn about the difference between processing time and event time and how timestamps and watermarks are generated.

    Time-based windows have a start timestamp (inclusive) and an end timestamp (exclusive) that together describe the size of the window. In code, Flink uses TimeWindow when working with time-based windows; it has methods for querying the start and end timestamp and an additional method maxTimestamp() that returns the largest allowed timestamp for a given window.

    In the following, we show how Flink’s pre-defined window assigners work and how they are used in a DataStream program. The following figures visualize the workings of each assigner. The purple circles represent elements of the stream, which are partitioned by some key (in this case user 1, user 2 and user 3). The x-axis shows the progress of time.

    Tumbling Windows

    A tumbling window assigner assigns each element to a window of a specified window size. Tumbling windows have a fixed size and do not overlap. For example, if you specify a tumbling window with a size of 5 minutes, the current window will be evaluated and a new window will be started every five minutes, as illustrated by the following figure.

    [Figure 1: tumbling windows]

    The following code snippets show how to use tumbling windows.

    Java:

    DataStream<T> input = ...;

    // tumbling event-time windows
    input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>);

    // tumbling processing-time windows
    input
        .keyBy(<key selector>)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>);

    // daily tumbling event-time windows offset by -8 hours.
    input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
        .<windowed transformation>(<window function>);

    Scala:

    val input: DataStream[T] = ...

    // tumbling event-time windows
    input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>)

    // tumbling processing-time windows
    input
        .keyBy(<key selector>)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>)

    // daily tumbling event-time windows offset by -8 hours.
    input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
        .<windowed transformation>(<window function>)

    Time intervals can be specified by using one of Time.milliseconds(x), Time.seconds(x), Time.minutes(x), and so on.

    As shown in the last example, tumbling window assigners also take an optional offset parameter that can be used to change the alignment of windows. For example, without offsets hourly tumbling windows are aligned with epoch, that is you will get windows such as 1:00:00.000 - 1:59:59.999, 2:00:00.000 - 2:59:59.999 and so on. If you want to change that you can give an offset. With an offset of 15 minutes you would, for example, get 1:15:00.000 - 2:14:59.999, 2:15:00.000 - 3:14:59.999 etc. An important use case for offsets is to adjust windows to timezones other than UTC-0. For example, in China you would have to specify an offset of Time.hours(-8).
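    The alignment described above follows from simple arithmetic: a timestamp's window start is the largest multiple of the window size, shifted by the offset, that is not greater than the timestamp. The sketch below mirrors the formula Flink uses internally (TimeWindow.getWindowStartWithOffset); the class name here is made up for illustration.

```java
// Sketch: compute the start of the tumbling window a timestamp falls into.
// All values are epoch milliseconds; the window covers [start, start + size).
public class TumblingAlignment {
    public static long windowStart(long timestamp, long offset, long windowSize) {
        // Adding windowSize before the modulo keeps the result correct
        // for timestamps smaller than the offset.
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }
}
```

    With hourly windows (3,600,000 ms) and no offset, 1:30:00.000 maps to the window starting at 1:00:00.000; with a 15-minute offset it maps to the window starting at 1:15:00.000. A daily window with an offset of -8 hours starts at midnight in UTC+8.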

    Sliding Windows

    The sliding window assigner assigns elements to windows of fixed length. Similar to a tumbling window assigner, the size of the windows is configured by the window size parameter. An additional window slide parameter controls how frequently a sliding window is started. Hence, sliding windows can overlap if the slide is smaller than the window size. In this case elements are assigned to multiple windows.

    For example, you could have windows of size 10 minutes that slide by 5 minutes. With this you get a new window every 5 minutes, containing the events that arrived during the last 10 minutes, as depicted by the following figure.

    [Figure 2: sliding windows]

    The following code snippets show how to use sliding windows.

    Java:

    DataStream<T> input = ...;

    // sliding event-time windows
    input
        .keyBy(<key selector>)
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>);

    // sliding processing-time windows
    input
        .keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>);

    // sliding processing-time windows offset by -8 hours
    input
        .keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
        .<windowed transformation>(<window function>);

    Scala:

    val input: DataStream[T] = ...

    // sliding event-time windows
    input
        .keyBy(<key selector>)
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>)

    // sliding processing-time windows
    input
        .keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>)

    // sliding processing-time windows offset by -8 hours
    input
        .keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
        .<windowed transformation>(<window function>)

    Time intervals can be specified by using one of Time.milliseconds(x), Time.seconds(x), Time.minutes(x), and so on.

    As shown in the last example, sliding window assigners also take an optional offset parameter that can be used to change the alignment of windows. For example, without offsets hourly windows sliding by 30 minutes are aligned with epoch, that is you will get windows such as 1:00:00.000 - 1:59:59.999, 1:30:00.000 - 2:29:59.999 and so on. If you want to change that you can give an offset. With an offset of 15 minutes you would, for example, get 1:15:00.000 - 2:14:59.999, 1:45:00.000 - 2:44:59.999 etc. An important use case for offsets is to adjust windows to timezones other than UTC-0. For example, in China you would have to specify an offset of Time.hours(-8).
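    Because an element belongs to size/slide windows when the slide is smaller than the size, the assigner enumerates the start of every containing window, beginning at the most recent one. A minimal sketch of that enumeration (plain Java, not Flink's SlidingEventTimeWindows, which does the equivalent internally; the class name is made up):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: enumerate the starts of all sliding windows containing a timestamp.
// All values are epoch milliseconds; each window covers [start, start + size).
public class SlidingAssignment {
    public static List<Long> windowStarts(long timestamp, long size, long slide, long offset) {
        List<Long> starts = new ArrayList<>();
        // Start of the most recent containing window, aligned on the slide.
        long lastStart = timestamp - (timestamp - offset + slide) % slide;
        // Walk backwards one slide at a time while the window still covers
        // the timestamp, i.e. while start + size > timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

    With size 10 minutes and slide 5 minutes, an element at 00:12 falls into the two windows starting at 00:10 and 00:05.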

    Session Windows

    The session window assigner groups elements by sessions of activity. Session windows do not overlap and do not have a fixed start and end time, in contrast to tumbling and sliding windows. Instead, a session window closes when it does not receive elements for a certain period of time, i.e., when a gap of inactivity occurs. A session window assigner can be configured with either a static session gap or a session gap extractor function which defines how long the period of inactivity is. When this period expires, the current session closes and subsequent elements are assigned to a new session window.
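    Conceptually, each element opens a candidate window [timestamp, timestamp + gap), and candidate windows that overlap are merged into one session. Below is a minimal sketch of that merge semantics over an already-sorted batch of timestamps; Flink's operator does this incrementally as elements arrive, and the class and method names here are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: derive session windows [start, end) from sorted timestamps with a
// static gap. Two elements belong to the same session when their candidate
// windows [t, t + gap) overlap.
public class SessionMergeSketch {
    public static List<long[]> sessions(long[] sortedTimestamps, long gap) {
        List<long[]> result = new ArrayList<>();
        long start = sortedTimestamps[0];
        long end = sortedTimestamps[0] + gap;
        for (int i = 1; i < sortedTimestamps.length; i++) {
            long t = sortedTimestamps[i];
            if (t < end) {
                end = t + gap;              // overlaps: extend current session
            } else {
                result.add(new long[]{start, end});
                start = t;                  // gap of inactivity: new session
                end = t + gap;
            }
        }
        result.add(new long[]{start, end});
        return result;
    }
}
```

    With a gap of 10 and timestamps 1, 5, 30, 35, 60 this yields the three sessions [1, 15), [30, 45) and [60, 70).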

    [Figure 3: session windows]

    The following code snippets show how to use session windows.

    Java:

    DataStream<T> input = ...;

    // event-time session windows with static gap
    input
        .keyBy(<key selector>)
        .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
        .<windowed transformation>(<window function>);

    // event-time session windows with dynamic gap
    input
        .keyBy(<key selector>)
        .window(EventTimeSessionWindows.withDynamicGap((element) -> {
            // determine and return session gap
        }))
        .<windowed transformation>(<window function>);

    // processing-time session windows with static gap
    input
        .keyBy(<key selector>)
        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
        .<windowed transformation>(<window function>);

    // processing-time session windows with dynamic gap
    input
        .keyBy(<key selector>)
        .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
            // determine and return session gap
        }))
        .<windowed transformation>(<window function>);

    Scala:

    val input: DataStream[T] = ...

    // event-time session windows with static gap
    input
        .keyBy(<key selector>)
        .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
        .<windowed transformation>(<window function>)

    // event-time session windows with dynamic gap
    input
        .keyBy(<key selector>)
        .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
            override def extract(element: String): Long = {
                // determine and return session gap
            }
        }))
        .<windowed transformation>(<window function>)

    // processing-time session windows with static gap
    input
        .keyBy(<key selector>)
        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
        .<windowed transformation>(<window function>)

    // processing-time session windows with dynamic gap
    input
        .keyBy(<key selector>)
        .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
            override def extract(element: String): Long = {
                // determine and return session gap
            }
        }))
        .<windowed transformation>(<window function>)

    Static gaps can be specified by using one of Time.milliseconds(x), Time.seconds(x), Time.minutes(x), and so on.

    Dynamic gaps are specified by implementing the SessionWindowTimeGapExtractor interface.

    Attention Since session windows do not have a fixed start and end, they are evaluated differently than tumbling and sliding windows. Internally, a session window operator creates a new window for each arriving record and merges windows together if they are closer to each other than the defined gap. In order to be mergeable, a session window operator requires a merging Trigger and a merging Window Function, such as ReduceFunction, AggregateFunction, or ProcessWindowFunction (FoldFunction cannot merge).

    Global Windows

    A global window assigner assigns all elements with the same key to the same single global window. This windowing scheme is only useful if you also specify a custom trigger. Otherwise, no computation will be performed, as the global window does not have a natural end at which we could process the aggregated elements.

    [Figure 4: global windows]

    The following code snippets show how to use a global window.

    Java:

    DataStream<T> input = ...;

    input
        .keyBy(<key selector>)
        .window(GlobalWindows.create())
        .<windowed transformation>(<window function>);

    Scala:

    val input: DataStream[T] = ...

    input
        .keyBy(<key selector>)
        .window(GlobalWindows.create())
        .<windowed transformation>(<window function>)
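    The reason a custom trigger is needed here can be sketched in isolation: something other than time must decide when a global window fires. Below is the decision logic of a count-based firing policy reduced to plain Java; this is not Flink's Trigger API, and the class name is made up for illustration.

```java
// Sketch of the decision a count-based trigger makes on a global window:
// fire (emit the window contents) every time maxCount elements have arrived.
public class CountTriggerSketch {
    private final long maxCount;
    private long count = 0;

    public CountTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    // Returns true when the window should fire; resets the counter on fire,
    // mirroring a FIRE decision followed by a count reset.
    public boolean onElement() {
        count++;
        if (count >= maxCount) {
            count = 0;
            return true;
        }
        return false;
    }
}
```

    With maxCount = 3, the third, sixth, ninth, … element each cause a firing; between firings the global window simply accumulates elements.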

    Window Functions

    After defining the window assigner, we need to specify the computation that we want to perform on each of these windows. This is the responsibility of the window function, which is used to process the elements of each (possibly keyed) window once the system determines that a window is ready for processing (see triggers for how Flink determines when a window is ready).

    The window function can be one of ReduceFunction, AggregateFunction, FoldFunction or ProcessWindowFunction. The first two can be executed more efficiently (see the State Size section) because Flink can incrementally aggregate the elements for each window as they arrive. A ProcessWindowFunction gets an Iterable for all the elements contained in a window and additional meta information about the window to which the elements belong.

    A windowed transformation with a ProcessWindowFunction cannot be executed as efficiently as the other cases because Flink has to buffer all elements for a window internally before invoking the function. This can be mitigated by combining a ProcessWindowFunction with a ReduceFunction, AggregateFunction, or FoldFunction to get both incremental aggregation of window elements and the additional window metadata that the ProcessWindowFunction receives. We will look at examples for each of these variants.

    ReduceFunction

    A ReduceFunction specifies how two elements from the input are combined to produce an output element of the same type. Flink uses a ReduceFunction to incrementally aggregate the elements of a window.

    A ReduceFunction can be defined and used like this:

    Java:

    DataStream<Tuple2<String, Long>> input = ...;

    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .reduce(new ReduceFunction<Tuple2<String, Long>>() {
            public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
                return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
            }
        });

    Scala:

    val input: DataStream[(String, Long)] = ...

    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }

    The above example sums up the second fields of the tuples for all elements in a window.

    AggregateFunction

    An AggregateFunction is a generalized version of a ReduceFunction that has three types: an input type (IN), an accumulator type (ACC), and an output type (OUT). The input type is the type of elements in the input stream, and the AggregateFunction has a method for adding one input element to an accumulator. The interface also has methods for creating an initial accumulator, for merging two accumulators into one accumulator and for extracting an output (of type OUT) from an accumulator. We will see how this works in the example below.

    Same as with ReduceFunction, Flink will incrementally aggregate input elements of a window as they arrive.

    An AggregateFunction can be defined and used like this:

    Java:

    /**
     * The accumulator is used to keep a running sum and a count. The {@code getResult} method
     * computes the average.
     */
    private static class AverageAggregate
            implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
        @Override
        public Tuple2<Long, Long> createAccumulator() {
            return new Tuple2<>(0L, 0L);
        }

        @Override
        public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
            return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
        }

        @Override
        public Double getResult(Tuple2<Long, Long> accumulator) {
            return ((double) accumulator.f0) / accumulator.f1;
        }

        @Override
        public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
            return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
        }
    }

    DataStream<Tuple2<String, Long>> input = ...;

    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .aggregate(new AverageAggregate());

    Scala:

    /**
     * The accumulator is used to keep a running sum and a count. The [getResult] method
     * computes the average.
     */
    class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
      override def createAccumulator() = (0L, 0L)

      override def add(value: (String, Long), accumulator: (Long, Long)) =
        (accumulator._1 + value._2, accumulator._2 + 1L)

      override def getResult(accumulator: (Long, Long)) = accumulator._1.toDouble / accumulator._2

      override def merge(a: (Long, Long), b: (Long, Long)) =
        (a._1 + b._1, a._2 + b._2)
    }

    val input: DataStream[(String, Long)] = ...

    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .aggregate(new AverageAggregate)

    The above example computes the average of the second field of the elements in the window.
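    The accumulator lifecycle used above can be replayed without Flink. The sketch below applies createAccumulator/add/merge/getResult by hand to two partial accumulators, the way a merging window (e.g. a session window) would combine them; the class is made up for illustration and uses a long[] pair in place of Tuple2.

```java
// Sketch: the AggregateFunction lifecycle (create, add, merge, getResult)
// applied by hand. The accumulator is {sum, count}; the result is the average.
public class AverageLifecycle {
    public static long[] create() {
        return new long[]{0L, 0L};
    }

    public static long[] add(long value, long[] acc) {
        return new long[]{acc[0] + value, acc[1] + 1L};
    }

    public static long[] merge(long[] a, long[] b) {
        return new long[]{a[0] + b[0], a[1] + b[1]};
    }

    public static double getResult(long[] acc) {
        return ((double) acc[0]) / acc[1];
    }
}
```

    Feeding the values 1 and 3 into one accumulator and 5 into another, then merging and extracting, yields (1 + 3 + 5) / 3 = 3.0; this is exactly the shape of computation a distributed, merging window performs.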

    FoldFunction

    A FoldFunction specifies how an input element of the window is combined with an element of the output type. The FoldFunction is called incrementally for each element that is added to the window, together with the current output value. The first element is combined with a pre-defined initial value of the output type.

    A FoldFunction can be defined and used like this:

    Java:

    DataStream<Tuple2<String, Long>> input = ...;

    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .fold("", new FoldFunction<Tuple2<String, Long>, String>() {
            public String fold(String acc, Tuple2<String, Long> value) {
                return acc + value.f1;
            }
        });

    Scala:

    val input: DataStream[(String, Long)] = ...

    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .fold("") { (acc, v) => acc + v._2 }

    The above example appends all input Long values to an initially empty String.
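    The semantics of this example are those of an ordinary left fold, which can be checked without Flink; a minimal sketch (class and method names made up for illustration):

```java
// Sketch: a window fold is a left fold of the window's elements
// onto an initial value of the output type.
public class FoldSketch {
    public static String foldConcat(String initial, long[] values) {
        String acc = initial;
        for (long v : values) {
            acc = acc + v;   // mirrors the FoldFunction in the example above
        }
        return acc;
    }
}
```

    For the values 1, 2, 3 and an initial "" this produces "123".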

    Attention fold() cannot be used with session windows or other mergeable windows.

    ProcessWindowFunction

    A ProcessWindowFunction gets an Iterable containing all the elements of the window, and a Context object with access to time and state information, which enables it to provide more flexibility than other window functions. This comes at the cost of performance and resource consumption, because elements cannot be incrementally aggregated but instead need to be buffered internally until the window is considered ready for processing.

    The signature of ProcessWindowFunction looks as follows:

    Java:

    public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {

        /**
         * Evaluates the window and outputs none or several elements.
         *
         * @param key The key for which this window is evaluated.
         * @param context The context in which the window is being evaluated.
         * @param elements The elements in the window being evaluated.
         * @param out A collector for emitting elements.
         *
         * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
         */
        public abstract void process(
                KEY key,
                Context context,
                Iterable<IN> elements,
                Collector<OUT> out) throws Exception;

        /**
         * The context holding window metadata.
         */
        public abstract class Context implements java.io.Serializable {
            /**
             * Returns the window that is being evaluated.
             */
            public abstract W window();

            /** Returns the current processing time. */
            public abstract long currentProcessingTime();

            /** Returns the current event-time watermark. */
            public abstract long currentWatermark();

            /**
             * State accessor for per-key and per-window state.
             *
             * <p><b>NOTE:</b> If you use per-window state you have to ensure that you clean it up
             * by implementing {@link ProcessWindowFunction#clear(Context)}.
             */
            public abstract KeyedStateStore windowState();

            /**
             * State accessor for per-key global state.
             */
            public abstract KeyedStateStore globalState();
        }
    }

    Scala:

    abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {

      /**
       * Evaluates the window and outputs none or several elements.
       *
       * @param key The key for which this window is evaluated.
       * @param context The context in which the window is being evaluated.
       * @param elements The elements in the window being evaluated.
       * @param out A collector for emitting elements.
       * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
       */
      def process(
          key: KEY,
          context: Context,
          elements: Iterable[IN],
          out: Collector[OUT])

      /**
       * The context holding window metadata.
       */
      abstract class Context {
        /**
         * Returns the window that is being evaluated.
         */
        def window: W

        /**
         * Returns the current processing time.
         */
        def currentProcessingTime: Long

        /**
         * Returns the current event-time watermark.
         */
        def currentWatermark: Long

        /**
         * State accessor for per-key and per-window state.
         */
        def windowState: KeyedStateStore

        /**
         * State accessor for per-key global state.
         */
        def globalState: KeyedStateStore
      }
    }

    Note The key parameter is the key that is extracted via the KeySelector that was specified for the keyBy() invocation. In case of tuple-index keys or string-field references this key type is always Tuple and you have to manually cast it to a tuple of the correct size to extract the key fields.

    A ProcessWindowFunction can be defined and used like this:

    Java:

    DataStream<Tuple2<String, Long>> input = ...;

    input
        .keyBy(t -> t.f0)
        .timeWindow(Time.minutes(5))
        .process(new MyProcessWindowFunction());

    /* ... */

    public class MyProcessWindowFunction
            extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

        @Override
        public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
            long count = 0;
            for (Tuple2<String, Long> in : input) {
                count++;
            }
            out.collect("Window: " + context.window() + " count: " + count);
        }
    }

    Scala:

    val input: DataStream[(String, Long)] = ...

    input
        .keyBy(_._1)
        .timeWindow(Time.minutes(5))
        .process(new MyProcessWindowFunction())

    /* ... */

    class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {

      def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
        var count = 0L
        for (in <- input) {
          count = count + 1
        }
        out.collect(s"Window ${context.window} count: $count")
      }
    }

    The example shows a ProcessWindowFunction that counts the elements in a window. In addition, the window function adds information about the window to the output.

    Attention Note that using ProcessWindowFunction for simple aggregates such as count is quite inefficient. The next section shows how a ReduceFunction or AggregateFunction can be combined with a ProcessWindowFunction to get both incremental aggregation and the added information of a ProcessWindowFunction.

    ProcessWindowFunction with Incremental Aggregation

    A ProcessWindowFunction can be combined with either a ReduceFunction, an AggregateFunction, or a FoldFunction to incrementally aggregate elements as they arrive in the window. When the window is closed, the ProcessWindowFunction will be provided with the aggregated result. This allows it to incrementally compute windows while having access to the additional window meta information of the ProcessWindowFunction.

    Note You can also use the legacy WindowFunction instead of ProcessWindowFunction for incremental window aggregation.

    Incremental Window Aggregation with ReduceFunction

    The following example shows how an incremental ReduceFunction can be combined with a ProcessWindowFunction to return the smallest event in a window along with the start time of the window.

    Java:

    DataStream<SensorReading> input = ...;

    input
        .keyBy(<key selector>)
        .timeWindow(<duration>)
        .reduce(new MyReduceFunction(), new MyProcessWindowFunction());

    // Function definitions

    private static class MyReduceFunction implements ReduceFunction<SensorReading> {

        public SensorReading reduce(SensorReading r1, SensorReading r2) {
            return r1.value() > r2.value() ? r2 : r1;
        }
    }

    private static class MyProcessWindowFunction
            extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {

        public void process(String key,
                            Context context,
                            Iterable<SensorReading> minReadings,
                            Collector<Tuple2<Long, SensorReading>> out) {
            SensorReading min = minReadings.iterator().next();
            out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
        }
    }

    Scala:

    val input: DataStream[SensorReading] = ...

    input
        .keyBy(<key selector>)
        .timeWindow(<duration>)
        .reduce(
          (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
          ( key: String,
            context: ProcessWindowFunction[_, _, _, TimeWindow]#Context,
            minReadings: Iterable[SensorReading],
            out: Collector[(Long, SensorReading)] ) =>
            {
              val min = minReadings.iterator.next()
              out.collect((context.window.getStart, min))
            }
        )

    Incremental Window Aggregation with AggregateFunction

    The following example shows how an incremental AggregateFunction can be combined with a ProcessWindowFunction to compute the average and also emit the key and window along with the average.

    ```java
    DataStream<Tuple2<String, Long>> input = ...;

    input
        .keyBy(<key selector>)
        .timeWindow(<duration>)
        .aggregate(new AverageAggregate(), new MyProcessWindowFunction());

    // Function definitions

    /**
     * The accumulator is used to keep a running sum and a count. The {@code getResult} method
     * computes the average.
     */
    private static class AverageAggregate
        implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {

        @Override
        public Tuple2<Long, Long> createAccumulator() {
            return new Tuple2<>(0L, 0L);
        }

        @Override
        public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
            return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
        }

        @Override
        public Double getResult(Tuple2<Long, Long> accumulator) {
            return ((double) accumulator.f0) / accumulator.f1;
        }

        @Override
        public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
            return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
        }
    }

    private static class MyProcessWindowFunction
        extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {

        public void process(String key,
                            Context context,
                            Iterable<Double> averages,
                            Collector<Tuple2<String, Double>> out) {
            Double average = averages.iterator().next();
            out.collect(new Tuple2<>(key, average));
        }
    }
    ```

    ```scala
    val input: DataStream[(String, Long)] = ...

    input
      .keyBy(<key selector>)
      .timeWindow(<duration>)
      .aggregate(new AverageAggregate(), new MyProcessWindowFunction())

    // Function definitions

    /**
     * The accumulator is used to keep a running sum and a count. The [getResult] method
     * computes the average.
     */
    class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
      override def createAccumulator() = (0L, 0L)

      override def add(value: (String, Long), accumulator: (Long, Long)) =
        (accumulator._1 + value._2, accumulator._2 + 1L)

      // Convert to Double before dividing to avoid integer division.
      override def getResult(accumulator: (Long, Long)) = accumulator._1.toDouble / accumulator._2

      override def merge(a: (Long, Long), b: (Long, Long)) =
        (a._1 + b._1, a._2 + b._2)
    }

    class MyProcessWindowFunction extends ProcessWindowFunction[Double, (String, Double), String, TimeWindow] {

      def process(key: String, context: Context, averages: Iterable[Double], out: Collector[(String, Double)]): Unit = {
        val average = averages.iterator.next()
        out.collect((key, average))
      }
    }
    ```

    Incremental Window Aggregation with FoldFunction

    The following example shows how an incremental FoldFunction can be combined with a ProcessWindowFunction to extract the number of events in the window and also return the key and end time of the window.

    ```java
    DataStream<SensorReading> input = ...;

    input
        .keyBy(<key selector>)
        .timeWindow(<duration>)
        .fold(new Tuple3<String, Long, Integer>("", 0L, 0), new MyFoldFunction(), new MyProcessWindowFunction());

    // Function definitions

    private static class MyFoldFunction
        implements FoldFunction<SensorReading, Tuple3<String, Long, Integer>> {

        public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
            Integer cur = acc.getField(2);
            acc.setField(cur + 1, 2);
            return acc;
        }
    }

    private static class MyProcessWindowFunction
        extends ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {

        public void process(String key,
                            Context context,
                            Iterable<Tuple3<String, Long, Integer>> counts,
                            Collector<Tuple3<String, Long, Integer>> out) {
            Integer count = counts.iterator().next().getField(2);
            out.collect(new Tuple3<String, Long, Integer>(key, context.window().getEnd(), count));
        }
    }
    ```

    ```scala
    val input: DataStream[SensorReading] = ...

    input
      .keyBy(<key selector>)
      .timeWindow(<duration>)
      .fold (
        ("", 0L, 0),
        (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
        ( key: String,
          window: TimeWindow,
          counts: Iterable[(String, Long, Int)],
          out: Collector[(String, Long, Int)] ) =>
          {
            val count = counts.iterator.next()
            out.collect((key, window.getEnd, count._3))
          }
      )
    ```

    Using per-window state in ProcessWindowFunction

    In addition to accessing keyed state (as any rich function can), a ProcessWindowFunction can also use keyed state that is scoped to the window that the function is currently processing. In this context it is important to understand which window the per-window state refers to. There are different “windows” involved:

    • The window that was defined when specifying the windowed operation: This might be tumbling windows of 1 hour or sliding windows of 2 hours that slide by 1 hour.
    • An actual instance of a defined window for a given key: This might be the time window from 12:00 to 13:00 for user-id xyz. Based on the window definition, there will be many window instances, depending on the number of keys that the job is currently processing and on the time slots into which the events fall.

    Per-window state is tied to the latter of those two: if we process events for 1000 different keys and events for all of them currently fall into the [12:00, 13:00) time window, then there will be 1000 window instances, each with its own keyed per-window state.

    There are two methods on the Context object that a process() invocation receives that allow access to the two types of state:

    • globalState(), which allows access to keyed state that is not scoped to a window
    • windowState(), which allows access to keyed state that is also scoped to the window

    This feature is helpful if you anticipate multiple firings for the same window, as can happen when you have late firings for data that arrives late or when you have a custom trigger that does speculative early firings. In such a case you would store information about previous firings or the number of firings in per-window state.

    When using per-window state, it is important to also clean up that state when a window is cleared. This should happen in the clear() method.
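    To make the scoping difference concrete, the following stand-alone Java model sketches the semantics (this is not the Flink API; the class and method names are hypothetical): global state is kept per key, while window state is kept per window instance and is dropped when the window is cleared.

    ```java
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical model of globalState() vs. windowState() scoping:
    // global state lives per key, window state per (key, window instance)
    // and is removed when the window is cleared.
    public class PerWindowStateModel {
        private final Map<String, Integer> globalFirings = new HashMap<>(); // per key
        private final Map<String, Integer> windowFirings = new HashMap<>(); // per (key, window)

        private static String windowKey(String key, long windowStart) {
            return key + "@" + windowStart;
        }

        /** Called on every firing: both scopes are incremented. */
        public void onFire(String key, long windowStart) {
            globalFirings.merge(key, 1, Integer::sum);
            windowFirings.merge(windowKey(key, windowStart), 1, Integer::sum);
        }

        /** Mirrors clear(): only the window-scoped state goes away. */
        public void clearWindow(String key, long windowStart) {
            windowFirings.remove(windowKey(key, windowStart));
        }

        public int globalCount(String key) {
            return globalFirings.getOrDefault(key, 0);
        }

        public int windowCount(String key, long windowStart) {
            return windowFirings.getOrDefault(windowKey(key, windowStart), 0);
        }

        public static void main(String[] args) {
            PerWindowStateModel state = new PerWindowStateModel();
            state.onFire("xyz", 0L);   // main firing of window [0, 5000)
            state.onFire("xyz", 0L);   // late firing of the same window
            state.onFire("xyz", 5000L); // firing of the next window
            state.clearWindow("xyz", 0L);
            System.out.println(state.globalCount("xyz"));        // 3: survives window cleanup
            System.out.println(state.windowCount("xyz", 0L));    // 0: removed in clearWindow
            System.out.println(state.windowCount("xyz", 5000L)); // 1: other windows unaffected
        }
    }
    ```

    Counting firings per window while also keeping a long-running per-key counter is exactly the kind of bookkeeping the two scopes are meant for.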

    WindowFunction (Legacy)

    In some places where a ProcessWindowFunction can be used you can also use a WindowFunction. This is an older version of ProcessWindowFunction that provides less contextual information and does not have some advanced features, such as per-window keyed state. This interface will be deprecated at some point.

    The signature of a WindowFunction looks as follows:

    ```java
    public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {

        /**
         * Evaluates the window and outputs none or several elements.
         *
         * @param key The key for which this window is evaluated.
         * @param window The window that is being evaluated.
         * @param input The elements in the window being evaluated.
         * @param out A collector for emitting elements.
         *
         * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
         */
        void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
    }
    ```

    ```scala
    trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {

      /**
       * Evaluates the window and outputs none or several elements.
       *
       * @param key The key for which this window is evaluated.
       * @param window The window that is being evaluated.
       * @param input The elements in the window being evaluated.
       * @param out A collector for emitting elements.
       * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
       */
      def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT])
    }
    ```

    It can be used like this:

    ```java
    DataStream<Tuple2<String, Long>> input = ...;

    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .apply(new MyWindowFunction());
    ```

    ```scala
    val input: DataStream[(String, Long)] = ...

    input
      .keyBy(<key selector>)
      .window(<window assigner>)
      .apply(new MyWindowFunction())
    ```

    Triggers

    A Trigger determines when a window (as formed by the window assigner) is ready to be processed by the window function. Each WindowAssigner comes with a default Trigger. If the default trigger does not fit your needs, you can specify a custom trigger using trigger(…).

    The trigger interface has five methods that allow a Trigger to react to different events:

    • The onElement() method is called for each element that is added to a window.
    • The onEventTime() method is called when a registered event-time timer fires.
    • The onProcessingTime() method is called when a registered processing-time timer fires.
    • The onMerge() method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.
    • Finally the clear() method performs any action needed upon removal of the corresponding window.

    Two things to notice about the above methods are:

    1) The first three decide how to act on their invocation event by returning a TriggerResult. The action can be one of the following:

    • CONTINUE: do nothing,
    • FIRE: trigger the computation,
    • PURGE: clear the elements in the window, and
    • FIRE_AND_PURGE: trigger the computation and clear the elements in the window afterwards.

    2) Any of these methods can be used to register processing- or event-time timers for future actions.
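    The TriggerResult contract above can be sketched in plain Java. The following is an illustrative model of a count-based trigger (not the real Flink Trigger interface, whose methods also receive a trigger context): it returns CONTINUE while the window fills up and FIRE_AND_PURGE once the element count reaches the limit.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Sketch of the TriggerResult contract using a count-based firing rule.
    public class CountTriggerSketch {
        public enum TriggerResult { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

        private final int maxCount;
        private final List<Long> pane = new ArrayList<>(); // buffered window contents

        public CountTriggerSketch(int maxCount) { this.maxCount = maxCount; }

        /** Models onElement(): decide per element what the window operator should do. */
        public TriggerResult onElement(long element) {
            pane.add(element);
            if (pane.size() >= maxCount) {
                return TriggerResult.FIRE_AND_PURGE;
            }
            return TriggerResult.CONTINUE;
        }

        /** The operator reacts to FIRE_AND_PURGE by emitting and clearing the pane. */
        public List<Long> fireAndPurge() {
            List<Long> result = new ArrayList<>(pane);
            pane.clear();
            return result;
        }

        public static void main(String[] args) {
            CountTriggerSketch trigger = new CountTriggerSketch(3);
            System.out.println(trigger.onElement(1L));  // CONTINUE
            System.out.println(trigger.onElement(2L));  // CONTINUE
            System.out.println(trigger.onElement(3L));  // FIRE_AND_PURGE
            System.out.println(trigger.fireAndPurge()); // [1, 2, 3]
            System.out.println(trigger.onElement(4L));  // CONTINUE again: the pane was purged
        }
    }
    ```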

    Fire and Purge

    Once a trigger determines that a window is ready for processing, it fires, i.e., it returns FIRE or FIRE_AND_PURGE. This is the signal for the window operator to emit the result of the current window. Given a window with a ProcessWindowFunction, all elements are passed to the ProcessWindowFunction (possibly after passing them to an evictor). Windows with ReduceFunction, AggregateFunction, or FoldFunction simply emit their eagerly aggregated result.

    When a trigger fires, it can either FIRE or FIRE_AND_PURGE. While FIRE keeps the contents of the window, FIRE_AND_PURGE removes them. By default, the pre-implemented triggers simply FIRE without purging the window state.

    Attention Purging will simply remove the contents of the window and will leave any potential meta-information about the window and any trigger state intact.

    Default Triggers of WindowAssigners

    The default Trigger of a WindowAssigner is appropriate for many use cases. For example, all the event-time window assigners have an EventTimeTrigger as default trigger. This trigger simply fires once the watermark passes the end of a window.

    Attention The default trigger of the GlobalWindow is the NeverTrigger, which never fires. Consequently, you always have to define a custom trigger when using a GlobalWindow.

    Attention By specifying a trigger using trigger() you are overwriting the default trigger of a WindowAssigner. For example, if you specify a CountTrigger for TumblingEventTimeWindows you will no longer get window firings based on the progress of time but only by count. Right now, you have to write your own custom trigger if you want to react based on both time and count.

    Built-in and Custom Triggers

    Flink comes with a few built-in triggers.

    • The (already mentioned) EventTimeTrigger fires based on the progress of event-time as measured by watermarks.
    • The ProcessingTimeTrigger fires based on processing time.
    • The CountTrigger fires once the number of elements in a window exceeds the given limit.
    • The PurgingTrigger takes as argument another trigger and transforms it into a purging one.

    If you need to implement a custom trigger, you should check out the abstract Trigger class. Please note that the API is still evolving and might change in future versions of Flink.

    Evictors

    Flink’s windowing model allows specifying an optional Evictor in addition to the WindowAssigner and the Trigger. This can be done using the evictor(…) method (shown in the beginning of this document). The evictor has the ability to remove elements from a window after the trigger fires and before and/or after the window function is applied. To do so, the Evictor interface has two methods:

    ```java
    /**
     * Optionally evicts elements. Called before windowing function.
     *
     * @param elements The elements currently in the pane.
     * @param size The current number of elements in the pane.
     * @param window The {@link Window}
     * @param evictorContext The context for the Evictor
     */
    void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

    /**
     * Optionally evicts elements. Called after windowing function.
     *
     * @param elements The elements currently in the pane.
     * @param size The current number of elements in the pane.
     * @param window The {@link Window}
     * @param evictorContext The context for the Evictor
     */
    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
    ```

    The evictBefore() contains the eviction logic to be applied before the window function, while the evictAfter() contains the one to be applied after the window function. Elements evicted before the application of the window function will not be processed by it.

    Flink comes with three pre-implemented evictors. These are:

    • CountEvictor: keeps up to a user-specified number of elements from the window and discards the remaining ones from the beginning of the window buffer.
    • DeltaEvictor: takes a DeltaFunction and a threshold, computes the delta between the last element in the window buffer and each of the remaining ones, and removes the ones with a delta greater or equal to the threshold.
    • TimeEvictor: takes as argument an interval in milliseconds and for a given window, it finds the maximum timestamp max_ts among its elements and removes all the elements with timestamps smaller than max_ts - interval.

    Default By default, all the pre-implemented evictors apply their logic before the window function.

    Attention Specifying an evictor prevents any pre-aggregation, as all the elements of a window have to be passed to the evictor before applying the computation.

    Attention Flink provides no guarantees about the order of the elements within a window. This implies that although an evictor may remove elements from the beginning of the window, these are not necessarily the ones that arrive first or last.
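    The TimeEvictor rule described above is plain arithmetic and can be sketched in a few lines of stand-alone Java (this is an illustrative re-implementation of the rule, not Flink's TimeEvictor class):

    ```java
    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    // Illustrative sketch of the TimeEvictor rule: find the maximum timestamp
    // in the pane and evict everything older than max_ts - interval.
    public class TimeEvictorSketch {

        public static List<Long> keepRecent(List<Long> timestamps, long intervalMillis) {
            long maxTs = timestamps.stream().mapToLong(Long::longValue).max().orElse(Long.MIN_VALUE);
            long cutoff = maxTs - intervalMillis;
            // Elements with timestamps smaller than the cutoff are evicted.
            return timestamps.stream().filter(ts -> ts >= cutoff).collect(Collectors.toList());
        }

        public static void main(String[] args) {
            List<Long> pane = Arrays.asList(1_000L, 4_000L, 9_000L, 10_000L);
            // interval = 3000 ms: max_ts is 10000, so everything below 7000 is removed.
            System.out.println(keepRecent(pane, 3_000L)); // [9000, 10000]
        }
    }
    ```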

    Allowed Lateness

    When working with event-time windowing, it can happen that elements arrive late, i.e. the watermark that Flink uses to keep track of the progress of event-time is already past the end timestamp of a window to which an element belongs. See event time and especially late elements for a more thorough discussion of how Flink deals with event time.

    By default, late elements are dropped when the watermark is past the end of the window. However, Flink allows specifying a maximum allowed lateness for window operators. Allowed lateness specifies by how much time elements can be late before they are dropped, and its default value is 0. Elements that arrive after the watermark has passed the end of the window, but before it passes the end of the window plus the allowed lateness, are still added to the window. Depending on the trigger used, a late but not dropped element may cause the window to fire again. This is the case for the EventTimeTrigger.

    In order to make this work, Flink keeps the state of windows until their allowed lateness expires. Once this happens, Flink removes the window and deletes its state, as also described in the Window Lifecycle section.

    Default By default, the allowed lateness is set to 0. That is, elements that arrive behind the watermark will be dropped.
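    The bookkeeping above boils down to two comparisons. The following stand-alone Java sketch illustrates it under the assumption that a window covers timestamps up to windowEnd - 1 (the end timestamp is exclusive), fires when the watermark reaches that maximum timestamp, and drops state once the watermark also passes windowEnd - 1 plus the allowed lateness; the exact boundary semantics are an assumption for illustration:

    ```java
    // Sketch of the allowed-lateness arithmetic for a window [start, windowEnd).
    public class AllowedLatenessSketch {

        /** The window first fires when the watermark reaches its maximum timestamp. */
        public static boolean windowFires(long windowEnd, long watermark) {
            return watermark >= windowEnd - 1;
        }

        /** An element for this window is dropped only after the lateness budget expires. */
        public static boolean elementDropped(long windowEnd, long allowedLateness, long watermark) {
            return watermark >= windowEnd - 1 + allowedLateness;
        }

        public static void main(String[] args) {
            long windowEnd = 5_000L;        // window [0, 5000)
            long allowedLateness = 2_000L;

            System.out.println(windowFires(windowEnd, 4_999L)); // true: main firing

            // An element arriving while the watermark is at 6000 is late but within budget:
            System.out.println(elementDropped(windowEnd, allowedLateness, 6_000L)); // false -> late firing

            // Once the watermark reaches 7000 the window state is gone and elements are dropped:
            System.out.println(elementDropped(windowEnd, allowedLateness, 7_000L)); // true
        }
    }
    ```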

    You can specify an allowed lateness like this:

    ```java
    DataStream<T> input = ...;

    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .allowedLateness(<time>)
        .<windowed transformation>(<window function>);
    ```

    ```scala
    val input: DataStream[T] = ...

    input
      .keyBy(<key selector>)
      .window(<window assigner>)
      .allowedLateness(<time>)
      .<windowed transformation>(<window function>)
    ```

    Note When using the GlobalWindows window assigner no data is ever considered late because the end timestamp of the global window is Long.MAX_VALUE.

    Getting late data as a side output

    Using Flink’s side output feature you can get a stream of the data that was discarded as late.

    You first need to specify that you want to get late data using sideOutputLateData(OutputTag) on the windowed stream. Then, you can get the side-output stream on the result of the windowed operation:

    ```java
    final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};

    DataStream<T> input = ...;

    SingleOutputStreamOperator<T> result = input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .allowedLateness(<time>)
        .sideOutputLateData(lateOutputTag)
        .<windowed transformation>(<window function>);

    DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
    ```

    ```scala
    val lateOutputTag = OutputTag[T]("late-data")

    val input: DataStream[T] = ...

    val result = input
      .keyBy(<key selector>)
      .window(<window assigner>)
      .allowedLateness(<time>)
      .sideOutputLateData(lateOutputTag)
      .<windowed transformation>(<window function>)

    val lateStream = result.getSideOutput(lateOutputTag)
    ```

    Late elements considerations

    When specifying an allowed lateness greater than 0, the window along with its content is kept after the watermark passes the end of the window. In these cases, when a late but not dropped element arrives, it can trigger another firing for the window. These firings are called late firings, as they are triggered by late events, in contrast to the main firing, which is the first firing of the window. In case of session windows, late firings can further lead to merging of windows, as they may “bridge” the gap between two pre-existing, unmerged windows.

    Attention You should be aware that the elements emitted by a late firing should be treated as updated results of a previous computation, i.e., your data stream will contain multiple results for the same computation. Depending on your application, you need to take these duplicate results into account or deduplicate them.

    Working with window results

    The result of a windowed operation is again a DataStream; no information about the windowed operation is retained in the result elements. So if you want to keep meta-information about the window, you have to manually encode that information in the result elements in your ProcessWindowFunction. The only relevant information that is set on the result elements is the element timestamp. This is set to the maximum allowed timestamp of the processed window, which is end timestamp - 1, since the window-end timestamp is exclusive. Note that this is true for both event-time windows and processing-time windows, i.e. after a windowed operation elements always have a timestamp, but this can be an event-time timestamp or a processing-time timestamp. For processing-time windows this has no special implications, but for event-time windows this, together with how watermarks interact with windows, enables consecutive windowed operations with the same window sizes. We will cover this after taking a look at how watermarks interact with windows.

    Interaction of watermarks and windows

    Before continuing in this section you might want to take a look at our section about event time and watermarks.

    When watermarks arrive at the window operator this triggers two things:

    • the watermark triggers computation of all windows where the maximum timestamp (which is end-timestamp - 1) is smaller than the new watermark
    • the watermark is forwarded (as is) to downstream operations

    Intuitively, a watermark “flushes” out any windows that would be considered late in downstreamoperations once they receive that watermark.
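    The rule above can be sketched for tumbling windows in stand-alone Java: a watermark advancing from one value to another flushes exactly those windows whose maximum timestamp (end - 1) lies between the old and new watermark (the helper below is hypothetical and assumes windows aligned to multiples of the window size):

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Sketch: which tumbling windows does an advancing watermark flush?
    // A window is flushed when its maximum timestamp (end - 1) is smaller
    // than the new watermark and was not already below the previous one.
    public class WatermarkFlushSketch {

        /** Returns the start timestamps of all size-ms tumbling windows flushed by the new watermark. */
        public static List<Long> flushedWindows(long previousWatermark, long newWatermark, long size) {
            List<Long> flushed = new ArrayList<>();
            long firstStart = previousWatermark - (previousWatermark % size);
            for (long start = firstStart; start + size - 1 < newWatermark; start += size) {
                if (start + size - 1 >= previousWatermark) { // not already flushed earlier
                    flushed.add(start);
                }
            }
            return flushed;
        }

        public static void main(String[] args) {
            // A watermark jumping from 3000 to 11000 flushes [0,5000) and [5000,10000),
            // but not [10000,15000): its max timestamp 14999 is not below 11000.
            System.out.println(flushedWindows(3_000L, 11_000L, 5_000L)); // [0, 5000]
        }
    }
    ```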

    Consecutive windowed operations

    As mentioned before, the way the timestamp of windowed results is computed and how watermarks interact with windows allows stringing together consecutive windowed operations. This can be useful when you want to do two consecutive windowed operations where you want to use different keys but still want elements from the same upstream window to end up in the same downstream window. Consider this example:

    ```java
    DataStream<Integer> input = ...;

    DataStream<Integer> resultsPerKey = input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .reduce(new Summer());

    DataStream<Integer> globalResults = resultsPerKey
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
        .process(new TopKWindowFunction());
    ```

    ```scala
    val input: DataStream[Int] = ...

    val resultsPerKey = input
      .keyBy(<key selector>)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .reduce(new Summer())

    val globalResults = resultsPerKey
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
      .process(new TopKWindowFunction())
    ```

    In this example, the results for time window [0, 5) from the first operation will also end up in time window [0, 5) in the subsequent windowed operation. This allows calculating a sum per key and then calculating the top-k elements within the same window in the second operation.
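    The arithmetic behind this is simple: a window result carries timestamp end - 1 (the window-end timestamp is exclusive), which still falls inside the same tumbling window in a downstream operation of equal size. A quick stand-alone check:

    ```java
    // A result emitted for upstream window [0, 5000) has timestamp 4999,
    // which the downstream assigner maps back into window [0, 5000).
    public class ConsecutiveWindowsSketch {

        /** Start of the size-ms tumbling window that a given timestamp falls into. */
        public static long windowStart(long timestamp, long size) {
            return timestamp - (timestamp % size);
        }

        public static void main(String[] args) {
            long size = 5_000L;                 // 5-second tumbling windows
            long firstWindowStart = 0L;         // upstream window [0, 5000)
            long resultTimestamp = firstWindowStart + size - 1; // 4999

            // The upstream result is assigned downstream to the window starting at:
            System.out.println(windowStart(resultTimestamp, size)); // 0 -> same [0, 5000) window
        }
    }
    ```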

    Useful state size considerations

    Windows can be defined over long periods of time (such as days, weeks, or months) and therefore accumulate very large state. There are a couple of rules to keep in mind when estimating the storage requirements of your windowing computation:

    • Flink creates one copy of each element per window to which it belongs. Given this, tumbling windows keep one copy of each element (an element belongs to exactly one window unless it is dropped late). In contrast, sliding windows create several copies of each element, as explained in the Window Assigners section. Hence, a sliding window of size 1 day and slide 1 second might not be a good idea.

    • ReduceFunction, AggregateFunction, and FoldFunction can significantly reduce the storage requirements, as they eagerly aggregate elements and store only one value per window. In contrast, just using a ProcessWindowFunction requires accumulating all elements.

    • Using an Evictor prevents any pre-aggregation, as all the elements of a window have to be passed through the evictor before applying the computation (see Evictors).
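    As a back-of-the-envelope check for the first rule, with sliding windows each element is copied into roughly size / slide windows (assuming the slide evenly divides the size):

    ```java
    // Rough estimate of per-element duplication for sliding windows.
    public class SlidingWindowCopies {

        public static long copiesPerElement(long sizeMillis, long slideMillis) {
            return sizeMillis / slideMillis; // assumes slide evenly divides size
        }

        public static void main(String[] args) {
            long day = 24L * 60 * 60 * 1000;

            // Size 1 hour, slide 30 minutes: every element is kept twice.
            System.out.println(copiesPerElement(60 * 60 * 1000L, 30 * 60 * 1000L)); // 2

            // Size 1 day, slide 1 second: 86400 copies of every element.
            System.out.println(copiesPerElement(day, 1_000L)); // 86400
        }
    }
    ```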