游乐游手机版
首页/AI教程/文章详情

Phoenix映射HBase时间戳的实现方案

时间:2026-06-16 15:55
Phoenix官方ROW_TIMESTAMP方案存在主键类型限制、不可更新等缺陷,无法解决数据乱序覆盖问题。通过分析源码,新增PRowts类型,将数据时间戳动态映射到HBase时间戳,实现基于数据自身时间戳的版本控制,有效解决了Kafka重分区和离线补数时的覆盖问题。

HBase用户福利

新用户9.9元即可使用6个月云数据库HBase,更有低至1元包年的入门规格供广大HBase爱好者学习研究,更多内容请参考链接

先说几个核心判断。Phoenix从4.6版本开始官方提供了ROW_TIMESTAMP标签,用来映射HBase的原生时间戳,但这个方案在实际使用中限制还真不少。具体来说,只有主键中的TIME、DATE、TIMESTAMP、BIGINT、UNSIGNED_LONG类型的字段才能被设置成ROW_TIMESTAMP;而且只能有一个主键列能被设置成ROW_TIMESTAMP;这个标志的字段还不能为null值;更关键的是,只有在建表的时候才能指定,并且这个列不能为负数。

除了这些使用层面的限制,实际应用场景中也挺让人头疼的。根据官方设计,ROW_TIMESTAMP字段主要有以下几种形式。


业务主键在前
业务主键在前
ROW_TIMESTAMP字段在前
ROW_TIMESTAMP字段在前
只有ROW_TIMESTAMP字段
只有ROW_TIMESTAMP字段

那么,这几种形式各自的优劣如何?


业务主键在前。无论ROW_TIMESTAMP字段怎么取值,都能通过业务主键1做单点查询,这意味着在知道业务主键1的情况下可以快速精确查询,很实用。
ROW_TIMESTAMP字段在前。如果不知道某条数据对应的ROW_TIMESTAMP值,那就没法通过主键查了;反过来如果通过业务主键能映射这个值,虽然可以查了,但要注意这个字段将无法修改——修改就等于删除旧记录再重新插入。
只有ROW_TIMESTAMP字段。这种形式在时序数据中比较常见,也就是说没有业务主键,一般不做单点查询而是范围扫描。

其实官方ROW_TIMESTAMP方案最大的痛点,就是原有记录不能更新,只能先删除再插入,这直接限制了它的应用面。

我们的实现

背景

团队用Phoenix存储了所有需要实时查询的表,通过写Phoenix-SQL来查询当前最新数据。整体的架构是这样的:

基本架构

问题

正常逻辑下,实时抽取MySQL的binlog,写入Phoenix;每天还会有Hive批量抽取MySQL数据来做校验和补数。实时写入时,必须考虑binlog更新的顺序,至少要做到与MySQL原数据每行更新的顺序一致;离线补数时,需要确保不覆盖实时写入的数据。

实时写入

实时写入的顺序,多数由CDC(比如canal、debezium)控制。针对每一条数据的更新,CDC会按“表名+主键”进行哈希,然后路由到Kafka对应的分区。也就是说,某个表某条记录的更新在消费时是有严格顺序的。但问题在于,后期如果更改Kafka分区数就会比较棘手。如果不停服更新,同一条记录的不同更新可能会跑到不同分区,顺序就无法保证了,插入Phoenix时就会出现覆盖问题。反过来,如果停服更新,就需要先停掉CDC,等消费者消费完,再调整分区,最后重启消费者——这样才能避免相互覆盖。

实时写入还有个潜在风险,那就是数据丢失。无论是网络抖动还是组件健壮性,都可能导致丢数据。一旦发生,就需要走校验和补数的逻辑。

离线补数

离线补数就是用来兜底实时数据丢失的。它包含两个步骤:校验和补数。


校验。拿当前全量或增量的数据,与Phoenix表中相同主键的数据比对,看Phoenix有没有丢数或缺失更新。丢数就是Phoenix应该有的数据却没出现;缺失更新则是数据不是最新的。
补数。根据上一步计算出来的丢失数据或更新,写入Phoenix。

离线补数听起来挺完美,但最大的问题是:校验和补数是两个步骤,不在一个事务里。有可能某条数据在校验阶段确实丢了,但校验之后、补数之前,这条数据又被写回来了。那么补数动作一执行,反而把最新数据给覆盖成旧数据了。

