当前市面上的关系型数据库种类繁多,尽管大部分产品都采用SQL作为查询语言,但不同数据库在语法细节上存在明显差异。企业在进行ETL作业时,大量使用SQL脚本,而这些脚本之间往往存在复杂的依赖关系,目前主要依赖人工识别与维护。此外,在分析SQL脚本时,常见做法是用文本编辑器打开整个文件,在杂乱的“屎山”代码中逐一排查。另一个痛点在于,一个SQL脚本文件通常被视为最小的运行单元,只能按顺序从上到下执行,整体执行效率较低。
由此引出了几个核心诉求:能否实现跨不同数据库的SQL语法兼容解析?能否自动检测脚本之间的依赖关系?在分析过程中,能否先将脚本拆解成独立的代码块,再以层级化、图形化的方式呈现,从而大幅提升可读性?更进一步,对于大型脚本,能否自动识别出可以并行执行的步骤,从而显著提高执行效率?再延伸一下,如果上游依赖仅有局部变动,能否只执行大脚本中的某个“分支”步骤,避免全量重复运行?
解决方案
为了应对上述挑战,我们可以借助ZGLanguage为各数据库配置相应的SQL语法规则。配置文件为 MARK_SQLS.syn 。目前已经完成了对Hive、Greenplum、DWS、Oracle、MySQL、Hana等主流数据库主要SQL语法的配置。这套方案不仅能识别“增删改查”等基础语法,还能解析判断、循环、游标、赋值等高级语法。
在支持跨数据库解析的同时,ZGLanguage还提供了SQL代码拆解与标注功能。通过该功能,我们可以对任意SQL脚本展开分析。以下是一个基于Python实现的拆解分析案例(split_etl.py),它能够将标注后的结果进一步解析,最终输出结构化的JSON数据。
{
"SQL_FILE_NAME" : "SQL脚本文件名",
"DEPEND_TABLES" : "SQL脚本依赖表清单",
"CREATE_TABLES" : "SQL脚本创建表清单",
"UPDATE_INSERT_TABLES" : "SQL脚本更改表清单",
"SQL_SPLIT_INFO" : [
{
"SQL_SEQ" : "序号:1,2,3...",
"SQL_TYPE" : "类型:__CREATE_TABLE_SELECT__, __INSERT_TABLE_SELECT__...",
"SQL_CODE" : "完整代码",
"LVL_STRUCT" : "代码的层级结构",
"LVL_CODE" : "层级结构的代码",
"TAR_TAB" : "目标表",
"SRC_TAB" : "来源表"
}
],
"SQL_DEPEND_LVL" : "SQL脚本代码段步骤(序号)依赖层级"
}
这些参数的具体用途是什么?DEPEND_TABLES、CREATE_TABLES、UPDATE_INSERT_TABLES 可用于建立各脚本之间的依赖关系,辅助调度工具实现依赖的自动配置。SQL_SPLIT_INFO 支持对脚本进行层级化、图形化展示,显著提升可读性。SQL_DEPEND_LVL 则能辅助实现大脚本中代码段的并行执行,以及选择性地运行“分支”步骤。
一个完整的案例
下面通过一个实际的存储过程来演示完整流程。假设我们有一个Oracle存储过程 proc_test.prc:
CREATE OR REPLACE PROCEDURE PROC_F_CWWS_LOAN
(
P_AS_OF_DATE IN DATE default date'20200101',
RET_FLG OUT VARCHAR2,
RET_MSG OUT VARCHAR2
) IS
/******************************************************************************
功能描述:xxxx业务数据ETL处理
源 表:
目 标 表:MA_F_LOAN
备 注:
******************************************************************************/
-- 声明变量并初始化
V_COUNT NUMBER := 0;
V_PROC_NAME VARCHAR2(200) := 'PROC_F_CWWS_LOAN';
V_PROC_DESC VARCHAR2(100) := 'xxxx业务数据ETL处理';
V_P_FREQ VARCHAR2(4) := '';
BEGIN
--设置会话日期格式
EXECUTE IMMEDIATE ' ALTER SESSION SET NLS_DATE_FORMAT = ''YYYY-MM-DD''';
--查询参数表中,该程序对应的频率值
SELECT P_FREQ
INTO V_P_FREQ
FROM ETL_PROC_STATUS_DEF
WHERE PROC_NAME = V_PROC_NAME;
--判断是调度频率
IF P_AS_OF_DATE = FUNC_GET_FREQ_DAYS(P_AS_OF_DATE, V_P_FREQ) THEN
--调用分区维护程序
ETL.ETL_ADD_PARTITION('MA_F_LOAN', P_AS_OF_DATE, 'ETL');
--删除取上下次支付日临时表
DELETE TMP_XD_LAST_PAYDATE;
DELETE TMP_XD_NEXT_PAYDATE;
COMMIT;
--从还款计划表中取每笔账户最近一次小于等于数据日期还款日,作为上次还款日
INSERT INTO ETL.TMP_XD_LAST_PAYDATE
(OBJECTNO, LAST_PAYDATE)
SELECT OBJECTNO, LAST_PAYDATE
FROM (SELECT T.OBJECTNO,
MAX(TO_DATE(PAYDATE, 'YYYY-MM-DD')) LAST_PAYDATE
FROM NYBDP.O_CWWS_ACCT_PAYMENT_SCHEDULE T
WHERE T.AS_OF_DATE = P_AS_OF_DATE
AND T.SEQID <> '999'
AND TO_DATE(T.PAYDATE, 'YYYY-MM-DD') < P_AS_OF_DATE
GROUP BY T.OBJECTNO);
--从还款计划表中取每笔账户最近一次大于数据日期还款日,作为下次还款日
INSERT INTO ETL.TMP_XD_NEXT_PAYDATE
(OBJECTNO, NEXT_PAYDATE)
SELECT OBJECTNO, NEXT_PAYDATE
FROM (SELECT T.OBJECTNO,
MIN(TO_DATE(PAYDATE, 'YYYY-MM-DD')) NEXT_PAYDATE
FROM NYBDP.O_CWWS_ACCT_PAYMENT_SCHEDULE T
WHERE T.AS_OF_DATE = P_AS_OF_DATE
AND T.SEQID <> '999'
AND TO_DATE(T.PAYDATE, 'YYYY-MM-DD') >= P_AS_OF_DATE
GROUP BY T.OBJECTNO);
COMMIT;
MERGE INTO ETL.MA_F_LOAN A
USING (SELECT /*+PARALLEL(8)*/
T.ACCOUNT_NUMBER, T.GL_ACCOUNT_ID, T.INT_GL_ACCOUNT_ID
FROM ETL.MA_F_LOAN T
INNER JOIN ETL.MA_D_GL_SUBJECT T1
ON T.INT_GL_ACCOUNT_ID = T1.SUBJECT_NO3
AND T1.SUBJECT_NAME3 LIKE '%已减值%'
AND T1.AS_OF_DATE = P_AS_OF_DATE
WHERE T.AS_OF_DATE = P_AS_OF_DATE
AND T.ACCOUNT_NUMBER IN
(SELECT ACCOUNT_NUMBER
FROM (SELECT /*+PARALLEL(8)*/
T2.ACCOUNT_NUMBER, COUNT(1)
FROM ETL.MA_F_LOAN T2
WHERE T2.AS_OF_DATE = P_AS_OF_DATE
GROUP BY T2.ACCOUNT_NUMBER
HA VING COUNT(1) > 1))) B
ON (A.ACCOUNT_NUMBER = B.ACCOUNT_NUMBER AND A.AS_OF_DATE = P_AS_OF_DATE AND A.GL_ACCOUNT_ID = B.GL_ACCOUNT_ID AND A.INT_GL_ACCOUNT_ID = B.INT_GL_ACCOUNT_ID)
WHEN MATCHED THEN
UPDATE SET A.CUR_BOOK_BAL = 0, A.OVERDUE_BAL = 0;
--更新逾期xxxx上下次重订价日及重订价频率为起息日、到期日
UPDATE MA_F_LOAN A
SET LAST_REPRICE_DATE = A.ORIGINATION_DATE,
NEXT_REPRICE_DATE = A.MATURITY_DATE,
REPRICE_FREQ = A.ORG_TERM,
REPRICE_FREQ_MULT = A.ORG_TERM_MULT,
ADJUSTABLE_TYPE_CD = 0
WHERE A.MATURITY_DATE <= P_AS_OF_DATE
AND A.CUR_BOOK_BAL <> 0;
END IF;
INSERT INTO M_RUNLOG VALUES (SYSDATE, V_PROC_NAME, 'it is 10');
COMMIT;
EXCEPTION
WHEN OTHERS THEN
--写入异常日志
call ETL.PROC_ETL_LOG(P_AS_OF_DATE,V_PROC_NAME,V_PROC_DESC,V_COUNT,-1,SQLCODE,SQLERRM);
RET_MSG := SQLCODE || ':' || SQLERRM;
END;
/
首先,执行标注命令:
ZGLanguage -e SPLIT_ETL/MARK_SQLS.syn -t proc_test.prc -o mark_sql.zgl > log.log
执行后,会得到一个标注结果文件 mark_sql.zgl。该文件的内容如下所示,每个代码段都被添加了类型标记:
__CREATE_PROCEDURE_HEAD__{:::}||PROC_F_CWWS_LOAN{:::}
CREATE OR REPLACE PROCEDURE PROC_F_CWWS_LOAN
(
...
) IS
...
BEGIN
{;;;}
__EXECUTE__{:::}
EXECUTE IMMEDIATE ' ALTER SESSION SET NLS_DATE_FORMAT = ''YYYY-MM-DD''
{;;;}
__SELECT_INTO__{:::}V_P_FREQ{:::}
SELECT P_FREQ
INTO V_P_FREQ
FROM {###}{###}ETL_PROC_STATUS_DEF{###} {###}
WHERE PROC_NAME = V_PROC_NAME
{;;;}
__IF__{:::}
IF P_AS_OF_DATE = FUNC_GET_FREQ_DAYS(P_AS_OF_DATE, V_P_FREQ) THEN
{;;;}
__RUN_PROC_FUN__{:::}
ETL.ETL_ADD_PARTITION( 'MA_F_LOAN' , P_AS_OF_DATE , 'ETL' )
{;;;}
__DELETE_TABLE__{:::}||TMP_XD_LAST_PAYDATE{:::}
DELETE TMP_XD_LAST_PAYDATE
{;;;}
__DELETE_TABLE__{:::}||TMP_XD_NEXT_PAYDATE{:::}
DELETE TMP_XD_NEXT_PAYDATE
{;;;}
... (后续更多标注结果) ...
接下来,使用Python对上述标注代码进行解析,即可获取一个结构化的JSON结果。该JSON包含了脚本的文件名、依赖表、创建表、修改表,以及每个SQL代码段的详细拆解信息。以本案例为例,解析后的JSON结果如下:
{
"SQL_FILE_NAME": "proc_test.prc",
"DEPEND_TABLES": [
{"SCH": "NYBDP", "NAME": "O_CWWS_ACCT_PAYMENT_SCHEDULE"},
{"SCH": "ETL", "NAME": "MA_D_GL_SUBJECT"},
{"SCH": "ETL", "NAME": "MA_F_LOAN"}
],
"CREATE_TABLES": [],
"UPDATE_INSERT_TABLES": [
{"SCH": "ETL", "NAME": "TMP_XD_LAST_PAYDATE"},
{"SCH": "ETL", "NAME": "TMP_XD_NEXT_PAYDATE"},
{"SCH": "ETL", "NAME": "MA_F_LOAN"},
{"SCH": "", "NAME": "M_RUNLOG"}
],
"SQL_SPLIT_INFO": [
{"SQL_SEQ": 1, "SQL_TYPE": "__CREATE_PROCEDURE_HEAD__"},
{"SQL_SEQ": 2, "SQL_TYPE": "__EXECUTE__"},
{"SQL_SEQ": 3, "SQL_TYPE": "__SELECT_INTO__"},
{"SQL_SEQ": 4, "SQL_TYPE": "__IF__"},
{"SQL_SEQ": 5, "SQL_TYPE": "__RUN_PROC_FUN__"},
{
"SQL_SEQ": 6, "SQL_TYPE": "__DELETE_TABLE__",
"SQL_CODE": "DELETE TMP_XD_LAST_PAYDATE",
"LVL_STRUCT": {"TMP_XD_LAST_PAYDATE": []},
"LVL_CODE": {"TMP_XD_LAST_PAYDATE": "DELETE TMP_XD_LAST_PAYDATE"},
"TAR_TAB": [{"SCH": "", "NAME": "TMP_XD_LAST_PAYDATE"}],
"SRC_TAB": []
},
{
"SQL_SEQ": 7, "SQL_TYPE": "__DELETE_TABLE__",
"SQL_CODE": "DELETE TMP_XD_NEXT_PAYDATE",
"LVL_STRUCT": {"TMP_XD_NEXT_PAYDATE": []},
"LVL_CODE": {"TMP_XD_NEXT_PAYDATE": "DELETE TMP_XD_NEXT_PAYDATE"},
"TAR_TAB": [{"SCH": "", "NAME": "TMP_XD_NEXT_PAYDATE"}],
"SRC_TAB": []
},
{"SQL_SEQ": 8, "SQL_TYPE": "__COMMIT__"},
{
"SQL_SEQ": 9, "SQL_TYPE": "__INSERT_TABLE_SELECT__",
"SQL_CODE": "INSERT INTO ETL.TMP_XD_LAST_PAYDATE\n ...",
"LVL_STRUCT": {"__SUB_SELECT_1__": ["NYBDP.O_CWWS_ACCT_PAYMENT_SCHEDULE"], "ETL.TMP_XD_LAST_PAYDATE": ["__SUB_SELECT_1__"]},
"LVL_CODE": {"__SUB_SELECT_1__": "...", "ETL.TMP_XD_LAST_PAYDATE": "INSERT INTO ETL.TMP_XD_LAST_PAYDATE ..."},
"TAR_TAB": [{"SCH": "ETL", "NAME": "TMP_XD_LAST_PAYDATE"}],
"SRC_TAB": [{"SCH": "NYBDP", "NAME": "O_CWWS_ACCT_PAYMENT_SCHEDULE"}]
},
{
"SQL_SEQ": 10, "SQL_TYPE": "__INSERT_TABLE_SELECT__",
"SQL_CODE": "INSERT INTO ETL.TMP_XD_NEXT_PAYDATE\n ...",
"LVL_STRUCT": {"__SUB_SELECT_2__": ["NYBDP.O_CWWS_ACCT_PAYMENT_SCHEDULE"], "ETL.TMP_XD_NEXT_PAYDATE": ["__SUB_SELECT_2__"]},
"LVL_CODE": {"__SUB_SELECT_2__": "...", "ETL.TMP_XD_NEXT_PAYDATE": "INSERT INTO ETL.TMP_XD_NEXT_PAYDATE ..."},
"TAR_TAB": [{"SCH": "ETL", "NAME": "TMP_XD_NEXT_PAYDATE"}],
"SRC_TAB": [{"SCH": "NYBDP", "NAME": "O_CWWS_ACCT_PAYMENT_SCHEDULE"}]
},
{"SQL_SEQ": 11, "SQL_TYPE": "__COMMIT__"},
{
"SQL_SEQ": 12, "SQL_TYPE": "__MERGE_TABLE__",
"SQL_CODE": "MERGE INTO ETL.MA_F_LOAN A\n USING ...",
"LVL_STRUCT": {"__SUB_SELECT_3__": ["ETL.MA_F_LOAN"], "__SUB_SELECT_4__": ["ETL.MA_F_LOAN", "ETL.MA_D_GL_SUBJECT", "__SUB_SELECT_3__"], "ETL.MA_F_LOAN": ["__SUB_SELECT_4__"]},
"LVL_CODE": { "...": "...", "ETL.MA_F_LOAN": "MERGE INTO ETL.MA_F_LOAN A\n USING __SUB_SELECT_4__ B\n ON (..."}
},
... (后续更多拆解信息)
],
"SQL_DEPEND_LVL": [
{6: {}, 7: {}, 12: {}, 15: {}},
{9: {6}, 10: {7}, 13: {12}}
]
}
这还没有结束。我们可以将前面的两步(执行标注命令、解析标注结果)封装为一个Python函数,形成一个可直接使用的工具。下面给出一个完整的实现(split_etl.py),它主要完成三件事:
- 调用ZGLanguage进行语法标注;
- 解析标注文件,生成结构化的拆解信息;
- 基于拆解信息,自动计算代码段的依赖层级,从而实现并行和分支分析。
# -*- coding: utf-8 -*-
import sys
import subprocess
import json
###############################################################
id_new_select = 0
def make_sql_struct(mark_sql, tartab) :
# ... (函数内部实现,用于解析标注标记,生成层级结构和源/目标表信息)
def split_etl(etl_file) :
# 1. 对SQL文件进行标注转换
run_result = subprocess.run(['ZGLanguage', '-e', 'SPLIT_ETL/MARK_SQLS.syn'
, '-t', etl_file
, '-o', 'mark_sql.zgl']
, capture_output=True
, encoding='utf-8'
, text=True
)
# 2. 初始化结果对象
sql_file_info = {
'SQL_FILE_NAME': etl_file.replace('/', ' ').replace('\\', ' ').split(' ')[-1]
, 'SQL_SPLIT_INFO': []
, 'DEPEND_TABLES': []
, 'CREATE_TABLES': []
, 'UPDATE_INSERT_TABLES': []
, 'SQL_DEPEND_LVL': []
}
# 3. 解析标注文件,生成 SQL_SPLIT_INFO
# ... (解析逻辑)
# 4. 提取 DEPEND_TABLES, CREATE_TABLES, UPDATE_INSERT_TABLES
# ... (提取逻辑)
# 5. 计算 SQL_DEPEND_LVL(依赖层级)
# ... (依赖分析逻辑)
print('##################################################')
print(sql_file_info)
return
if __name__ == "__main__" :
if len(sys.argv) == 1 :
print('Miss sql file !')
sys.exit(-1)
split_etl(sys.argv[1])
封装好之后,执行命令也很简洁:
python split_etl.py proc_test.prc > log.log
最后,通用的SQL语法解析配置文件 MARK_SQLS.syn 内容相对较长,它定义了各个数据库的语法规则。由于篇幅限制,这里不再展开,但它是整个体系的核心,包含了对所有主要SQL语法(CREATE、INSERT、UPDATE、DELETE、MERGE、SELECT,以及存储过程中的判断、循环、游标等)的精细定义。
