谈到跨集群数据同步,许多团队的第一反应往往是编写定制脚本、搭建数据管道、处理增量数据、再进行数据校验等环节。整个流程下来,少则数周,多则数月,期间还难免遇到各种问题。其实,如果正在使用 EMR Spark,有一个名为 Relational Cache 的特性可以显著简化这一过程——它原本用于加速数据分析,但基于“物化视图”的底层机制,使其天然适合承担数据同步任务。下面通过一个实际案例进行演示。
背景
Relational Cache 是 EMR Spark 的核心特性之一,通过预组织和预计算数据来加速分析,功能上与传统数据仓库中的物化视图类似。然而,它的应用远不止于查询加速——跨集群数据同步便是一个非常实用的场景。
许多企业希望借助统一的 Data Lake 管理所有数据,但在实际环境中,多个数据中心、不同的网络 Region 甚至不同部门,往往导致多个大数据集群并存。集群间的数据同步需求极为普遍,集群迁移或搬站时的老数据与新数据对齐更是常见挑战。传统方法通常需要大量定制开发和人工介入:开发同步工具、处理增量更新、协调读写、比对数据等。而基于 Relational Cache,可以用极小的代价完成这项工作。
接下来通过一个具体示例,展示如何利用 EMR Spark Relational Cache 实现跨集群数据同步。
使用 Relational Cache 同步数据
假设存在 A、B 两个集群,需要将 activity_log 表的数据从集群 A 同步至集群 B。在此期间,新数据会持续写入集群 A 的这张表。集群 A 中 activity_log 的建表语句如下:
CREATE TABLE activity_log (
user_id STRING,
act_type STRING,
module_id INT,
d_year INT
) USING JSON
PARTITIONED BY (d_year)
先插入两条历史数据:
INSERT INTO TABLE activity_log PARTITION (d_year = 2017)
VALUES("user_001", "NOTIFICATION", 10), ("user_101", "SCAN", 2)
然后为这张表创建 Relational Cache:
CACHE TABLE activity_log_sync
REFRESH ON COMMIT
DISABLE REWRITE
USING JSON
PARTITIONED BY (d_year)
LOCATION "hdfs://192.168.1.36:9000/user/hive/data/activity_log"
AS SELECT user_id, act_type, module_id, d_year FROM activity_log
重点在于 REFRESH ON COMMIT:这意味着源表数据一旦更新,Cache 数据将自动刷新。通过 LOCATION 可以指定 Cache 数据的存储位置,这里将其指向集群 B 的 HDFS,从而实现数据从集群 A 到集群 B 的同步。同时,Cache 的字段与分区信息与源表保持一致。
接着在集群 B 中也创建一张 activity_log 表:
CREATE TABLE activity_log (
user_id STRING,
act_type STRING,
module_id INT,
d_year INT
) USING JSON
PARTITIONED BY (d_year)
LOCATION "hdfs:///user/hive/data/activity_log"
执行 MSCK REPAIR TABLE activity_log 自动修复相关元数据信息,然后查询——可以看到集群 B 已经能够查到集群 A 中之前插入的两条数据了。

接着在集群 A 中继续插入新数据:
INSERT INTO TABLE activity_log PARTITION (d_year = 2018)
VALUES("user_011", "SUBCRIBE", 24);
然后在集群 B 中再次执行 MSCK REPAIR TABLE activity_log 并查询——数据已经自动同步过来了!对于分区表,当新分区数据加入时,Relational Cache 能够增量同步新分区,无需重新同步全部数据。

如果集群 A 中 activity_log 的新增数据并非通过 Spark 插入,而是经由 Hive 或其他方式外部导入到 Hive 表中,可以通过 REFRESH TABLE activity_log_sync 手动触发同步,也可以编写脚本定期执行。若新增数据按分区批量导入,还可以使用类似 REFRESH TABLE activity_log_sync WITH TABLE activity_log PARTITION (d_year=2018) 的语句来增量同步单个分区。
Relational Cache 能够保证集群 A 与集群 B 中 activity_log 表的数据一致性。依赖该表的下游任务或应用可以随时切换到集群 B。也可以先暂停向集群 A 写入数据的应用,将数据源指向集群 B 中的表并重启服务,从而完成上层应用的迁移。迁移完成后,清理集群 A 中的 activity_log 和 activity_log_sync 即可。
总结
通过 Relational Cache 在不同大数据集群之间同步数据,操作极为简洁——甚至无需编写额外的同步工具。当然,它的应用场景远不止于此:构建秒级响应的 OLAP 平台、交互式 BI、Dashboard 应用、加速 ETL 流程……都是它擅长的领域。后续我们将继续分享 Relational Cache 在更多场景下的最佳实践。