解决方案

看到这里应该能明白,使用官方提供的ROW_TIMESTAMP方案,是没法很好解决数据乱序覆盖问题的。那究竟该怎么办?有没有一种方案能一劳永逸地解决上面所有问题?下面分享一下解决思路和具体实现。

思路

熟悉HBase的朋友一定知道,HBase在插入或更新数据时可以指定时间戳(版本号),而且查询时默认显示时间戳最大的那条数据。那么,如果Phoenix在根据主键写入数据时,能把该条数据的更新时间直接填入HBase的时间戳字段,是不是就能解决覆盖问题了呢?确实可以。

其实每一条更新都是数据的一个版本。如果写入时能指定时间戳,就意味着指定了数据的版本,无论更新到达的顺序如何,Phoenix读到的始终是最新的数据。如果真能实现,那么无论是Kafka重新分区还是离线补数,都不再需要担心覆盖问题了。可惜,Phoenix目前没有这个机制,得我们给它做个简单的升级。

实现方案

其实Phoenix官方有一个CurrentSCN属性,可以控制每次DDL、DML、QUERY的时间戳。也就是说,插入或更新时,它会根据CurrentSCN的值来设定当前数据对应的HBase时间戳。但遗憾的是,它只能控制每次提交的整批数据,没法精确控制每一条数据。当然,如果每Upsert一条数据都设一次CurrentSCN再提交,理论上也能解决问题,但这样就没法做批量提交了,性能会受影响。

实现时参考了一下CurrentSCN的原理。经过分析,在MutationState类的generateMutations方法里找到了这么一段代码:

PRow row = table.newRow(connection.getKeyValueBuilder(), timestampToUse, key, hasOnDupKey);

这段代码是创建一条数据,后续的Upsert数据都由此产生。从命名看,timestampToUse应该就是这条数据的时间戳。

/**
 * Creates a new row at the specified timestamp using the key
 * for the PK values (from {@link #newKey(ImmutableBytesWritable, byte[][])}
 * and the optional key values specified using values.
 * @param ts the timestamp that the key value will ha ve when committed
 * @param key the row key of the key value
 * @param hasOnDupKey true if row has an ON DUPLICATE KEY clause and false otherwise.
 * @param values the optional key values
 * @return the new row. Use {@link org.apache.phoenix.schema.PRow#toRowMutations()} to
 * generate the Row to send to the HBase server.
 * @throws ConstraintViolationException if row data violates schema
 * constraint
 */
PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values);

从newRow的描述来看,timestampToUse确实是当前数据的时间戳。顺着调用链,找到了timestampToUse最近的一次赋值位置:UpsertCompiler.setValues方法,里面有RowTimestampColInfo类型的rowTsColInfo字段。不过为了不干扰原有的CurrentSCN功能,选择了优化UpsertCompiler.setValues方法。改造后的代码片段如下:

for (int i = 0, j = numSplColumns; j < values.length; j++, i++) {
    byte[] value = values[j];
    PColumn column = table.getColumns().get(columnIndexes[i]);
    if (SchemaUtil.isPKColumn(column)) {
        pkValues[pkSlotIndex[i]] = value;
        if (SchemaUtil.getPKPosition(table, column) == table.getRowTimestampColPos()) {
            if (!useServerTimestamp) {
                PColumn rowTimestampCol = table.getPKColumns().get(table.getRowTimestampColPos());
                rowTimestamp = PLong.INSTANCE.getCodec().decodeLong(value, 0, rowTimestampCol.getSortOrder());
                if (rowTimestamp < 0) {
                    throw new IllegalDataException("Value of a column designated as ROW_TIMESTAMP cannot be less than zero");
                }
                rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
            }
        } 
    } else {
        columnValues.put(column, value);
        columnValueSize += (column.getEstimatedSize() + value.length);
    }
    if(column.getDataType().getSqlTypeName().equals(PRowts.INSTANCE.getSqlTypeName()) && rowTimestamp == null){
        rowTimestamp = PLong.INSTANCE.getCodec().decodeLong(value, 0, column.getSortOrder());
        if (rowTimestamp < 0) {
            throw new IllegalDataException("Value of a column designated as ROW_TS cannot be less than zero");
        }
        rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
    }
}

处理每行数据每个字段时,判断当前字段类型是否为PRowts,如果是,就根据该值创建RowTimestampColInfo。这样就实现了根据数据动态改变HBase时间戳的目标。

