0

0

Flink中JSON字符串到JSONObject转换的陷阱与最佳实践

霞舞

霞舞

发布时间:2025-09-25 12:02:19

|

507人浏览过

|

来源于php中文网

原创

Flink中JSON字符串到JSONObject转换的陷阱与最佳实践

在Apache Flink中处理JSON字符串时,开发者常遇到将String类型数据转换为JSONObject的需求。然而,直接使用JSONObject.parseObject()可能导致NullPointerException,即使字符串已正确解析。本文将揭示这一常见问题的原因,提供使用org.json库中new JSONObject(value)的正确解决方案,并强调出于性能和类型安全考虑,在生产环境中优先使用POJO进行JSON反序列化的最佳实践。

Flink中JSON字符串转换的常见问题

apache flink流处理应用中,从数据源(如kafka、文件等)获取的原始数据通常是json格式的字符串。为了进一步解析和处理这些结构化数据,我们通常需要将其转换为jsonobject对象。然而,许多开发者在尝试将string类型的数据通过processfunction或其他算子转换为jsonobject并收集输出时,可能会遇到如下异常:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: java.lang.RuntimeException: Assigned key must not be null!
Caused by: java.lang.NullPointerException: Assigned key must not be null!

尽管在调试过程中发现String已经成功解析成了JSONObject实例,但当尝试通过Collector收集这些对象时,作业却失败了。这表明问题并非出在JSON解析本身,而可能与JSONObject对象的特定实现、其与Flink内部序列化机制的兼容性,或者其内部状态有关。

原始的错误代码片段如下:

import com.alibaba.fastjson.JSONObject; // 假设使用了Fastjson
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class FlinkJsonProcessingIssue {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource inputDS = env.fromElements(
            "{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
        );

        SingleOutputStreamOperator jsonObjDS = inputDS.process(new ProcessFunction() {
            @Override
            public void processElement(String value, ProcessFunction.Context ctx, Collector out) throws Exception {
                // 尝试使用JSONObject.parseObject()
                JSONObject jsonObject = JSONObject.parseObject(value);
                out.collect(jsonObject); // 在这里抛出异常
            }
        });
        jsonObjDS.print();

        env.execute();
    }
}

解决方案:使用org.json库的JSONObject构造函数

解决上述NullPointerException的关键在于选择正确的JSONObject实现及其初始化方式。经过验证,使用org.json库提供的JSONObject,并通过其构造函数直接传入JSON字符串可以避免此问题。

首先,确保您的项目中引入了org.json库的依赖:


    org.json
    json
    20180130 

然后,在ProcessFunction中将JSONObject.parseObject(value)替换为new JSONObject(value):

import org.json.JSONObject; // 注意这里引入的是org.json.JSONObject
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class FlinkJsonProcessingSolution {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource inputDS = env.fromElements(
            "{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
        );

        SingleOutputStreamOperator jsonObjDS = inputDS.process(new ProcessFunction() {
            @Override
            public void processElement(String value, ProcessFunction.Context ctx, Collector out) throws Exception {
                // 使用org.json.JSONObject的构造函数
                JSONObject jsonObject = new JSONObject(value);
                out.collect(jsonObject); // 现在可以正常收集
            }
        });
        jsonObjDS.print();

        env.execute();
    }
}

此更改后,Flink作业将能够正常运行并打印出转换后的JSONObject。这表明org.json库的JSONObject实现与Flink的内部机制(尤其是其类型序列化器)兼容性更好,或者其内部状态在被序列化和反序列化时能够保持完整性,从而避免了NullPointerException。

最佳实践:优先使用POJO进行JSON反序列化

尽管上述方法能够解决String到JSONObject的转换问题,但在实际生产环境中,特别是在处理大量数据或对性能有较高要求的Flink应用中,直接传递和处理JSONObject通常不是最佳实践。

原因如下:

Moshi Chat
Moshi Chat

法国AI实验室Kyutai推出的端到端实时多模态AI语音模型,具备听、说、看的能力,不仅可以实时收听,还能进行自然对话。

