记一次对 DataX 向 ES 同步对象型数据的探索过程
今天开完产品审议会,Leader 表示让我来负责工作台模块自定义地理查询的功能开发时,已经学完 ES 的地理查询的我当即表示莫得问题👌。可就在我想先同步坐标在写业务代码时才发现,“这个 DataX 怎么同步对象型数据嘞?”🤔
介绍
DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。使用插件增强功能,这里我需要从 MySQL 同步数据到 ES,就要用到 MySQL 的输入端插件和 ES 的输出端插件。
用法
这里演示从 MySQL 同步到 ES,DataX 的同步很简单,只需要一个配置脚本并运行下面地命令即可
python /datax/bin/datax.py job.json
DataX 就会自动按照配置文件里的信息连接对应服务获取和同步数据。示例如下:
// job.json
{
"job": {
"setting": {
"speed": {
"channel": 2
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"querySql": ["select * from user_t"],
"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/db_user"]
}
]
}
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://127.0.0.1:9200",
"accessId": "elastic",
"accessKey": "123456",
"index": "user",
"column": [
{ "name": "pk", "type": "id" },
{ "name": "col_ip", "type": "ip" },
{ "name": "col_double", "type": "double" },
{ "name": "col_long", "type": "long" },
{ "name": "col_integer", "type": "integer" },
{ "name": "col_keyword", "type": "keyword" },
{ "name": "col_text", "type": "text", "analyzer": "ik_max_word" },
{ "name": "col_geo_point", "type": "geo_point" },
{ "name": "col_date", "type": "date", "format": "yyyy-MM-dd HH:mm:ss" },
{ "name": "col_nested1", "type": "nested" },
{ "name": "col_nested2", "type": "nested" },
{ "name": "col_object1", "type": "object" },
{ "name": "col_object2", "type": "object" },
{ "name": "col_integer_array", "type": "integer", "array": true },
{ "name": "col_geo_shape", "type": "geo_shape", "tree": "quadtree", "precision": "10m" }
]
}
}
}
]
}
}
问题
看起来是不是很简单?如果只是针对普通字符串,数字,或是日期字段,那确实没啥问题,通过 SQL 语句 都能正确地查出并同步。但问题就出在最后一个字段 col_geo_shape
,它的类型是 geo_shape
。了解 ES 的人都知道,这是 ES 用于地理查询的一个重要类型,用它我们可以实现判断坐标和坐标,坐标和区域以及区域和区域之间的空间关系。
但问题是,在 ES 中,geo_shape 类型的数据长下面这样:
"col_geo_shape": {
"type": "point",
"coordinates": [
108.374854,
30.809156
]
}
没错,它是一个遵循 GeoJson 格式的 Json 对象,这里的类型除了 point
外,还有 circle
,envelope
,polygon
等等
对于 SQL 语句的查询结果,我们都知道是一个个字段,默认情况下,它们都可以作为字符串,也就是对应 ES 的 keyword
或 text
类型。但 SQL 如何查出一个对象?
我相信到多数人的第一反应拼出来一个 json 串。如果你想到了,那么恭喜你答对了!可惜的是,当时的我并没有那么聪明,再加上每一次测试同步数据都要等很长的时间,我可不想为了一个猜测去冒这么大的时间成本(其实就是想追求一次通过)
于是,漫长的探索过程就开始了......
探索
原本我以为这样一个小问题,广大网友应该已经踩过坑了,大不了官方文档应该有写怎么用吧。
但直到我搜索了无数次,就是没有找到向 ES 同步对象型数据的具体示例。大部分都是在同步普通类型,如:integer
,date
,keyword
,long
等等。还有一些介绍也只是提了一嘴能同步 geo_shape
类型,但却依然没有给出具体的示例。
这时,我想到,既然官方有提供这样一个类型,那么一定有同步它的办法。所以我决定去 GitHub 找找官方文档。令我大失所望的是,官方仅提供了 ES 输出端的配置信息,我仍不清楚 mysql 到底怎么同步对象给 ES。
最终,我决定看源码。
分析
经过一番寻找,我最终找到了相关的代码,位于 ElasticSearchWriter.java 如下:
// ElasticSearchWriter.java
switch (colType) {
case STRING:
// 兼容string类型,ES5之前版本
break;
case KEYWORD:
// https://www.elastic.co/guide/en/elasticsearch/reference/current/tune-for-search-speed.html#_warm_up_global_ordinals
field.put("eager_global_ordinals", jo.getBoolean("eager_global_ordinals"));
break;
case TEXT:
field.put("analyzer", jo.getString("analyzer"));
// 优化disk使用,也同步会提高index性能
// https://www.elastic.co/guide/en/elasticsearch/reference/current/tune-for-disk-usage.html
field.put("norms", jo.getBoolean("norms"));
field.put("index_options", jo.getBoolean("index_options"));
if(jo.getString("fields") != null) {
field.put("fields", jo.getJSONObject("fields"));
}
break;
case DATE:
if (Boolean.TRUE.equals(jo.getBoolean("origin"))) {
if (jo.getString("format") != null) {
field.put("format", jo.getString("format"));
}
// es原生format覆盖原先来的format
if (jo.getString("dstFormat") != null) {
field.put("format", jo.getString("dstFormat"));
}
if(jo.getBoolean("origin") != null) {
columnItem.setOrigin(jo.getBoolean("origin"));
}
} else {
columnItem.setTimeZone(jo.getString("timezone"));
columnItem.setFormat(jo.getString("format"));
}
break;
case GEO_SHAPE:
field.put("tree", jo.getString("tree"));
field.put("precision", jo.getString("precision"));
break;
case OBJECT:
case NESTED:
if (jo.getString("dynamic") != null) {
field.put("dynamic", jo.getString("dynamic"));
}
break;
default:
break;
}
if (jo.containsKey("other_params")) {
field.putAll(jo.getJSONObject("other_params"));
}
注意看最后一个 if 语句,它会将除固定配置外的其余参数当成 json 串进行反序列化。这也坐实了,你只需要在 SQL 语句中通过 CONCAT
等字符串函数拼出一个字符串即可同步对象型数据。
结尾
总之,这件事给我的启发就是:在行动前一定要做好可行性分析,否则到时候代码写一半发现方法行不通,那就白写了🤣