为了快速实现PRowts类型,选择把它设定为Long类型的别名,也就是说基于PLong类创建PRowts,逻辑完全一样,只是个别参数名不同。下面是PRowts的默认构造函数:

private PRowts() {
    super("ROW_TS", 21, Long.class, new PLong.LongCodec(), 48);
}

至此,就实现了将数据时间戳映射到HBase时间戳的功能。整个过程可以归纳为两步:


新增PRowts类型。创建表时指定某个字段为PRowts,该字段原始类型必须是long;或者修改现有字段的类型为PRowts。
根据数据构造HBase的Put命令时,将PRowts的值写入row timestamp。

实现过程看上去简单,但背后确实花了不少精力去阅读和梳理Phoenix源码,只有真正理解了才能改造升级。篇幅所限,很多细节没法展开说。其实也不一定非要改造UpsertCompiler.setValues,读者完全可以根据实际情况自行实现。另外还可以扩展PRowts,使其支持TIME、DATE、TIMESTAMP、BIGINT等其他时间类型的数据。

来源:https://developer.aliyun.com/article/704753
上一篇Java动态加载卸载JAR包的关键技术深度思考分析 下一篇从Skill到Hook自动化闭环验证工程实践
本站内容用于信息整理与展示,如有侵权或内容问题请及时联系处理。

相关推荐

补充同频道和同主题内容,方便继续浏览更多相关内容。

同类最新

继续查看同栏目最近更新的文章。

更多
企业组织级AI赋能具体实施方法
AI教程 · 2026-06-30

企业组织级AI赋能具体实施方法

前段时间收到一位读者的留言,希望聊聊企业级、组织级的AI赋能究竟该怎么落地。巧的是,前几天刚看到一份咨询调研机构的数据:对近一两年所有企业级AI赋能项目的统计显示,超过90%的甲方企业认为,AI赋能在核心业务价值链上没有发挥任何实质性作用。除了AI辅助办公、企业智能知识库这类边缘应用起到了一些辅助效

Scrapy与Redis分布式架构的日本电商多平台数据聚合系统
AI教程 · 2026-06-30

Scrapy与Redis分布式架构的日本电商多平台数据聚合系统

从事日本电商数据聚合工作时,最大的难点在于要同时应对雅虎拍卖、煤炉(Mercari)、乐天和亚马逊日本站等截然不同的平台。以往使用单机爬虫,经常出现运行中崩溃的情况——单点故障、带宽利用率不足、数据存储混乱,这三大痛点令人困扰。 本文分享一套基于Scrapy + Redis的分布式爬虫方案,专门解决

详细PuTTY 0.81安装教程 SSH远程连接与自定义路径设置
AI教程 · 2026-06-30

详细PuTTY 0.81安装教程 SSH远程连接与自定义路径设置

​ PuTTY(简称PT)是一款轻量级开源SSH Telnet客户端,凭借简洁高效的特性,多年来始终是系统管理员与开发者进行远程连接的首选利器。本教程将详细介绍PuTTY 0 81版本的完整安装过程,并指导您自定义安装路径,以便更灵活地管理SSH远程连接工具。 安装准备 首先需要说明的是,整个安装流

在线教育系统必备功能:直播课堂与题库考试架构
AI教程 · 2026-06-30

在线教育系统必备功能:直播课堂与题库考试架构

很多人一想到做在线教育系统,第一反应往往是先把直播间和课程播放器搭起来,觉得“能看课”就万事大吉了。真到落地那天才发现,系统能不能顺滑跑起来,关键全藏在那些细节里——课程怎么组织、学习进度怎么记、考试怎么处理、后台怎么管得住。前端看起来就几个页面,后端其实是一整条业务链路。不管你是要做在线教育APP

ZStack源码级AI诊断套件让故障排查秒出答案
AI教程 · 2026-06-30

ZStack源码级AI诊断套件让故障排查秒出答案

一次故障排查,到底要花多少时间? 运维人员处理私有云、虚拟化平台的问题,流程大致都是这样:先翻日志看现象,再去文档里找对应机制,然后搜社区有没有类似案例,最后综合判断给出答复。简单问题半小时,复杂问题可能要跨天——而这些时间里,大部分精力耗在了“找信息”而不是“做决策”上。 类似的问题,也许每天都在