下载
  1. 序列化与反序列化开销大: JSONObject是一个通用的Map结构,其内部字段类型不固定,这使得Flink在序列化和反序列化JSONObject时,需要进行更多的元数据处理和类型推断,导致额外的CPU和内存开销。相比之下,POJO(Plain Old Java Object)具有固定的结构和明确的字段类型,Flink可以利用Kyro等高效序列化器进行快速、紧凑的序列化。
  2. 缺乏类型安全: JSONObject的操作通常基于字符串键值对,容易出现拼写错误或类型转换错误,且这些错误通常在运行时才能发现。而POJO提供了编译时类型检查,能够有效减少运行时错误。
  3. 可读性和可维护性差: 使用JSONObject意味着需要通过getString("key")、getInt("key")等方法手动提取字段,代码冗长且不易阅读。POJO则允许直接通过属性访问数据,代码更简洁、更具可读性。
  4. Schema演进: 随着业务发展,JSON数据的Schema可能会发生变化。POJO可以更优雅地处理Schema演进,例如通过添加新字段或使用@JsonIgnoreProperties(ignoreUnknown = true)注解忽略未知字段。

推荐做法:将JSON字符串反序列化为POJO

在Flink中,最佳实践是将JSON字符串反序列化为定义好的POJO类。这通常通过自定义DeserializationSchema或使用Flink提供的JSON格式(如JsonRowSerializationSchema)来实现。

例如,对于上述JSON数据,我们可以定义一个POJO类:

import java.io.Serializable;

public class BillInfo implements Serializable {
    private String ADD_TIME;
    private String ORDER_ID;
    private String ADDER_NO;
    private String UPDATER_NO;
    private String S_USER_ID;
    private String B_USER_ID;
    private String BILL_ID;
    private String ADDER_NAME;
    private String UPDATE_TIME;
    private String UPDATER_NAME;

    // 必须有无参构造函数
    public BillInfo() {}

    // Getter和Setter方法
    public String getADD_TIME() { return ADD_TIME; }
    public void setADD_TIME(String ADD_TIME) { this.ADD_TIME = ADD_TIME; }
    public String getORDER_ID() { return ORDER_ID; }
    public void setORDER_ID(String ORDER_ID) { this.ORDER_ID = ORDER_ID; }
    public String getADDER_NO() { return ADDER_NO; }
    public void setADDER_NO(String ADDER_NO) { this.ADDER_NO = ADDER_NO; }
    public String getUPDATER_NO() { return UPDATER_NO; }
    public void setUPDATER_NO(String UPDATER_NO) { this.UPDATER_NO = UPDATER_NO; }
    public String getS_USER_ID() { return S_USER_ID; }
    public void setS_USER_ID(String S_USER_ID) { this.S_USER_ID = S_USER_ID; }
    public String getB_USER_ID() { return B_USER_ID; }
    public void setB_USER_ID(String B_USER_ID) { this.B_USER_ID = B_USER_ID; }
    public String getBILL_ID() { return BILL_ID; }
    public void setBILL_ID(String BILL_ID) { this.BILL_ID = BILL_ID; }
    public String getADDER_NAME() { return ADDER_NAME; }
    public void setADDER_NAME(String ADDER_NAME) { this.ADDER_NAME = ADDER_NAME; }
    public String getUPDATE_TIME() { return UPDATE_TIME; }
    public void setUPDATE_TIME(String UPDATE_TIME) { this.UPDATE_TIME = UPDATE_TIME; }
    public String getUPDATER_NAME() { return UPDATER_NAME; }
    public void setUPDATER_NAME(String UPDATER_NAME) { this.UPDATER_NAME = UPDATER_NAME; }

    @Override
    public String toString() {
        return "BillInfo{" +
               "ADD_TIME='" + ADD_TIME + '\'' +
               ", ORDER_ID='" + ORDER_ID + '\'' +
               ", ADDER_NO='" + ADDER_NO + '\'' +
               ", UPDATER_NO='" + UPDATER_NO + '\'' +
               ", S_USER_ID='" + S_USER_ID + '\'' +
               ", B_USER_ID='" + B_USER_ID + '\'' +
               ", BILL_ID='" + BILL_ID + '\'' +
               ", ADDER_NAME='" + ADDER_NAME + '\'' +
               ", UPDATE_TIME='" + UPDATE_TIME + '\'' +
               ", UPDATER_NAME='" + UPDATER_NAME + '\'' +
               '}';
    }
}

// 如果JSON结构更复杂,包含嵌套对象,则需要定义相应的嵌套POJO
public class RootData implements Serializable {
    private BillInfo bill_info;

    public RootData() {}

