Flink连接Kafka的实现原理

tammypi tammypi
2019-01-10 17:54
770
0

Apache Flink Streaming支持多种数据源,第三方连接器列表如下:

  • Apache Kafka
  • Amazon Kinesis Streams
  • RabbitMQ
  • Apache NiFi
  • Twitter Streaming API

 

其中Kafka作为一个十分流行的消息发布订阅中间件,被广泛的运用在各个项目中。使用Flink连接Kafka作为源的代码如下:

FlinkKafkaConsumerBase source = new FlinkKafkaConsumer010(topics, new AbstractDeserializationSchema() {
			@Override
			public byte[] deserialize(byte[] bytes) throws IOException {
				return bytes;
			}
		}, properties).setStartFromLatest();
DataStream stream = env
				.addSource(source)
                .......

接收的第一个参数是需要消费的Kafka的topic,如果topic有多个,则以列表(List)的形式传入。

第二个参数是反序列化函数,Flink的官方文档示例里给的是new SimpleStringSchema()用于处理字符串类型的消息。但是,如果消费的topic有多个,且有的topic是字符串类型的数据,而有的topic是二进制类型的数据,那么推荐用那么推荐用AbstractDeserializationSchema进行处理。如果直接用SimpleStringSchema处理,会存在一定的问题(比如将string getBytes()拿到的不再是原先的二进制数据)。

第三个参数接受的配置,因为本文示例中指定了每次启动时都使用kafka内的最新消息,所以无需传入groupid,只需要将brokerList的配置传入即可。properites的初始化方法如下:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", brokerList);

被Flink add的数据源,默认的并行度为1。如果要让数据源支持并行,需要让数据源继承org.apache.flink.streaming.api.functions.source.ParallelSourceFunction或者org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction。

我们看一下FlinkKafkaConsumer010的实现,它是继承的FlinkKafkaConsumer09。而FlinkKafkaConsumer09继承FlinkKafkaConsumerBase,看起来应该是Kafka消费类的基类。而FlinkKafkaConsumerBase是继承了RichParallelSourceFunction的,这样就保证了并行性(没有什么实际的方法需要实现,仅仅是标志该类可以并行的被执行)。

Flink实际是根据传入的source方法,构造了一个StreamSource实例,并将其作为DataStreamSource实例的第三个参数初始化了一个DataStreamSource。在后续对于DataStream进行各种map动作时,实际是构造了DataStream的子类,并将其通过addOperator方法,加入envrionment里。

那么,是什么来触发Flink程序的真正执行呢?奥秘就在于代码里必须要调用的env.execute()方法。如果你写的Streaming程序的话,那么实际触发的就是StreamExecutionEnvironment的execute方法,它实际触发了整个StreamGraph的生成以及Job的提交。而StreamGraph则是根据之前的多次transform动作addOperator得到的transformations列表(List)生成。

而Kafka源的tranformation由于会命中instanceof SourceTransformation 的判断,从而进入transformSource方法(StreamGraphGenerator.java中)。

/**
 * Transforms a {@code SourceTransformation}.
 */
private  Collection transformSource(SourceTransformation source) {
    String slotSharingGroup = determineSlotSharingGroup(source.getSlotSharingGroup(), Collections.emptyList());

    streamGraph.addSource(source.getId(),
            slotSharingGroup,
            source.getCoLocationGroupKey(),
            source.getOperator(),
            null,
            source.getOutputType(),
            "Source: " + source.getName());
    if (source.getOperator().getUserFunction() instanceof InputFormatSourceFunction) {
        InputFormatSourceFunction fs = (InputFormatSourceFunction) source.getOperator().getUserFunction();
        streamGraph.setInputFormat(source.getId(), fs.getFormat());
    }
    streamGraph.setParallelism(source.getId(), source.getParallelism());
    streamGraph.setMaxParallelism(source.getId(), source.getMaxParallelism());
    return Collections.singleton(source.getId());
}

可以看到streamGraph会设置并行度,而并行度来自于传入的数据(-p指定并行度)。StreamGraph会生成JobGraph,并且将并行度传入JobVertex(setParallelism方法),生成并行的task进行执行。所以,如果设定的并行度为10,实际上就会启动10个FlinkKafkaConsumer实例。

那么每一个FlinkKafkaConsumer实例是如何获取到自己需要消费的topic以及partition呢?Flink会对于所有的topic分区(FlinkKafkaConsumer初始化时指定的那些),对于并行task的数目进行求余,如果余数==当前的taskid,那么该task就消费该分区。

AbstractPartitionDiscoverer.java:

public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
		if (isUndiscoveredPartition(partition)) {
			discoveredPartitions.add(partition);

			return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
		}

		return false;
	}

KafkaTopicPartitionAssigner.java:

public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
    int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

    // here, the assumption is that the id of Kafka partitions are always ascending
    // starting from 0, and therefore can be used directly as the offset clockwise from the start index
    return (startIndex + partition.getPartition()) % numParallelSubtasks;
}

所以每个FlinkKafkaConsumer在运行时就已经知道自己要消费哪些topic的那些partition了。在FlinkKafkaConsumer010里会初始化Kafka010Fetcher,而从Kafka里读取数据实际就是由Kafka010Fetcher实现的(具体实现在其父类Kafka09Fetcher里),具体方法为runFetchLoop。

在runFetchLoop()方法里,KafkaFetcher会启动consumerThread(KafkaConsumerThread)。KafkaConsumerThread会初始化KafkaConsumer(这个已经是Kafka API中的类),并且拿到kafka partitions,并且将这些partitions传给KafkaConsumer,再使用KafkaConsumer的poll方法拿到Kafka里的数据,再将数据放入Handover。

Handover可以认为是KafkaConsumer和KafkaFetcher的中间数据通道,KafkaConsumer负责将Kafka里的数据放入handover里,而KafkaFetcher则是从handover里取数据。

 

结论:

1.Flink会根据设定的并行度启动相应多的task,那么对于FlinkKafkaConsumer来说就是跟并行度数量一致的FlinkKafkaConsumer实例。而每个FlinkKafkaConsumer实例消费的topic和partition则是根据探测到的所有指定topic分区对于并行数量取余数拿到。

公式:

(startIndex+parttion.getPartition())%numParallerSubtasks == currentTaskId


2.具体的消费Kafka里的数据,还是用的Kafka的API KafkaConsumer里的epoll方法实现

3.Flink里的所有transform动作,被作为DataStream加入envrionment里。而最终的env.execute()是触发StreamGraph生成,以及最终JobGraph的生成及执行的关键。

发表评论

验证码: