游乐游手机版
首页/AI教程/文章详情

kuairand-27k的Parquet数据导出与上传到MaxCompute完整流程(hstu格式)

时间:2026-06-11 16:36
使用pandas和pyarrow读取Kuairand-27kParquet,通过pyodps连接MaxCompute,完成数据探查、类型映射(int64转bigint、list转array),建表后经Tunnel批量上传,处理numpy到Python类型转换,最终将1257行数据写入MaxCompute表。

概述

本文旨在清晰阐述如何将本地的 Parquet 文件(源自 kuairand-27k 推荐系统数据集)完整地导出,并顺利上传至阿里云 MaxCompute 表中。整个流程涵盖数据探查、类型映射、建表以及上传环节,所有代码均已集成,实现一站式操作。

kuairand-27k的Parquet 数据导出与上传到 MaxCompute 完整流程(hstu格式)

1. 环境准备

依赖安装

首先,需要安装所依赖的工具包:

pip install pandas pyarrow pyodps

其中,pandaspyarrow 用于读取 Parquet 文件,而 pyodps 是阿里云 MaxCompute 的 Python SDK,三者缺一不可。

凭证配置

连接 MaxCompute 需要配置 AccessKey。建议通过环境变量传递,切勿硬编码在代码中——确保安全性。配置命令如下:

export ACCESS_ID="your_access_id"
export ACCESS_KEY="your_access_key"

2. 数据探查:读取 Parquet 文件

在上传之前,有必要先了解数据的结构。通过以下代码读取 Parquet 文件,数据概况便一目了然:

import pandas as pd
df = pd.read_parquet("kuairand-27k-train-0.parquet")
print(f"数据总行数: {len(df)}")
print(f"数据列数: {len(df.columns)}")
print(f"列名列表:")
print(df.columns.tolist())
print(f"数据类型:")
print(df.dtypes)
# 逐列查看前5条数据
for col in df.columns:
    print(f"--- {col} (dtype: {df[col].dtype}) ---")
    for i in range(min(5, len(df))):
        val = df[col].iloc[i]
        if isinstance(val, (list,)):
            print(f"[{i}] len={len(val)}, 前10个值: {val[:10]}")
        else:
            print(f"[{i}] {val}")

探查结果

执行代码后,结果显示:该数据集共 1257 行、14 列,各列信息如下:

列名 Python 类型 字段说明
user_id int64 用户唯一标识
user_active_degree int64 用户活跃程度
follow_user_num_range int64 关注人数所在区间
fans_user_num_range int64 粉丝人数所在区间
friend_user_num_range int64 好友人数所在区间
register_days_range int64 注册天数所在区间
video_id list(int) 用户历史交互的视频序列
action_timestamp list(int) 行为发生的时间戳序列
action_weight list(int) 行为权重序列(bitmask 编码)
watch_time list(int) 观看时长序列
item_video_id list(int) 候选视频 ID 序列
item_action_weight list(int) 候选视频的行为标签
item_target_watchtime list(int) 候选视频的目标观看时长
item_query_time list(int) 候选请求的时间戳

3. 类型映射:Parquet → MaxCompute

数据类型必须正确对应,否则上传时可能出错。映射关系如下:

Parquet/Python 类型 MaxCompute 类型
int64(标量) bigint
list(int)(数组) array

映射规则简洁清晰——标量对应 bigint,数组对应 array

4. 完整上传脚本

接下来是核心部分:完整的上传脚本。请注意,脚本中已处理好类型转换和批量写入逻辑,可直接使用。

#!/usr/bin/env python3
"""将 kuairand-27k-train-0.parquet 数据上传到 MaxCompute 表 pairec_kuairand_train"""
import os
import pandas as pd
import numpy as np
from odps import ODPS
from odps.models import TableSchema as Schema, Column

# ========== 1. 配置连接参数 ==========
project_name = "pairec_mc"
access_id = os.environ["ACCESS_ID"]
access_key = os.environ["ACCESS_KEY"]
endpoint = "https://service.cn.maxcompute.aliyun.com/api"

# ========== 2. 连接 MaxCompute ==========
odps = ODPS(access_id, access_key, project_name, endpoint=endpoint)
print("MaxCompute 连接成功")

