0

0

Flink 中高效解析 JSON 字符串至 JSONObject 的实践指南

霞舞

霞舞

发布时间:2025-09-25 12:47:00

|

279人浏览过

|

来源于php中文网

原创

flink 中高效解析 json 字符串至 jsonobject 的实践指南

本文探讨了在 Apache Flink 流处理任务中,将 JSON 格式的字符串数据转换为 JSONObject 时可能遇到的 NullPointerException: Assigned key must not be null! 错误。通过分析问题根源,本文提供了一种基于 org.json 库的有效解决方案,并强调了使用 POJO 进行 JSON 反序列化的最佳实践,以提升 Flink 应用的性能和健壮性。

1. 引言与背景

在 Apache Flink 流处理应用中,处理来自 Kafka、文件或其他数据源的 JSON 格式字符串数据是常见的场景。开发者通常需要将这些原始字符串解析成结构化的 JSONObject 对象,以便进行后续的字段提取、转换或业务逻辑处理。然而,在尝试将解析后的 JSONObject 通过 Flink 的 Collector 发出时,有时会遇到令人困惑的 NullPointerException: Assigned key must not be null! 错误,即使调试显示字符串已成功解析为 JSONObject 实例。本文将深入探讨这一问题,并提供一个可靠的解决方案及最佳实践建议。

2. 问题描述:String 到 JSONObject 的转换困境

考虑以下 Flink 任务代码片段,其目标是将一个包含 JSON 字符串的 DataStream 转换为 DataStream

import com.alibaba.fastjson.JSONObject; // 假设使用Fastjson
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class FlinkJsonParseIssue {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource inputDS = env.fromElements(
            "{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
        );

        SingleOutputStreamOperator jsonObjDS = inputDS.process(new ProcessFunction() {
            @Override
            public void processElement(String value, ProcessFunction.Context ctx, Collector out) throws Exception {
                // 尝试使用 JSONObject.parseObject 进行解析
                JSONObject jsonObject = JSONObject.parseObject(value);
                out.collect(jsonObject); // 在此处抛出异常
            }
        });
        jsonObjDS.print();

        env.execute();
    }
}

当运行上述代码时,尽管在 processElement 内部调试发现 JSONObject.parseObject(value) 确实成功生成了 JSONObject 实例,但在调用 out.collect(jsonObject) 时,程序却抛出了以下运行时异常:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: java.lang.RuntimeException: Assigned key must not be null!
Caused by: java.lang.NullPointerException: Assigned key must not be null!

这个错误信息 Assigned key must not be null! 通常与 Flink 的状态管理或某些内部序列化机制有关,但在此场景下,它并非直接指向用户代码中显式设置的 Key 为 null。这表明问题可能出在 JSONObject 对象本身的某种特性,或者其与 Flink 内部序列化机制的兼容性上。

3. 解决方案:使用 org.json 库的正确姿势

经过分析和实践,我们发现问题可能与所使用的 JSON 库及其解析方式有关。如果采用 org.json 库,并使用其构造函数进行解析,可以有效避免上述问题。

首先,确保项目中引入了 org.json 库的依赖:


    org.json
    json
    20180130 

然后,修改 processElement 方法中的 JSON 解析逻辑,使用 org.json.JSONObject 的构造函数来创建对象:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.json.JSONObject; // 注意这里导入的是 org.json.JSONObject

public class FlinkJsonParseSolution {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource inputDS = env.fromElements(
            "{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
        );

        SingleOutputStreamOperator jsonObjDS = inputDS.process(new ProcessFunction() {
            @Override
            public void processElement(String value, ProcessFunction.Context ctx, Collector out) throws Exception {
                // 使用 org.json.JSONObject 的构造函数进行解析
                JSONObject jsonObject = new JSONObject(value);
                out.collect(jsonObject); // 现在可以正常收集
            }
        });
        jsonObjDS.print();

        env.execute();
    }
}

修改后,程序将能够顺利运行并打印出解析后的 JSONObject 内容,例如:

{"bill_info":{"ADDER_NAME":"sss","ADDER_NO":"0706","UPDATER_NAME":"ssss","UPDATER_NO":"0706","BILL_ID":"8687b584-038c-498c-8f97-ec1ca197da96","ADD_TIME":"2022-11-12 16:05:28:418","ORDER_ID":"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67","S_USER_ID":"s68","B_USER_ID":"b77","UPDATE_TIME":"2022-11-12 16:05:28:418"}}

这表明 org.json.JSONObject 的构造函数方式与 Flink 的内部机制(特别是序列化)具有更好的兼容性,从而避免了 NullPointerException。