    public BillInfo getBill_info() { return bill_info; }
    public void setBill_info(BillInfo bill_info) { this.bill_info = bill_info; }

    @Override
    public String toString() {
        return "RootData{" +
               "bill_info=" + bill_info +
               '}';
    }
}

然后,可以使用Jackson或Gson等库在ProcessFunction中将字符串反序列化为POJO:

import com.fasterxml.jackson.databind.ObjectMapper; // 引入Jackson库
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class FlinkPojoProcessing {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource inputDS = env.fromElements(
            "{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
        );

        SingleOutputStreamOperator pojoDS = inputDS.process(new ProcessFunction() {
            // ObjectMapper 是线程安全的,可以作为类成员或静态成员
            private transient ObjectMapper objectMapper; 

            @Override
            public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
                super.open(parameters);
                objectMapper = new ObjectMapper();
            }

            @Override
            public void processElement(String value, ProcessFunction.Context ctx, Collector out) throws Exception {
                RootData rootData = objectMapper.readValue(value, RootData.class);
                out.collect(rootData);
            }
        });
        pojoDS.print();

        env.execute();
    }
}

为了使用Jackson,需要添加以下依赖:


    com.fasterxml.jackson.core
    jackson-databind
    2.13.4 

总结

在Flink中将JSON字符串转换为JSONObject时,如果遇到NullPointerException,尝试使用org.json库的new JSONObject(value)构造函数通常可以解决问题。然而,从长期维护和性能优化的角度来看,强烈建议将JSON字符串反序列化为POJO。POJO不仅提供了更好的类型安全和代码可读性,还能显著提高Flink应用的序列化和反序列化效率,是构建健壮、高性能流处理应用的基石。

相关专题

更多
java
java

Java是一个通用术语,用于表示Java软件及其组件,包括“Java运行时环境 (JRE)”、“Java虚拟机 (JVM)”以及“插件”。php中文网还为大家带了Java相关下载资源、相关课程以及相关文章等内容,供大家免费下载使用。

825

2023.06.15

java正则表达式语法
java正则表达式语法

java正则表达式语法是一种模式匹配工具,它非常有用,可以在处理文本和字符串时快速地查找、替换、验证和提取特定的模式和数据。本专题提供java正则表达式语法的相关文章、下载和专题,供大家免费下载体验。

724

2023.07.05

java自学难吗
java自学难吗

Java自学并不难。Java语言相对于其他一些编程语言而言,有着较为简洁和易读的语法,本专题为大家提供java自学难吗相关的文章,大家可以免费体验。

728

2023.07.31

java配置jdk环境变量
java配置jdk环境变量

Java是一种广泛使用的高级编程语言,用于开发各种类型的应用程序。为了能够在计算机上正确运行和编译Java代码,需要正确配置Java Development Kit(JDK)环境变量。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

395

2023.08.01

java保留两位小数
java保留两位小数

Java是一种广泛应用于编程领域的高级编程语言。在Java中,保留两位小数是指在进行数值计算或输出时,限制小数部分只有两位有效数字,并将多余的位数进行四舍五入或截取。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

398

2023.08.02

java基本数据类型
java基本数据类型

java基本数据类型有:1、byte;2、short;3、int;4、long;5、float;6、double;7、char;8、boolean。本专题为大家提供java基本数据类型的相关的文章、下载、课程内容,供大家免费下载体验。

445

2023.08.02

java有什么用
java有什么用

java可以开发应用程序、移动应用、Web应用、企业级应用、嵌入式系统等方面。本专题为大家提供java有什么用的相关的文章、下载、课程内容,供大家免费下载体验。

428

2023.08.02

java在线网站
java在线网站

Java在线网站是指提供Java编程学习、实践和交流平台的网络服务。近年来,随着Java语言在软件开发领域的广泛应用,越来越多的人对Java编程感兴趣,并希望能够通过在线网站来学习和提高自己的Java编程技能。php中文网给大家带来了相关的视频、教程以及文章,欢迎大家前来学习阅读和下载。

16861

2023.08.03

php源码安装教程大全
php源码安装教程大全

本专题整合了php源码安装教程,阅读专题下面的文章了解更多详细内容。

7

2025.12.31

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 2.1万人学习

C# 教程
C# 教程

共94课时 | 5.7万人学习

Java 教程
Java 教程

共578课时 | 40万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号