# ========== 3. 建表 ==========
TABLE_NAME = "pairec_kuairand_train"
# 先删除已有表(如需覆盖写入)
if odps.exist_table(TABLE_NAME):
    print(f"表 {TABLE_NAME} 已存在,正在删除...")
    odps.delete_table(TABLE_NAME)
    print(f"表 {TABLE_NAME} 已删除")

# 定义表 schema
columns = [
    # 用户侧标量字段 (bigint)
    Column(name="user_id", type="bigint"),
    Column(name="user_active_degree", type="bigint"),
    Column(name="follow_user_num_range", type="bigint"),
    Column(name="fans_user_num_range", type="bigint"),
    Column(name="friend_user_num_range", type="bigint"),
    Column(name="register_days_range", type="bigint"),
    # 历史序列字段 (array)
    Column(name="video_id", type="array"),
    Column(name="action_timestamp", type="array"),
    Column(name="action_weight", type="array"),
    Column(name="watch_time", type="array"),
    # 候选物料字段 (array)
    Column(name="item_video_id", type="array"),
    Column(name="item_action_weight", type="array"),
    Column(name="item_target_watchtime", type="array"),
    Column(name="item_query_time", type="array"),
]
schema = Schema(columns=columns)
odps.create_table(TABLE_NAME, schema)
print(f"表 {TABLE_NAME} 创建成功")

# ========== 4. 读取 Parquet 并上传数据 ==========
PARQUET_PATH = "kuairand-27k-train-0.parquet"
df = pd.read_parquet(PARQUET_PATH)
print(f"Parquet 读取完成,共 {len(df)} 行")

# 将 numpy 数组转为 Python list(PyODPS 要求原生 Python 类型)
array_cols = ["video_id", "action_timestamp", "action_weight", "watch_time",
              "item_video_id", "item_action_weight", "item_target_watchtime", "item_query_time"]
for col in array_cols:
    df[col] = df[col].apply(lambda x: list(x) if isinstance(x, np.ndarray) else (x if isinstance(x, list) else []))

# 标量列确保为 Python int
scalar_cols = ["user_id", "user_active_degree", "follow_user_num_range",
               "fans_user_num_range", "friend_user_num_range", "register_days_range"]
for col in scalar_cols:
    df[col] = df[col].astype(int)

# 使用 Tunnel 上传
table = odps.get_table(TABLE_NAME)
print(f"开始上传数据(共 {len(df)} 行)...")
with table.open_writer() as writer:
    records = []
    for idx, row in df.iterrows():
        record = [
            int(row["user_id"]),
            int(row["user_active_degree"]),
            int(row["follow_user_num_range"]),
            int(row["fans_user_num_range"]),
            int(row["friend_user_num_range"]),
            int(row["register_days_range"]),
            list(row["video_id"]),
            list(row["action_timestamp"]),
            list(row["action_weight"]),
            list(row["watch_time"]),
            list(row["item_video_id"]),
            list(row["item_action_weight"]),
            list(row["item_target_watchtime"]),
            list(row["item_query_time"]),
        ]
        records.append(table.new_record(record))
        if (idx + 1) % 100 == 0:
            print(f"已处理 {idx + 1}/{len(df)} 行")
    writer.write(records)
print(f"数据上传完成!共上传 {len(records)} 行到 {TABLE_NAME}")

5. 执行

请先配置好环境变量,然后执行脚本:

source env.sh  # 设置 ACCESS_ID、ACCESS_KEY 环境变量
python upload_to_odps.py

如果一切顺利,将看到类似如下的输出信息:

project_name: pairec
endpoint: https://service.cn.maxcompute.aliyun.com/api
MaxCompute 连接成功
表 pairec_kuairand_train 创建成功
Parquet 读取完成,共 1257 行
开始上传数据(共 1257 行)...
已处理 100/1257 行
...
已处理 1200/1257 行
数据上传完成!共上传 1257 行到 pairec_kuairand_train

出现上述日志即表示操作成功,数据已安全地存储在 MaxCompute 中。

6. 常见问题与注意事项

Endpoint 与 Project 不匹配

MaxCompute 的 Project 需绑定到特定 Region 对应的 Endpoint。若遇到 Project not found 错误,通常是因为 Endpoint 配置有误。请确认 Project 所在 Region,然后更换正确的 Endpoint。例如,公网华东2的 Endpoint 为 service.cn.maxcompute.aliyun.com

Endpoint 适用场景
service.cn.maxcompute.aliyun.com 公网(华东2)

