网站首页 > 博客文章 正文
导读:在日常开发中常有这么一个场景,采集如日志等数据后以JSON形式存储到Kafka中,再由Flink从Kafka中获取数据并进行处理。但是有时候JSON比较复杂(多层嵌套),在FlinkSQL中解析起来比较麻烦,下面将讨论Flink SQL(1.10版本) 如何解析复杂JSON。
官网Demo
//JSON Format
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#table-formats
首先查看官网给出的一个例子,大致的解决思路为使用 format.json-schema,自定义一个format schema。
//官网例子
CREATE TABLE MyUserTable (
...
) WITH (
'format.type' = 'json', -- required: specify the format type
'format.fail-on-missing-field' = 'true' -- optional: flag whether to fail if a field is missing or not, false by default
'format.fields.0.name' = 'lon', -- optional: define the schema explicitly using type information.
'format.fields.0.data-type' = 'FLOAT', -- This overrides default behavior that uses table's schema as format schema.
'format.fields.1.name' = 'rideTime',
'format.fields.1.data-type' = 'TIMESTAMP(3)',
'format.json-schema' = -- or by using a JSON schema which parses to DECIMAL and TIMESTAMP.
'{ -- This also overrides the default behavior.
"type": "object",
"properties": {
"lon": {
"type": "number"
},
"rideTime": {
"type": "string",
"format": "date-time"
}
}
}'
)
分析:Flink 在解析 JSON 的时候,对于复杂的 JSON 可以通过自定义format schema来支持。如果table schema 和 format schema相同,则可以自动派生 json 的 schema(但这种往往不适用于解析复杂JSON )。
实战例子
了解了官网的例子之后,我们手动试验一下。
1、从Kafka中获取复杂JSON用于测试,JSON 如下:
{"code":0,"data":{"request":{"name":"test","id":"ce1beb37-ed3e-4365-8e44-c3bd1d249cfc"}},"message":"SUCCESS","retryCount":1,"success":true}
2、编写format.json-schema
通过参考官网Demo,发现第一层的 retryCount 可直接就映射到字段上,而 data 是多层嵌套,所以定义data 的类型为object ,而properties则是其json的内层数据。我们的例子中为多层嵌套,那么简化对应的 json-schema 如下:
'format.json-schema' = '{
"type": "object",
"properties": {
"retryCount": {type: "string"},
"data":{type: "object",
"properties" : {
"request":{type: "object",
"properties" : {
"id" : {type:"string"}
}
}
}
}
}
}'
3、定义table schema
从上面的 json schame 和 Flink SQL 的映射关系可以看出,data对应的table 字段的类型是ROW,所以 table schema 应是如下:
CREATE TABLE sourceTable (
retryCount VARCHAR,
data ROW(request ROW(id string))
)
WITH (
......
}
4、使用的时候,直接用 "."的方式即可
Table table = bsTableEnv.sqlQuery("SELECT data.request.id AS ID,retryCount FROM sourceTable");
5、Kafka SourceTable完整例子
CREATE TABLE sourceTable (
retryCount VARCHAR,
data ROW(request ROW(id string))
)
WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'XXXX',
'connector.properties.zookeeper.connect' = 'XXXX:2181',
'connector.properties.bootstrap.servers' = 'XXXX:9092',
'connector.properties.group.id' = 'XXXXX',
'format.json-schema' = '{
"type": "object",
"properties": {
"retryCount": {type: "string"},
"data":{type: "object",
"properties" : {
"request":{type: "object",
"properties" : {
"id" : {type:"string"}
}
}
}
}
}
}',
'format.type' = 'json');
最后
以上就是在FlinkSQL1.10中处理复杂JSON的一种方式,通过定义format.json schema实现。而在查看Flink中文邮件列表中也发现了其他的一些不错思路,如下:
通过在上游时将就转义成一个String放到JSON的一个field中,这样Flink解析出来就是一个String,
然后编写UDTF进行处理,感兴趣的朋友也可以尝试一下。
感谢您的阅读,如果喜欢本文欢迎关注和转发,本头条号将坚持持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。
猜你喜欢
- 2024-12-24 go语言序列化json/gob/msgp/protobuf性能对比
- 2024-12-24 Dotnet工具箱:带你探索10大工具分类和73个实时在线小工具
- 2024-12-24 Python数据持久化:JSON
- 2024-12-24 原来解析 JSON 数据有这么简单的方法
- 2024-12-24 什么是JSON?怎么用Python来编码+解码JSON对象?
- 2024-12-24 序列化与反序列化——FastJSON、Jackson、Gson性能测试
- 2024-12-24 Map转JSON字符串,对象转JSON字符串,JSON.toJSONString()全解决
- 2024-12-24 jQuery学习笔记
- 2024-12-24 为什么JSON.parse会损坏大数字,如何解决这个问题?
- 2024-12-24 【json系列】一文读懂什么是JSON Schema
你 发表评论:
欢迎- 最近发表
-
- Python 中 必须掌握的 20 个核心函数—len()函数
- 用PLC的指针实现字符串转byte(Codesys平台)一文极简搞懂指针
- EXCEL如何用函数读取复杂字符串中的数据
- 2025-07-19:计算字符串的镜像分数。用go语言,给定一个字符串 s
- 2025-07-10:字符相同的最短子字符串Ⅰ。用go语言,给定一个长度
- 基于物理特征融合与机器学习的多井协同钻井速率实时预测与优化(
- [电子学报文章精选]一种基于特征融合的恶意代码快速检测方法
- 强大的可视化流程图编辑神器 — LogicFlow
- 前端框架太卷了!字节企业级框架Arco Design Mobile开源了
- Vue独立组件——11个最佳Vue.js日期选择器组件
- 标签列表
-
- ifneq (61)
- 字符串长度在线 (61)
- googlecloud (64)
- flutterrun (59)
- powershellfor (73)
- messagesource (71)
- plsql64位 (73)
- vueproxytable (64)
- npminstallsave (63)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- nacos启动失败 (64)
- ssh-add (70)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- qcombobox样式表 (68)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)