4. 注意事项与最佳实践

尽管上述方法可以解决 JSONObject 收集的问题,但在生产环境中,直接在 Flink 流中传递 JSONObject 实例通常不是最佳实践。

Moshi Chat
Moshi Chat

法国AI实验室Kyutai推出的端到端实时多模态AI语音模型,具备听、说、看的能力,不仅可以实时收听,还能进行自然对话。

下载
  1. 序列化与反序列化开销: JSONObject 对象通常包含复杂的内部结构,其序列化和反序列化成本较高。在 Flink 这种高性能的流处理框架中,频繁地序列化和反序列化 JSONObject 会带来显著的性能损耗,尤其是在数据量大或状态操作多的场景下。

  2. 类型安全与可读性: 使用 JSONObject 意味着在运行时通过字符串键访问数据,缺乏编译时检查,容易出错。代码可读性也相对较差。

推荐的最佳实践是:将 JSON 数据反序列化为 Plain Old Java Objects (POJOs)。

POJO 具有以下优势:

  • 性能优化: Flink 对 POJO 有着优秀的序列化支持(特别是 Kryo 序列化),通常比通用 JSON 对象更高效。
  • 类型安全: POJO 定义了明确的字段类型,可以在编译时捕获类型错误。
  • 代码可读性与维护性: 业务逻辑可以直接操作 POJO 字段,代码更清晰、更易于理解和维护。

示例:将 JSON 反序列化为 POJO

假设我们的 JSON 结构可以映射到一个 BillInfo POJO:

// 定义 POJO 类
public class BillInfo {
    private String ADD_TIME;
    private String ORDER_ID;
    private String ADDER_NO;
    private String UPDATER_NO;
    private String S_USER_ID;
    private String B_USER_ID;
    private String BILL_ID;
    private String ADDER_NAME;
    private String UPDATE_TIME;
    private String UPDATER_NAME;

    // 必须提供无参构造函数
    public BillInfo() {}

    // 提供所有字段的 Getter 和 Setter 方法
    public String getADD_TIME() { return ADD_TIME; }
    public void setADD_TIME(String ADD_TIME) { this.ADD_TIME = ADD_TIME; }
    // ... 其他字段的 Getter/Setter
    public String getORDER_ID() { return ORDER_ID; }
    public void setORDER_ID(String ORDER_ID) { this.ORDER_ID = ORDER_ID; }
    public String getADDER_NO() { return ADDER_NO; }
    public void setADDER_NO(String ADDER_NO) { this.ADDER_NO = ADDER_NO; }
    public String getUPDATER_NO() { return UPDATER_NO; }
    public void setUPDATER_NO(String UPDATER_NO) { this.UPDATER_NO = UPDATER_NO; }
    public String getS_USER_ID() { return S_USER_ID; }
    public void setS_USER_ID(String S_USER_ID) { this.S_USER_ID = S_USER_ID; }
    public String getB_USER_ID() { return B_USER_ID; }
    public void setB_USER_ID(String B_USER_ID) { this.B_USER_ID = B_USER_ID; }
    public String getBILL_ID() { return BILL_ID; }
    public void setBILL_ID(String BILL_ID) { this.BILL_ID = BILL_ID; }
    public String getADDER_NAME() { return ADDER_NAME; }
    public void setADDER_NAME(String ADDER_NAME) { this.ADDER_NAME = ADDER_NAME; }
    public String getUPDATE_TIME() { return UPDATE_TIME; }
    public void setUPDATE_TIME(String UPDATE_TIME) { this.UPDATE_TIME = UPDATE_TIME; }
    public String getUPDATER_NAME() { return UPDATER_NAME; }
    public void setUPDATER_NAME(String UPDATER_NAME) { this.UPDATER_NAME = UPDATER_NAME; }

    @Override
    public String toString() {
        return "BillInfo{" +
               "ADD_TIME='" + ADD_TIME + '\'' +
               ", ORDER_ID='" + ORDER_ID + '\'' +
               // ... 其他字段
               '}';
    }
}

// 在 Flink 任务中使用 Jackson 或 Gson 进行反序列化
import com.fasterxml.jackson.databind.ObjectMapper; // 假设使用Jackson
// 或 import com.google.gson.Gson;

public class FlinkJsonToPojoSolution {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource inputDS = env.fromElements(
            "{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
        );

