概述
本文旨在清晰阐述如何将本地的 Parquet 文件(源自 kuairand-27k 推荐系统数据集)完整地导出,并顺利上传至阿里云 MaxCompute 表中。整个流程涵盖数据探查、类型映射、建表以及上传环节,所有代码均已集成,实现一站式操作。

1. 环境准备
依赖安装
首先,需要安装所依赖的工具包:
pip install pandas pyarrow pyodps
其中,pandas 和 pyarrow 用于读取 Parquet 文件,而 pyodps 是阿里云 MaxCompute 的 Python SDK,三者缺一不可。
凭证配置
连接 MaxCompute 需要配置 AccessKey。建议通过环境变量传递,切勿硬编码在代码中——确保安全性。配置命令如下:
export ACCESS_ID="your_access_id"
export ACCESS_KEY="your_access_key"
2. 数据探查:读取 Parquet 文件
在上传之前,有必要先了解数据的结构。通过以下代码读取 Parquet 文件,数据概况便一目了然:
import pandas as pd
df = pd.read_parquet("kuairand-27k-train-0.parquet")
print(f"数据总行数: {len(df)}")
print(f"数据列数: {len(df.columns)}")
print(f"列名列表:")
print(df.columns.tolist())
print(f"数据类型:")
print(df.dtypes)
# 逐列查看前5条数据
for col in df.columns:
print(f"--- {col} (dtype: {df[col].dtype}) ---")
for i in range(min(5, len(df))):
val = df[col].iloc[i]
if isinstance(val, (list,)):
print(f"[{i}] len={len(val)}, 前10个值: {val[:10]}")
else:
print(f"[{i}] {val}")
探查结果
执行代码后,结果显示:该数据集共 1257 行、14 列,各列信息如下:
| 列名 | Python 类型 | 字段说明 |
|---|---|---|
user_id | int64 | 用户唯一标识 |
user_active_degree | int64 | 用户活跃程度 |
follow_user_num_range | int64 | 关注人数所在区间 |
fans_user_num_range | int64 | 粉丝人数所在区间 |
friend_user_num_range | int64 | 好友人数所在区间 |
register_days_range | int64 | 注册天数所在区间 |
video_id | list(int) | 用户历史交互的视频序列 |
action_timestamp | list(int) | 行为发生的时间戳序列 |
action_weight | list(int) | 行为权重序列(bitmask 编码) |
watch_time | list(int) | 观看时长序列 |
item_video_id | list(int) | 候选视频 ID 序列 |
item_action_weight | list(int) | 候选视频的行为标签 |
item_target_watchtime | list(int) | 候选视频的目标观看时长 |
item_query_time | list(int) | 候选请求的时间戳 |
3. 类型映射:Parquet → MaxCompute
数据类型必须正确对应,否则上传时可能出错。映射关系如下:
| Parquet/Python 类型 | MaxCompute 类型 |
|---|---|
int64(标量) | bigint |
list(int)(数组) | array |
映射规则简洁清晰——标量对应 bigint,数组对应 array。
4. 完整上传脚本
接下来是核心部分:完整的上传脚本。请注意,脚本中已处理好类型转换和批量写入逻辑,可直接使用。
#!/usr/bin/env python3
"""将 kuairand-27k-train-0.parquet 数据上传到 MaxCompute 表 pairec_kuairand_train"""
import os
import pandas as pd
import numpy as np
from odps import ODPS
from odps.models import TableSchema as Schema, Column
# ========== 1. 配置连接参数 ==========
project_name = "pairec_mc"
access_id = os.environ["ACCESS_ID"]
access_key = os.environ["ACCESS_KEY"]
endpoint = "https://service.cn.maxcompute.aliyun.com/api"
# ========== 2. 连接 MaxCompute ==========
odps = ODPS(access_id, access_key, project_name, endpoint=endpoint)
print("MaxCompute 连接成功")
# ========== 3. 建表 ==========
TABLE_NAME = "pairec_kuairand_train"
# 先删除已有表(如需覆盖写入)
if odps.exist_table(TABLE_NAME):
print(f"表 {TABLE_NAME} 已存在,正在删除...")
odps.delete_table(TABLE_NAME)
print(f"表 {TABLE_NAME} 已删除")
# 定义表 schema
columns = [
# 用户侧标量字段 (bigint)
Column(name="user_id", type="bigint"),
Column(name="user_active_degree", type="bigint"),
Column(name="follow_user_num_range", type="bigint"),
Column(name="fans_user_num_range", type="bigint"),
Column(name="friend_user_num_range", type="bigint"),
Column(name="register_days_range", type="bigint"),
# 历史序列字段 (array)
Column(name="video_id", type="array"),
Column(name="action_timestamp", type="array"),
Column(name="action_weight", type="array"),
Column(name="watch_time", type="array"),
# 候选物料字段 (array)
Column(name="item_video_id", type="array"),
Column(name="item_action_weight", type="array"),
Column(name="item_target_watchtime", type="array"),
Column(name="item_query_time", type="array"),
]
schema = Schema(columns=columns)
odps.create_table(TABLE_NAME, schema)
print(f"表 {TABLE_NAME} 创建成功")
# ========== 4. 读取 Parquet 并上传数据 ==========
PARQUET_PATH = "kuairand-27k-train-0.parquet"
df = pd.read_parquet(PARQUET_PATH)
print(f"Parquet 读取完成,共 {len(df)} 行")
# 将 numpy 数组转为 Python list(PyODPS 要求原生 Python 类型)
array_cols = ["video_id", "action_timestamp", "action_weight", "watch_time",
"item_video_id", "item_action_weight", "item_target_watchtime", "item_query_time"]
for col in array_cols:
df[col] = df[col].apply(lambda x: list(x) if isinstance(x, np.ndarray) else (x if isinstance(x, list) else []))
# 标量列确保为 Python int
scalar_cols = ["user_id", "user_active_degree", "follow_user_num_range",
"fans_user_num_range", "friend_user_num_range", "register_days_range"]
for col in scalar_cols:
df[col] = df[col].astype(int)
# 使用 Tunnel 上传
table = odps.get_table(TABLE_NAME)
print(f"开始上传数据(共 {len(df)} 行)...")
with table.open_writer() as writer:
records = []
for idx, row in df.iterrows():
record = [
int(row["user_id"]),
int(row["user_active_degree"]),
int(row["follow_user_num_range"]),
int(row["fans_user_num_range"]),
int(row["friend_user_num_range"]),
int(row["register_days_range"]),
list(row["video_id"]),
list(row["action_timestamp"]),
list(row["action_weight"]),
list(row["watch_time"]),
list(row["item_video_id"]),
list(row["item_action_weight"]),
list(row["item_target_watchtime"]),
list(row["item_query_time"]),
]
records.append(table.new_record(record))
if (idx + 1) % 100 == 0:
print(f"已处理 {idx + 1}/{len(df)} 行")
writer.write(records)
print(f"数据上传完成!共上传 {len(records)} 行到 {TABLE_NAME}")
5. 执行
请先配置好环境变量,然后执行脚本:
source env.sh # 设置 ACCESS_ID、ACCESS_KEY 环境变量
python upload_to_odps.py
如果一切顺利,将看到类似如下的输出信息:
project_name: pairec
endpoint: https://service.cn.maxcompute.aliyun.com/api
MaxCompute 连接成功
表 pairec_kuairand_train 创建成功
Parquet 读取完成,共 1257 行
开始上传数据(共 1257 行)...
已处理 100/1257 行
...
已处理 1200/1257 行
数据上传完成!共上传 1257 行到 pairec_kuairand_train
出现上述日志即表示操作成功,数据已安全地存储在 MaxCompute 中。
6. 常见问题与注意事项
Endpoint 与 Project 不匹配
MaxCompute 的 Project 需绑定到特定 Region 对应的 Endpoint。若遇到 Project not found 错误,通常是因为 Endpoint 配置有误。请确认 Project 所在 Region,然后更换正确的 Endpoint。例如,公网华东2的 Endpoint 为 service.cn.maxcompute.aliyun.com:
| Endpoint | 适用场景 |
|---|---|
service.cn.maxcompute.aliyun.com | 公网(华东2) |
AccessKey 与 Endpoint 网络域不匹配
公网 AccessKey 只能搭配公网 Endpoint,内网 AccessKey 只能搭配内网 Endpoint,混用会导致 AccessKeyIdNotFound 错误。该问题较为常见,请务必区分清楚。
PyODPS 要求原生 Python 类型
使用 open_writer() 写入数据时,PyODPS 不兼容 numpy 的 int64 或 ndarray,必须转换为 Python 原生的 int 和 list。上面脚本中已包含转换逻辑,若自行编写代码,切勿遗漏此步骤,否则会直接抛出类型错误。
批量上传性能
如果数据量极大(例如百万行以上),建议采用分批写入方式,避免单次 writer.write() 占用过多内存。例如每批 5000 条:
BATCH_SIZE = 5000
with table.open_writer() as writer:
batch = []
for idx, row in df.iterrows():
batch.append(table.new_record([...]))
if len(batch) >= BATCH_SIZE:
writer.write(batch)
batch = []
if batch:
writer.write(batch)
这种方式既节省内存,又能保持较高写入速度,尤其适用于生产环境。
