记一次对 DataX 向 ES 同步对象型数据的探索过程

今天开完产品审议会,Leader 表示让我来负责工作台模块自定义地理查询的功能开发时,已经学完 ES 的地理查询的我当即表示莫得问题👌。可就在我想先同步坐标在写业务代码时才发现,“这个 DataX 怎么同步对象型数据嘞?”🤔

介绍

DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。使用插件增强功能,这里我需要从 MySQL 同步数据到 ES,就要用到 MySQL 的输入端插件和 ES 的输出端插件。

用法

这里演示从 MySQL 同步到 ES,DataX 的同步很简单,只需要一个配置脚本并运行下面地命令即可

python /datax/bin/datax.py job.json

DataX 就会自动按照配置文件里的信息连接对应服务获取和同步数据。示例如下:

// job.json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 2
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "123456",
            "connection": [
              {
                "querySql": ["select * from user_t"],
                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/db_user"]
              }
            ]
          }
        },
        "writer": {
          "name": "elasticsearchwriter",
          "parameter": {
            "endpoint": "http://127.0.0.1:9200",
            "accessId": "elastic",
            "accessKey": "123456",
            "index": "user",
            "column": [
              { "name": "pk", "type": "id" },
              { "name": "col_ip", "type": "ip" },
              { "name": "col_double", "type": "double" },
              { "name": "col_long", "type": "long" },
              { "name": "col_integer", "type": "integer" },
              { "name": "col_keyword", "type": "keyword" },
              { "name": "col_text", "type": "text", "analyzer": "ik_max_word" },
              { "name": "col_geo_point", "type": "geo_point" },
              { "name": "col_date", "type": "date", "format": "yyyy-MM-dd HH:mm:ss" },
              { "name": "col_nested1", "type": "nested" },
              { "name": "col_nested2", "type": "nested" },
              { "name": "col_object1", "type": "object" },
              { "name": "col_object2", "type": "object" },
              { "name": "col_integer_array", "type": "integer", "array": true },
              { "name": "col_geo_shape", "type": "geo_shape", "tree": "quadtree", "precision": "10m" }
            ]
          }
        }
      }
    ]
  }
}

问题

看起来是不是很简单?如果只是针对普通字符串,数字,或是日期字段,那确实没啥问题,通过 SQL 语句 都能正确地查出并同步。但问题就出在最后一个字段 col_geo_shape,它的类型是 geo_shape。了解 ES 的人都知道,这是 ES 用于地理查询的一个重要类型,用它我们可以实现判断坐标和坐标,坐标和区域以及区域和区域之间的空间关系。

但问题是,在 ES 中,geo_shape 类型的数据长下面这样:

"col_geo_shape": {
    "type": "point",
    "coordinates": [
        108.374854,
        30.809156
    ]
}

没错,它是一个遵循 GeoJson 格式的 Json 对象,这里的类型除了 point 外,还有 circleenvelopepolygon 等等

对于 SQL 语句的查询结果,我们都知道是一个个字段,默认情况下,它们都可以作为字符串,也就是对应 ES 的 keywordtext 类型。但 SQL 如何查出一个对象?

我相信到多数人的第一反应拼出来一个 json 串。如果你想到了,那么恭喜你答对了!可惜的是,当时的我并没有那么聪明,再加上每一次测试同步数据都要等很长的时间,我可不想为了一个猜测去冒这么大的时间成本(其实就是想追求一次通过)

于是,漫长的探索过程就开始了......

探索

原本我以为这样一个小问题,广大网友应该已经踩过坑了,大不了官方文档应该有写怎么用吧。

但直到我搜索了无数次,就是没有找到向 ES 同步对象型数据的具体示例。大部分都是在同步普通类型,如:integerdatekeywordlong 等等。还有一些介绍也只是提了一嘴能同步 geo_shape 类型,但却依然没有给出具体的示例。

这时,我想到,既然官方有提供这样一个类型,那么一定有同步它的办法。所以我决定去 GitHub 找找官方文档。令我大失所望的是,官方仅提供了 ES 输出端的配置信息,我仍不清楚 mysql 到底怎么同步对象给 ES。

最终,我决定看源码。

分析

经过一番寻找,我最终找到了相关的代码,位于 ElasticSearchWriter.java 如下:

// ElasticSearchWriter.java
switch (colType) {
    case STRING:
        // 兼容string类型,ES5之前版本
        break;
    case KEYWORD:
        // https://www.elastic.co/guide/en/elasticsearch/reference/current/tune-for-search-speed.html#_warm_up_global_ordinals
        field.put("eager_global_ordinals", jo.getBoolean("eager_global_ordinals"));
        break;
    case TEXT:
        field.put("analyzer", jo.getString("analyzer"));
        // 优化disk使用,也同步会提高index性能
        // https://www.elastic.co/guide/en/elasticsearch/reference/current/tune-for-disk-usage.html
        field.put("norms", jo.getBoolean("norms"));
        field.put("index_options", jo.getBoolean("index_options"));
        if(jo.getString("fields") != null) {
            field.put("fields", jo.getJSONObject("fields"));
        }
        break;
    case DATE:
        if (Boolean.TRUE.equals(jo.getBoolean("origin"))) {
            if (jo.getString("format") != null) {
                field.put("format", jo.getString("format"));
            }
            // es原生format覆盖原先来的format
            if (jo.getString("dstFormat") != null) {
                field.put("format", jo.getString("dstFormat"));
            }
            if(jo.getBoolean("origin") != null) {
                columnItem.setOrigin(jo.getBoolean("origin"));
            }
        } else {
            columnItem.setTimeZone(jo.getString("timezone"));
            columnItem.setFormat(jo.getString("format"));
        }
        break;
    case GEO_SHAPE:
        field.put("tree", jo.getString("tree"));
        field.put("precision", jo.getString("precision"));
        break;
    case OBJECT:
    case NESTED:
        if (jo.getString("dynamic") != null) {
            field.put("dynamic", jo.getString("dynamic"));
        }
        break;
    default:
        break;
}
if (jo.containsKey("other_params")) {
    field.putAll(jo.getJSONObject("other_params"));
}

注意看最后一个 if 语句,它会将除固定配置外的其余参数当成 json 串进行反序列化。这也坐实了,你只需要在 SQL 语句中通过 CONCAT 等字符串函数拼出一个字符串即可同步对象型数据。

结尾

总之,这件事给我的启发就是:在行动前一定要做好可行性分析,否则到时候代码写一半发现方法行不通,那就白写了🤣