        SingleOutputStreamOperator billInfoDS = inputDS.process(new ProcessFunction() {
            // ObjectMapper 是线程安全的,可以作为成员变量
            private transient ObjectMapper objectMapper;

            @Override
            public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
                super.open(parameters);
                objectMapper = new ObjectMapper();
            }

            @Override
            public void processElement(String value, ProcessFunction.Context ctx, Collector out) throws Exception {
                // 假设 JSON 字符串的根元素是包含 "bill_info" 的对象
                // 需要先解析到 JsonNode 或 Map,再提取 "bill_info" 部分
                // 或者如果确定字符串直接是 bill_info 的内容,可以直接反序列化

                // 这里假设 value 是包含一个根键 "bill_info" 的完整 JSON 字符串
                // 先解析为 JsonNode,然后提取 "bill_info" 的内容
                com.fasterxml.jackson.databind.JsonNode rootNode = objectMapper.readTree(value);
                com.fasterxml.jackson.databind.JsonNode billInfoNode = rootNode.get("bill_info");

                if (billInfoNode != null) {
                    BillInfo billInfo = objectMapper.treeToValue(billInfoNode, BillInfo.class);
                    out.collect(billInfo);
                } else {
                    // 处理 JSON 格式不符合预期的情况
                    System.err.println("JSON string missing 'bill_info' key: " + value);
                }
            }
        });
        billInfoDS.print();

        env.execute();
    }
}

注意: 如果 JSON 字符串直接是 BillInfo 对象的内容(即没有外层的 {"bill_info": ...}),则 processElement 可以简化为 BillInfo billInfo = objectMapper.readValue(value, BillInfo.class);。上述示例处理了带有嵌套 bill_info 键的情况。

5. 总结

在 Flink 中处理 JSON 字符串时,将字符串解析为 JSONObject 并通过 Collector 发出可能因 JSON 库的选择和使用方式不当而导致 NullPointerException: Assigned key must not be null! 错误。通过使用 org.json 库并采用其构造函数 new JSONObject(value) 的方式,可以有效解决此问题。然而,从性能、类型安全和可维护性角度考虑,更推荐的做法是将 JSON 数据反序列化为 POJO。选择合适的 JSON 处理策略,将有助于构建更健壮、高效的 Flink 流处理应用。

相关专题

更多
java
java

Java是一个通用术语,用于表示Java软件及其组件,包括“Java运行时环境 (JRE)”、“Java虚拟机 (JVM)”以及“插件”。php中文网还为大家带了Java相关下载资源、相关课程以及相关文章等内容,供大家免费下载使用。

825

2023.06.15

java正则表达式语法
java正则表达式语法

java正则表达式语法是一种模式匹配工具,它非常有用,可以在处理文本和字符串时快速地查找、替换、验证和提取特定的模式和数据。本专题提供java正则表达式语法的相关文章、下载和专题,供大家免费下载体验。

724

2023.07.05

java自学难吗
java自学难吗

Java自学并不难。Java语言相对于其他一些编程语言而言,有着较为简洁和易读的语法,本专题为大家提供java自学难吗相关的文章,大家可以免费体验。

728

2023.07.31

java配置jdk环境变量
java配置jdk环境变量

Java是一种广泛使用的高级编程语言,用于开发各种类型的应用程序。为了能够在计算机上正确运行和编译Java代码,需要正确配置Java Development Kit(JDK)环境变量。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

395

2023.08.01

java保留两位小数
java保留两位小数

Java是一种广泛应用于编程领域的高级编程语言。在Java中,保留两位小数是指在进行数值计算或输出时,限制小数部分只有两位有效数字,并将多余的位数进行四舍五入或截取。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

398

2023.08.02

java基本数据类型
java基本数据类型

java基本数据类型有:1、byte;2、short;3、int;4、long;5、float;6、double;7、char;8、boolean。本专题为大家提供java基本数据类型的相关的文章、下载、课程内容,供大家免费下载体验。

445

2023.08.02

java有什么用
java有什么用

java可以开发应用程序、移动应用、Web应用、企业级应用、嵌入式系统等方面。本专题为大家提供java有什么用的相关的文章、下载、课程内容,供大家免费下载体验。

428

2023.08.02

java在线网站
java在线网站

Java在线网站是指提供Java编程学习、实践和交流平台的网络服务。近年来,随着Java语言在软件开发领域的广泛应用,越来越多的人对Java编程感兴趣,并希望能够通过在线网站来学习和提高自己的Java编程技能。php中文网给大家带来了相关的视频、教程以及文章,欢迎大家前来学习阅读和下载。

16861

2023.08.03

php源码安装教程大全
php源码安装教程大全

本专题整合了php源码安装教程,阅读专题下面的文章了解更多详细内容。

7

2025.12.31

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 2.1万人学习

C# 教程
C# 教程

共94课时 | 5.7万人学习

Java 教程
Java 教程

共578课时 | 40万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号