本文介绍: 在Apache SeaTunnel的最新插件开发中,connector-v2 maxcompute 连接器实现了基于CatalogTable + SaveMode的新版本。本文主要给大家分享了源端的关键改动包括弃用了过时的方法,改为通过CatalogTable实现数据传递。汇端则增加了对multi-table sink和saveMode接口的实现,并需手动管理auto service注解等此外,开发了特定的catalog以处理公共参数和客户端操作。
在Apache SeaTunnel的最新插件开发中,connector-v2 maxcompute 连接器实现了基于CatalogTable + SaveMode的新版本。
本文主要给大家分享了源端的关键改动包括弃用了过时的方法,改为通过CatalogTable实现数据传递。汇端则增加了对multi-table sink和saveMode接口的实现,并需手动管理auto service注解等经验方法!
此外,开发了特定的catalog以处理公共参数和客户端操作。
connector-v2 maxcompute 连接器
connector-v2 maxcompute 连接器
实现了基于CatalogTable + SaveMode
的新版本。
a) 源端的核心改动
- 新版本中
SeaTunnelSource
继承自SatunnelPluginLifeCycle
接口,其中包含了一个prepare
方法。该方法原本用于初始化源端中的config
参数,以及一些预处理工作,如删除数据、执行SQL等。 SeaTunnelSource
中的getProducedType
方法用于存储SeatunnelRowType
,即列映射转换信息的覆写。
改动如下:
- 不再使用两个已过时的方法。
- 在构造函数中创建
CatalogTable
,并在新方法List<CatalogTable> getProducedCatalogTables()
中实现catalog
的数据传递。在新版本中,catalogTable
将从源端传递到汇端,作为后续saveMode
里批量同步建表的SQL拼接来源。 SeatunnelRowType
可以从catalogTable
中获取,以适配现有连接器,也可以自己封装公共参数进行传递。
关于Catalog:
- 我们通过
getTable
获取CatalogTable
。如果配置了自定义列(column
),以适配DataX,我们需要基于column
生成CatalogTable
。
b) SInk端的核心改动
- 在
MaxcomputeSinkFactory
中,额外实现了createSink
方法,从上下文中获取catalog
和ReadonlyConfig
。 Sink
需要去除auto service
注解,并实现multi-table sink
和saveMode
接口。- 基于
DefaultSaveModeHandler
,通过schemaSaveMode
或DataSaveMode
配置,利用catalog
中的方法实现数据清理、建表或个性化操作。 catalog
实现了truncate table
接口,用于数据删除和分区重建。
去除auto_service
注解的原因:
- 如果某个连接器汇端(例如maxcompute)已经实现了新模式接口,不再通过过时的
prepare
方法注入config
,而是通过构造函数从上下文传入的catalogtable
获取内部seatunnelRowType
的适配。这个连接器在排序上比较靠前,AbstractPluginDiscovery
在服务加载时总是首先发现它。但是,如果其他插件(例如clickhouse)仍在使用旧模式(prepare
),则MultipleTableJobConfigParser
解析汇端会将其视为过时插件,并回退到JobConfigParser
进行适配。在parseSink
方法中,AbstractPluginDiscovery
仍会扫描所有通过autoService
注解注入的SeaTunnelSink
插件。此时,它遇到的第一个插件maxcompute的构造函数不能适配旧模式的实例初始化,导致失败。 - 这导致了Maxcompute到Clickhouse数据同步时,报出MaxcomputeSink无法初始化的问题。
MaxcomputeSinkWriter
需要额外实现SupportMutiTableSinkWriter
,否则无法进行适当的转换。
如果要指定部分列回流:
- 必须比较
catalogTable
中的列数量与指定的自定义列数量,两者相等才可以进行回流。 truncate -> data_save_mode / schema_save_mode
- 支持列数据的隐式转换(
seatunnel row
数据是一个Object[]
数组,需要结合CatalogTable TableSchema + Opds TableSchema
)
c) 开发特定的catalog
- 需要实现公共
Catalog
接口,其中的open
和close
可以处理一些公共参数和客户端操作。 catalog
需要通过catalog factory
进行实例化,并通过auto service
进行装配加载。
本文由 白鲸开源科技 提供发布支持!
原文地址:https://blog.csdn.net/weixin_54625990/article/details/135837572
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_63119.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。