AccessKey 与 Endpoint 网络域不匹配

公网 AccessKey 只能搭配公网 Endpoint,内网 AccessKey 只能搭配内网 Endpoint,混用会导致 AccessKeyIdNotFound 错误。该问题较为常见,请务必区分清楚。

PyODPS 要求原生 Python 类型

使用 open_writer() 写入数据时,PyODPS 不兼容 numpy 的 int64ndarray,必须转换为 Python 原生的 intlist。上面脚本中已包含转换逻辑,若自行编写代码,切勿遗漏此步骤,否则会直接抛出类型错误。

批量上传性能

如果数据量极大(例如百万行以上),建议采用分批写入方式,避免单次 writer.write() 占用过多内存。例如每批 5000 条:

BATCH_SIZE = 5000
with table.open_writer() as writer:
    batch = []
    for idx, row in df.iterrows():
        batch.append(table.new_record([...]))
        if len(batch) >= BATCH_SIZE:
            writer.write(batch)
            batch = []
    if batch:
        writer.write(batch)

这种方式既节省内存,又能保持较高写入速度,尤其适用于生产环境。

来源:https://developer.aliyun.com/article/1740572
上一篇阿里云Qwen3.7-Plus模型能力优势适用场景与订阅计划 下一篇阿里社区OpenClaw安装教程
本站内容用于信息整理与展示,如有侵权或内容问题请及时联系处理。

相关推荐

补充同频道和同主题内容,方便继续浏览更多相关内容。

同类最新

继续查看同栏目最近更新的文章。

更多
企业组织级AI赋能具体实施方法
AI教程 · 2026-06-30

企业组织级AI赋能具体实施方法

前段时间收到一位读者的留言,希望聊聊企业级、组织级的AI赋能究竟该怎么落地。巧的是,前几天刚看到一份咨询调研机构的数据:对近一两年所有企业级AI赋能项目的统计显示,超过90%的甲方企业认为,AI赋能在核心业务价值链上没有发挥任何实质性作用。除了AI辅助办公、企业智能知识库这类边缘应用起到了一些辅助效

Scrapy与Redis分布式架构的日本电商多平台数据聚合系统
AI教程 · 2026-06-30

Scrapy与Redis分布式架构的日本电商多平台数据聚合系统

从事日本电商数据聚合工作时,最大的难点在于要同时应对雅虎拍卖、煤炉(Mercari)、乐天和亚马逊日本站等截然不同的平台。以往使用单机爬虫,经常出现运行中崩溃的情况——单点故障、带宽利用率不足、数据存储混乱,这三大痛点令人困扰。 本文分享一套基于Scrapy + Redis的分布式爬虫方案,专门解决

详细PuTTY 0.81安装教程 SSH远程连接与自定义路径设置
AI教程 · 2026-06-30

详细PuTTY 0.81安装教程 SSH远程连接与自定义路径设置

​ PuTTY(简称PT)是一款轻量级开源SSH Telnet客户端,凭借简洁高效的特性,多年来始终是系统管理员与开发者进行远程连接的首选利器。本教程将详细介绍PuTTY 0 81版本的完整安装过程,并指导您自定义安装路径,以便更灵活地管理SSH远程连接工具。 安装准备 首先需要说明的是,整个安装流

在线教育系统必备功能:直播课堂与题库考试架构
AI教程 · 2026-06-30

在线教育系统必备功能:直播课堂与题库考试架构

很多人一想到做在线教育系统,第一反应往往是先把直播间和课程播放器搭起来,觉得“能看课”就万事大吉了。真到落地那天才发现,系统能不能顺滑跑起来,关键全藏在那些细节里——课程怎么组织、学习进度怎么记、考试怎么处理、后台怎么管得住。前端看起来就几个页面,后端其实是一整条业务链路。不管你是要做在线教育APP

ZStack源码级AI诊断套件让故障排查秒出答案
AI教程 · 2026-06-30

ZStack源码级AI诊断套件让故障排查秒出答案

一次故障排查,到底要花多少时间? 运维人员处理私有云、虚拟化平台的问题,流程大致都是这样:先翻日志看现象,再去文档里找对应机制,然后搜社区有没有类似案例,最后综合判断给出答复。简单问题半小时,复杂问题可能要跨天——而这些时间里,大部分精力耗在了“找信息”而不是“做决策”上。 类似的问题,也许每天都在