在使用 PyODPS DataFrame 进行数据写入操作时,同一个脚本文件中的代码可能会被分发到不同环境执行——例如本地开发机、MaxCompute Executor 或 DataWorks 等平台。这种“多环境执行”特性有时会引发一些意料之外的难题。本文将深入探讨如何精准定位代码的实际运行位置,并针对常见问题提供可落地的解决方案。
概述
先来看一段示例代码:
from odps import ODPS, options
import numpy as np
o = ODPS(access_id, access_key, project, endpoint)
df = o.get_table('pyodps_iris').to_df()
coeffs = [0.1, 0.2, 0.4]
def handle(v):
import numpy as np
return float(np.cosh(v)) * sum(coeffs)
options.df.supersede_libraries = True
val = df.sepal_length.map(handle).sum().execute(libraries=['numpy.zip', 'other.zip'])
print(np.sinh(val))
首先需要明确一点:PyODPS 本质上只是一个 Python 包,并非经过改造的 Python 解释器。它运行在标准 Python 环境中,因此不会自动将单机代码转化为分布式执行——你所编写的每一条语句,其行为都与普通 Python 完全一致,不要指望它自动“施展魔法”。
下面来详细拆解这段代码的执行流程。

上图展示了执行过程中可能涉及的系统组件。代码实际运行的位置已用紫色标注——这些系统均位于 MaxCompute 外部,为方便描述,下文统一称为“本地”。在本地执行的代码包括 handle 函数之外的部分(注意 handle 传入 map 时仅传递了函数本身,尚未实际执行)。因此,这部分代码的行为与普通 Python 无异,导入第三方包时引用的也是本地已安装的包。这就引出了一个关键问题:上面代码中 libraries=['numpy.zip', 'other.zip'] 所引用的 other.zip 如果未在本地安装,那么本地代码中一旦出现 import other,就会立即报错——即便 other.zip 已成功上传至 MaxCompute 资源,本地环境仍然无法找到它。理论上,只要本地代码不涉及 PyODPS 包,就与 PyODPS 无关,需要用户自行排查问题。
那么 handle 函数的情况又如何呢?这就完全不同了。当你将 handle 传入 map 方法时(假设后端为 MaxCompute),它首先会被 cloudpickle 模块打包——闭包与字节码会一并被序列化。随后,PyODPS DataFrame 会利用这些信息生成一个 Python UDF,并提交至 MaxCompute。当作业以 SQL 形式执行时,系统会调用这个 UDF,在 MaxCompute Executor 中完成 unpickle 并执行。因此需要注意以下几点:
handle函数体内部的代码全部在 MaxCompute Executor 中运行,本地不会执行这部分逻辑。handle内部无法使用本地安装的包,只有 Executor 环境中存在的包才能生效。- 上传的第三方包必须与 Executor 的 Python 版本兼容(当前为 Python 2.7,UCS2)。
handle中对外部变量(例如coeffs)的修改不会影响本地的变量值。- 如果在
handle外部导入了包,然后在handle内部调用,可能会引发报错——不同环境的包结构存在差异,cloudpickle 会将本地引用一并携带过去,从而导致错误。因此强烈建议将 import 语句放在handle函数内部。 - 由于使用了 cloudpickle,如果
handle调用了其他文件中的代码,那么这些文件所属的包必须存在于 Executor 环境中。若不想依赖第三方包,建议将所有个人代码集中放置在同一文件中。
上述关于 handle 的说明同样适用于自定义聚合、apply 以及 map_reduce 中调用的自定义方法或 Agg 类。如果使用的后端是 Pandas,那么所有代码都将在本地执行,本地也需要安装好相关依赖包。不过,Pandas 后端通常在调试完成后会切换至 MaxCompute 运行,因此建议在本地安装包的同时,按照 MaxCompute 后端的开发规范来编写代码。
使用第三方包
个人电脑 / 自有服务器在本地使用第三方包 / 其他文件中的代码
在对应的 Python 版本环境下直接安装即可。
DataWorks 中本地使用其他文件中的代码
该部分功能由 DataWorks 平台提供,具体操作请参考 DataWorks 官方文档。
map / apply / map_reduce / 自定义聚合中使用第三方包 / 其他文件中的代码
请参考阿里云官方文档的相关文章(https://yq.aliyun.com/articles/591508)。需要特别补充的是:在 DataWorks 上上传资源后,务必点击“提交”按钮,确保资源已被正确上传至 MaxCompute。如果需要使用自定义的 Numpy 版本,请上传正确版本的 wheel 包,同时配置 odps.df.supersede_libraries = True,并确保上传的 numpy 包名位于 libraries 列表的最前面;如果指定了 options.df.libraries,则 numpy 包名需要位于 options.df.libraries 的最前面。
引用其他 MaxCompute 表中的数据
个人电脑 / 自有服务器在本地访问 MaxCompute 表
如果 Endpoint 可以正常连接,使用 PyODPS 或 DataFrame 进行访问即可。
map / apply / map_reduce / 自定义聚合中访问其他 MaxCompute 表
MaxCompute Executor 环境通常不支持访问 Endpoint 或 Tunnel Endpoint,该环境中也没有 PyODPS 包可用——因此无法直接使用 ODPS 入口对象或 PyODPS DataFrame,也不能从自定义函数外部传入这些对象。如果待访问的表数据量不大,建议将 DataFrame 作为资源传入(具体方法请参考 PyODPS 官方文档:https://pyodps.readthedocs.io/zh_CN/latest/df-element.html#function-resource)。如果数据量较大,改写为 join 操作是更优的选择。
访问其他服务
个人电脑 / 自有服务器在本地访问其他服务
确保本地环境能够正常访问相关服务,生产服务器上的问题可以联系 PE 进行沟通处理。
DataWorks 上的本地代码中访问其他服务
请咨询 DataWorks 技术支持团队获取帮助。
map / apply / map_reduce / 自定义聚合中访问其他服务
参考上文提到的文档启用 Isolation 功能,如果仍然遇到网络报错,请联系 MaxCompute 用户群,由售后工程师协助解决。
