主要报错
【ClassNotFoundException: org.apache.kafka.clients.consumer.OffsetResetStrategy】
flink 版本 1.17.1
python 版本 3.10
demo 代码
"""PyFlink demo: count click events per name from one Kafka topic into another.

Reads JSON strings from the source topic, maps each record to
``(name, is_click)``, keeps a running per-name sum of ``is_click`` and writes
the result back to the sink topic as a JSON string.
"""
import json
from json import dumps

from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.kafka import (
    KafkaSource,
    KafkaOffsetsInitializer,
    FlinkKafkaProducer,
)


def str_to_dict(data):
    """Parse one JSON record and return ``(name, is_click)``.

    ``is_click`` is 1 when the record's ``action`` field equals ``'click'``
    and 0 otherwise. ``name`` is whatever the record holds under ``'name'``
    (``None`` when the key is absent).
    """
    json_data = json.loads(data)
    action = json_data.get('action')
    is_click = 1 if action == 'click' else 0
    return json_data.get('name'), is_click


def format_json(data):
    """Serialize a ``(name, click_num)`` tuple to a JSON string.

    ``ensure_ascii=False`` keeps non-ASCII names readable in the output.
    """
    return json.dumps({'name': data[0], 'click_num': data[1]}, ensure_ascii=False)


def main():
    """Build the Kafka -> aggregate -> Kafka pipeline and submit it."""
    brokers = '172.18.98.96:9092'
    source_topic = "test1"  # source data
    sink_topic = "test3"  # results

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.AUTOMATIC)
    env.add_jars("file:///home/demo/jar/flink-sql-connector-kafka-1.17.1.jar")
    # Alternative jar location that was tried:
    # env.add_jars("file:///usr/local/lib/python3.10/dist-packages/lib/flink-sql-connector-kafka-1.17.1.jar")

    source = (
        KafkaSource.builder()
        .set_bootstrap_servers(brokers)
        .set_topics(source_topic)
        .set_group_id("demo")
        .set_starting_offsets(KafkaOffsetsInitializer.latest())
        .set_value_only_deserializer(SimpleStringSchema())
        .build()
    )
    ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")

    # (name, is_click) tuples, keyed by name, with a running sum over field 1.
    ds = ds.map(str_to_dict, output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
    ds = ds.key_by(lambda x: x[0]).sum(1).map(format_json, output_type=Types.STRING())

    # 'group.id' is a consumer-side Kafka setting, so it is not passed to the
    # producer; only the broker address is needed here.
    kafka_producer = FlinkKafkaProducer(
        topic=sink_topic,
        serialization_schema=SimpleStringSchema(),
        producer_config={'bootstrap.servers': brokers},
    )
    ds.add_sink(kafka_producer)

    env.execute('demo')


if __name__ == '__main__':
    main()
本地运行 python demo.py 正常
提交到 Flink 集群时,执行:
flink run -m 172.19.98.96:8081 -py demo.py --jarfile /home/demo/jar/flink-sql-connector-kafka-1.17.1.jar
报错
root@a68045bb7b7a:/home/demo# flink run -m 172.19.98.96:8081 -py demo.py --jarfile /home/demo/jar/flink-sql-connector-kafka-1.17.1.jar Traceback (most recent call last): File "/home/demo/demo.py", line 22, in <module> source = KafkaSource.builder() \ File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/connectors/kafka.py", line 387, in builder File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/connectors/kafka.py", line 430, in __init__ File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__ File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.connector.kafka.source.KafkaSource.builder. : java.lang.NoClassDefFoundError: org/apache/kafka/clients/consumer/OffsetResetStrategy at org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer.earliest(OffsetsInitializer.java:147) at org.apache.flink.connector.kafka.source.KafkaSourceBuilder.<init>(KafkaSourceBuilder.java:106) at org.apache.flink.connector.kafka.source.KafkaSource.builder(KafkaSource.java:123) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:750) Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.OffsetResetStrategy at java.net.URLClassLoader.findClass(URLClassLoader.java:387) at java.lang.ClassLoader.loadClass(ClassLoader.java:418) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 14 more org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1 at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245) at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095) at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157) Caused by: java.lang.RuntimeException: Python process exits with code: 1 at 
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130) ... 14 more
补充 jobmanager 类库
先说一下我测试运行后发现的可能原因:
报错是因为找不到 org.apache.kafka.clients.consumer.OffsetResetStrategy 这个类,推测是你的 Flink 集群中缺少相关依赖。
我的建议是将 flink-sql-connector-kafka-1.17.1.jar 放到集群的 lib 目录中,可以使用下面的命令:
cp /home/demo/jar/flink-sql-connector-kafka-1.17.1.jar /opt/flink/lib/
将其复制到 Flink 的 lib 目录后重启 Flink 集群,应该就可以解决这个环境问题。