文档首页> MapReduce服务 MRS> 开发指南(普通版_2.x及之前)> Flink应用开发> FAQ> 为什么非static的KafkaPartitioner类对象去构造FlinkKafkaProducer010,运行时会报错?
更新时间:2022-07-19 GMT+08:00

为什么非static的KafkaPartitioner类对象去构造FlinkKafkaProducer010,运行时会报错?

问题

Flink内核升级到1.3.0之后,当kafka调用带有非static的KafkaPartitioner类对象为参数的FlinkKafkaProducer010去构造函数时,运行时会报错。

报错内容如下:

org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaPartitioner is not serializable. The object probably contains or references non serializable fields.

回答

Flink的1.3.0版本,为了兼容原有那些使用KafkaPartitioner的API接口,如FlinkKafkaProducer010带KafkaPartitioner对象的构造函数,增加了FlinkKafkaDelegatePartitioner类。

该类定义了一个成员变量,即kafkaPartitioner:

private final KafkaPartitioner<T> kafkaPartitioner;

当Flink传入参数是KafkaPartitioner去构造FlinkKafkaProducer010时,调用栈如下:

FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner)
->  FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner)
---->  FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner)
------>  ClosureCleaner::clean(Object func, boolean checkSerializable)

首先使用KafkaPartitioner对象去构造一个FlinkKafkaDelegatePartitioner对象,然后再检查该对象是否可序列化。由于ClosureCleaner::clean函数是static函数,当用例中的KafkaPartitioner对象是非static时,ClosureCleaner::clean函数无法访问KafkaDelegatePartitioner类内的非static成员变量kafkaPartitioner,导致报错。

解决方法如下,两者任选其一:

  • 将KafkaPartitioner类改成static类。
  • 改用以FlinkKafkaPartitioner为参数的FlinkKafkaProducer010构造函数,内部实现不会去构造FlinkKafkaDelegatePartitioner,也就不会存在成员变量的问题。