DataStream Examples

The following example programs showcase different applications of Flink, from simple word counting to windowed aggregations over streams. The code samples illustrate the use of Flink’s DataStream API.

The full source code of the following and more examples can be found in the flink-examples-streaming module of the Flink source repository.

Running an example

To run a Flink example, we assume you have a running Flink instance available. The “Quickstart” and “Setup” tabs in the navigation describe various ways of starting Flink.

The easiest way is to run ./bin/start-cluster.sh, which by default starts a local cluster with one JobManager and one TaskManager:
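$ ./bin/start-cluster.sh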

Each binary release of Flink contains an examples directory with jar files for each of the examples on this page.
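You can list the bundled streaming example jars directly from the distribution directory (the exact set of jars varies by release):

$ ls ./examples/streaming/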

To run the WordCount example, issue the following command:

$ ./bin/flink run ./examples/streaming/WordCount.jar

The other examples can be started in a similar way.

Note that many examples run without any arguments, using built-in data. To run WordCount with real data, you have to pass the path to the data:

$ ./bin/flink run ./examples/streaming/WordCount.jar --input /path/to/some/text/data --output /path/to/result

Note that non-local file systems require a scheme prefix, such as hdfs://.
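For example (the namenode host, port, and paths below are placeholders for your own HDFS setup):

$ ./bin/flink run ./examples/streaming/WordCount.jar --input hdfs://namenode:9000/path/to/text --output hdfs://namenode:9000/path/to/result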

Word Count

WordCount is the “Hello World” of Big Data processing systems. It computes the frequency of words in a text collection. The algorithm works in two steps: first, the text is split into individual words; second, the words are grouped and counted.

// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// parse the --input and --output paths from the command-line arguments
final ParameterTool params = ParameterTool.fromArgs(args);

// get input data
DataStream<String> text = env.readTextFile(params.get("input"));

DataStream<Tuple2<String, Integer>> counts = text
    // split up the lines into (word, 1) pairs
    .flatMap(new Tokenizer())
    // group by the tuple field "0" and sum up tuple field "1"
    .keyBy(0)
    .sum(1);

// emit result
counts.writeAsText(params.get("output"));

// execute program
env.execute("Streaming WordCount");


// User-defined tokenizer that splits each line into (word, 1) pairs
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
            throws Exception {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}

The WordCount example implements the algorithm described above with input parameters: --input <path> --output <path>. As test data, any text file will do.
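For illustration, a hypothetical input line such as

To be, or not to be

is lower-cased and split on non-word characters by the Tokenizer, which emits the pairs (to,1), (be,1), (or,1), (not,1), (to,1), (be,1). After keyBy(0).sum(1), the stream carries a running count per word, eventually including (to,2) and (be,2).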

// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// parse the --input and --output paths from the command-line arguments
val params = ParameterTool.fromArgs(args)

// read the text file from the given input path
val text = env.readTextFile(params.get("input"))

// split up the lines in pairs (2-tuples) containing: (word,1)
val counts: DataStream[(String, Int)] = text
  .flatMap(_.toLowerCase.split("\\W+"))
  .filter(_.nonEmpty)
  .map((_, 1))
  // group by the tuple field "0" and sum up tuple field "1"
  .keyBy(0)
  .sum(1)

// emit result
counts.writeAsText(params.get("output"))

// execute program
env.execute("Streaming WordCount")


To run the WordCount example, issue the following command:

$ ./bin/flink run ./examples/streaming/WordCount.jar

[Screenshot: WordCount run]

Open the web UI at http://localhost:8081, and you can see that the job finished quickly.

[Screenshot: WordCount web dashboard]

Click the job name “Streaming WordCount” to see its detailed info page:

[Screenshot: WordCount web detail]

Run the following command to see the result:

$ tail -f ./log/flink-*-taskexecutor*.out

[Screenshot: WordCount result]

Socket Window Word Count

SocketWindowWordCount implements a streaming windowed version of the “WordCount” program.

This program connects to a server socket and reads strings from the socket. The easiest way to try it out is to start a local text server (we use port 1234 as an example).

The source code is available as both a Java program (SocketWindowWordCount.java) and a Scala program (SocketWindowWordCount.scala).
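For reference, here is a condensed Java sketch of what the program does. This is an illustration, not the bundled program verbatim: the class name SocketWindowWordCountSketch and the tuple-based output type are ours, and the bundled example additionally parses the hostname and port from the command line.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCountSketch {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // connect to the text server started with `nc -l 1234`
        DataStream<String> text = env.socketTextStream("localhost", 1234, "\n");

        // split lines into (word, 1) pairs and count them per word
        // over a 5-second window that slides every second
        DataStream<Tuple2<String, Integer>> windowCounts = text
            .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                for (String word : line.split("\\s")) {
                    out.collect(Tuple2.of(word, 1));
                }
            })
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(0)
            .timeWindow(Time.seconds(5), Time.seconds(1))
            .sum(1);

        // print with parallelism 1 so the results land in a single log file
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }
}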

First, start a text server using the netcat tool:

$ nc -l 1234

If you get an error “Ncat: socket: Address family not supported by protocol QUITTING”, try the following command:

$ nc -l 0.0.0.0 1234

Then run the example with the port as an argument (the hostname defaults to localhost):

$ ./bin/flink run ./examples/streaming/SocketWindowWordCount.jar --port 1234

Open the web UI at http://localhost:8081:

[Screenshot: Socket Window WordCount web dashboard]

Click the job name “Socket Window WordCount” to see its detailed info page:

[Screenshot: Socket Window WordCount web detail]

Then, type some input data into the nc terminal:

$ nc -l 1234
hello flink hello world
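With this input, the per-window counts written to the TaskManager log should look roughly like the following; the exact formatting depends on the example's output type, and each word reappears as the window slides:

hello : 2
flink : 1
world : 1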

[Screenshot: SocketWindowWordCount input]

Run the following command to see the result:

$ tail -f ./log/flink-*-taskexecutor*.out

[Screenshot: SocketWindowWordCount output]

Top Speed Windowing

TopSpeedWindowing is an example of grouped stream windowing where different eviction and trigger policies can be used. A source emits car events every 100 msec, each containing the car's id, its current speed (km/h), the overall elapsed distance (m), and a timestamp. The example emits the top speed of each car, computed over the last y seconds, every time the car has traveled x meters.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// parse the --input and --output paths from the command-line arguments
final ParameterTool params = ParameterTool.fromArgs(args);

// evict elements older than evictionSec seconds; fire the window every
// triggerMeters meters of driven distance (defaults of the bundled example)
final int evictionSec = 10;
final double triggerMeters = 50;

DataStream<Tuple4<Integer, Integer, Double, Long>> carData =
    env.readTextFile(params.get("input")).map(new ParseCarData());

DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData
    .assignTimestampsAndWatermarks(new CarTimestamp())
    .keyBy(0)
    .window(GlobalWindows.create())
    .evictor(TimeEvictor.of(Time.of(evictionSec, TimeUnit.SECONDS)))
    .trigger(DeltaTrigger.of(triggerMeters,
            new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
                private static final long serialVersionUID = 1L;
    
                @Override
                public double getDelta(
                        Tuple4<Integer, Integer, Double, Long> oldDataPoint,
                        Tuple4<Integer, Integer, Double, Long> newDataPoint) {
                    return newDataPoint.f2 - oldDataPoint.f2;
                }
            }, carData.getType().createSerializer(env.getConfig())))
    .maxBy(1);

topSpeeds.writeAsText(params.get("output"));

// execute program
env.execute("CarTopSpeedWindowingExample");

// User-defined functions
private static class ParseCarData extends RichMapFunction<String, Tuple4<Integer, Integer, Double, Long>> {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple4<Integer, Integer, Double, Long> map(String record) {
        String rawData = record.substring(1, record.length() - 1);
        String[] data = rawData.split(",");
        return new Tuple4<>(Integer.valueOf(data[0]), Integer.valueOf(data[1]), Double.valueOf(data[2]), Long.valueOf(data[3]));
    }
}

private static class CarTimestamp extends AscendingTimestampExtractor<Tuple4<Integer, Integer, Double, Long>> {
    private static final long serialVersionUID = 1L;

    @Override
    public long extractAscendingTimestamp(Tuple4<Integer, Integer, Double, Long> element) {
        return element.f3;
    }
}

The TopSpeedWindowing program implements the above example.
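As ParseCarData suggests, each input line is a parenthesized, comma-separated record. A hypothetical record:

(0,55,15.277,1424951918630)

parses to carId = 0, speed = 55 km/h, distance = 15.277 m, and an event-time timestamp in milliseconds. With triggerMeters = 50, the window for this car fires again once its elapsed distance has grown by another 50 meters, and the TimeEvictor restricts the max-speed computation to events from the last evictionSec seconds.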

// User-defined types
case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long)

// set up execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// parse the --input and --output paths from the command-line arguments
val params = ParameterTool.fromArgs(args)

// evict elements older than evictionSec seconds; fire the window every
// triggerMeters meters of driven distance (defaults of the bundled example)
val evictionSec = 10
val triggerMeters = 50d

// parse a textual record "(id,speed,distance,time)" into a 4-tuple
def parseMap(line: String): (Int, Int, Double, Long) = {
  val record = line.substring(1, line.length - 1).split(",")
  (record(0).toInt, record(1).toInt, record(2).toDouble, record(3).toLong)
}

val cars = env.readTextFile(params.get("input"))
  .map(parseMap(_))
  .map(x => CarEvent(x._1, x._2, x._3, x._4))

val topSpeeds = cars
  .assignAscendingTimestamps( _.time )
  .keyBy("carId")
  .window(GlobalWindows.create)
  .evictor(TimeEvictor.of(Time.of(evictionSec * 1000, TimeUnit.MILLISECONDS)))
  .trigger(DeltaTrigger.of(triggerMeters, new DeltaFunction[CarEvent] {
    def getDelta(oldSp: CarEvent, newSp: CarEvent): Double = newSp.distance - oldSp.distance
  }, cars.getType().createSerializer(env.getConfig)))
  .maxBy("speed")

// emit result
topSpeeds.writeAsText(params.get("output"))

// execute program
env.execute("CarTopSpeedWindowingExample")


Run the example

$ ./bin/flink run ./examples/streaming/TopSpeedWindowing.jar
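Like WordCount, this example uses built-in data when started without arguments. To run it on your own data, pass input and output paths (the paths below are placeholders):

$ ./bin/flink run ./examples/streaming/TopSpeedWindowing.jar --input /path/to/some/car/data --output /path/to/result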

[Screenshot: TopSpeedWindowing run]

Open the web UI at http://localhost:8081:

[Screenshot: TopSpeedWindowing web dashboard]

Click the job name “CarTopSpeedWindowingExample” to see its detailed info page:

[Screenshot: TopSpeedWindowing web detail]

Run the following command to see the continuously updated results:

$ tail -f ./log/flink-*-taskexecutor*.out

[Screenshot: TopSpeedWindowing result]

Back to top