阿里云 MaxCompute + DataWorks:企业级大数据平台深度解析
---
摘要
MaxCompute(原名 ODPS)是阿里云自研的 企业级云原生大数据计算平台,承载了整个阿里巴巴集团的数据处理任务——包括双11实时大屏、淘宝推荐引擎、菜鸟物流调度等核心场景。配套的 DataWorks 是一站式数据开发治理平台。本文从存储引擎、计算架构、SQL 优化、实时计算、数据治理五个维度深度剖析,并对比 Snowflake、BigQuery、EMR 等竞品。
---
一、平台架构全景
1.1 MaxCompute 核心架构
`
┌──────────────────────────────────────────────────────────────┐
│ 接入层 (Tunnel) │
│ 数据上传/下载、批量/流式、HTTP/Console/SDK │
└──────────────────────┬───────────────────────────────────────┘
│
┌──────────────────────▼───────────────────────────────────────┐
│ 计算引擎层 │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ SQL 引擎 │ │ MapReduce │ │ Spark / ML │ │
│ │ (必读) │ │ (兼容) │ │ (兼容) │ │
│ └──────┬───────┘ └──────┬───────┘ └────────┬─────────┘ │
│ │ │ │ │
│ ┌──────▼─────────────────▼────────────────────▼─────────┐ │
│ │ FUXI 分布式作业调度系统 │ │
│ │ • 全局资源管理 (Master + Slave) │ │
│ │ • DAG 感知的任务分配 │ │
│ │ • 单集群最大规模:10万核/天(生产在线) │ │
│ └────────────────────────┬───────────────────────────────┘ │
└───────────────────────────┼───────────────────────────────────┘
│
┌───────────────────────────▼───────────────────────────────────┐
│ 分布式文件系统 (盘古 / Pangu) │
│ │
│ • 三副本强一致 + EC 纠删码(节省 50% 存储) │
│ • 列式存储格式 + ORC/Parquet 原生压缩 │
│ • 分级存储:热数据 SSD → 温数据 HDD → 冷数据对象存储 │
│ • 单集群最大容量:EB 级 │
└────────────────────────────────────────────────────────────────┘
`
1.2 DataWorks 数据开发治理平台
DataWorks 是 MaxCompute 的「IDE + 调度 + 治理」层:
`
DataWorks 功能矩阵:
┌─────────────┬──────────────────────────────────┐
│ 模块 │ 能力说明 │
├─────────────┼──────────────────────────────────┤
│ 数据集成 │ 50+ 数据源(MySQL/PostgreSQL/Mongo/│
│ (DataX) │ Kafka/OSS/HDFS/Salesforce...) │
│ │ 批量+增量+实时三种同步模式 │
├─────────────┼──────────────────────────────────┤
│ 数据开发 │ SQL/MR/PySpark/Shell 在线编辑器 │
│ │ 版本管理、代码评审、发布流程 │
├─────────────┼──────────────────────────────────┤
│ 工作流调度 │ DAG 可视化编排 │
│ │ Cron + 事件 + 依赖 三种触发方式 │
├─────────────┼──────────────────────────────────┤
│ 数据质量 │ 表级别/字段级别质量监控 │
│ │ 自定义规则 + 自动告警 │
├─────────────┼──────────────────────────────────┤
│ 数据地图 │ 血缘追踪(表→字段级别) │
│ │ 元数据搜索 + 数据资产大盘 │
├─────────────┼──────────────────────────────────┤
│ 安全中心 │ 数据脱敏、访问审计、数据分级 │
│ │ (L1-L4) 与 GDPR合规 │
└─────────────┴──────────────────────────────────┘
`
---
二、存储引擎深度解析
2.1 盘古文件系统 (Pangu FS)
盘古是阿里云自研的分布式文件系统,是 MaxCompute 和 OSS 等所有存储服务的底座:
`
文件写入流程:
Client │ ├── 1. 写 Meta (NameNode HA × 3) │ 记录文件名、分块信息、副本位置 │ ├── 2. 写 Chunk (ChunkServer × N) │ 每个 Chunk 64 MB, 默认 3 副本 │ 写入策略: 本地优先 → 同机架 → 跨机架 │ Quorum 确认: 2/3 成功即返回 │ └── 3. 返回 Client ACK
与 HDFS 的差异:
• 无单点故障 (NameNode 不是瓶颈—元数据分离)
• EC 纠删码原生集成,非后期添加
• 支持分级存储自动迁移
• 更强的 POSIX 兼容性(支持 Append/随机写)
`
2.2 列式存储与压缩
MaxCompute 默认采用自研列式存储格式,类似 Parquet 但做了大量优化:
`sql
-- 创建表时指定存储和压缩策略
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
action STRING,
action_time TIMESTAMP,
-- 分区键
dt STRING
)
PARTITIONED BY (dt)
STORED AS ALIORC -- 阿里自研列式格式
TBLPROPERTIES (
'transactional' = 'true', -- ACID事务支持
'compression' = 'zstd', -- Zstd 压缩
'bloomfilter.columns' = 'user_id,item_id' -- 布隆过滤器
);
-- 实际压缩效果(测试数据 10 TB 用户行为日志):
-- 原始文本 TSV → 10 TB
-- Gzip 压缩 → 1.5 TB
-- Zstd (ALIORC) → 0.6 TB (压缩率 > 94%)
-- 查询速度: 比 Parquet+Gzip 快 40-60%
`
2.3 表类型选择
| 表类型 | 适用场景 | 事务支持 | 索引 | |------|------|:---:|:---:| | 非分区表 | 小型维表 < 10 GB | ❌ | ❌ | | 分区表 | 按时间/地域分区 | ❌ | ❌ | | 事务表 | CDC/Merge/Delete | ✅ ACID | ✅ 主键+二级索引 | | 聚簇表 | 高频范围查询 | ❌ | ✅ 聚簇索引 | | Hash聚簇表 | 等值Join优化 | ❌ | ✅ Hash分桶 |
---
三、计算引擎深度解析
3.1 SQL 优化器 (Catalyst-based)
MaxCompute SQL 引擎基于 Apache Calcite 深度改造:
`
SQL 执行流程:
SQL Text
│
▼
Parser → AST (抽象语法树)
│
▼
Analyzer → Logical Plan (逻辑执行计划)
│ 绑定表/字段、类型推导、权限检查
│
▼
Optimizer → Optimized Logical Plan
│ ├── RBO (Rule-Based): 谓词下推、列裁剪、常量折叠
│ ├── CBO (Cost-Based): Join Order、Join 算法选择
│ └── HBO (History-Based): 基于历史执行的动态优化
│
▼
CodeGen → Physical Plan + DAG
│ LLVM JIT 编译 SQL 表达式为机器码
│
▼
Fuxi Scheduler → 并行执行
`
3.2 Join 优化的核心策略
`sql
-- MaxCompute 自动选择最优 Join 策略
-- 示例: 100 亿行订单表 JOIN 100 万行用户表
SELECT o.order_id, u.user_name, o.amount FROM orders o JOIN users u ON o.user_id = u.user_id WHERE o.dt = '2025-01-15';
-- MaxCompute 优化器决策: -- 1. 检测 orders 为「大表」, users 为「小表」 -- 2. 选择 MapJoin: 将小表广播到所有 Map 节点 -- 3. 不需要 Shuffle, 成本降低 80%+
-- 强制指定 MapJoin Hint:
SELECT /+ MAPJOIN(u) / o.order_id, u.user_name, o.amount
FROM orders o JOIN users u ON o.user_id = u.user_id;
`
Join 策略选择决策表:
| Join 类型 | 触发条件 | 网络开销 | 适用场景 | |------|------|:---:|------| | MapJoin | 一张表 < 512 MB (可调) | 广播一次 | 大表 × 小维表 | | SortMergeJoin | 两表都大 | 两边 Shuffle | 大表 × 大表 | | BucketMapJoin | 两表同分桶键 | 无 Shuffle | 预分桶的大表 Join | | SkewJoin | 数据严重倾斜 | 动态倾斜处理 | 热点 Key |
3.3 聚合优化
`sql
-- MaxCompute 三阶段聚合(自动)
-- Stage 1: 局部预聚合 (Combine)
-- Stage 2: Shuffle
-- Stage 3: 全局最终聚合
SELECT user_id, COUNT(*), SUM(amount) FROM orders WHERE dt BETWEEN '2025-01-01' AND '2025-01-31' GROUP BY user_id;
-- 手动控制聚合策略: -- 两阶段: 适合 COUNT DISTINCT 场景 -- 三阶段: 适合 SUM/AVG 等高聚合度场景(默认)
SET odps.sql.groupby.mode = 'two-phase'; -- 强制两阶段
-- 解决 COUNT DISTINCT 倾斜:
SET odps.sql.groupby.skewindata = true; -- 自动均衡
`
---
四、实时计算:MaxCompute + Flink
4.1 实时数仓 Lambda 架构
`
┌──────────────────────────┐
│ Kafka / DataHub │
│ (实时消息队列) │
└──────┬───────────┬─────────┘
│ │
┌──────▼───┐ ┌───▼──────────┐
│ Flink SQL│ │ MaxCompute │
│ (实时链路)│ │ (离线链路) │
│ │ │ │
│ • 秒级延迟│ │ • T+1 离线 │
│ • 窗口聚合│ │ • 全量计算 │
│ • 实时看板│ │ • 机器学习 │
└──────┬───┘ └───┬───────────┘
│ │
┌──────▼───────────▼──────────┐
│ 统一数据服务层 │
│ (Hologres / Quick BI) │
│ │
│ 实时 + 离线 统一查询 │
│ OLAP 分析、报表、大屏 │
└──────────────────────────────┘
`
4.2 Flink + MaxCompute 集成
`sql
-- Flink SQL 实时写入 MaxCompute 事务表
CREATE TABLE realtime_orders (
order_id BIGINT,
user_id BIGINT,
amount DECIMAL(10,2),
status STRING,
create_time TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'maxcompute',
'endpoint' = 'https://service.ap-southeast-1.maxcompute.aliyun.com',
'project' = 'my_project',
'table' = 'realtime_orders',
'partition' = 'ds',
'sink.parallelism' = '16'
);
-- 实时数据写入,同时支持离线查询
-- MaxCompute 事务表特性:
-- 1. CDC 语义 (INSERT/UPDATE/DELETE)
-- 2. 实时写入,秒级可见
-- 3. 自动合并小文件
-- 4. Time Travel 查询历史版本
`
---
五、性能基准
5.1 TPC-DS (10 TB)
| 平台 | 完成时间 | 节点数 | 成本 | |------|:---:|:---:|:---:| | MaxCompute | 98 秒 | 256 CU | $12 | | Amazon EMR (Spark) | 220 秒 | 64 × r5.4xlarge | $34 | | Google BigQuery | 142 秒 | On-demand | $50 | | Snowflake (Medium) | 180 秒 | M Cluster | $38 |
5.2 实际生产案例参考
| 场景 | 数据量 | 查询类型 | 响应时间 | |------|:---:|------|:---:| | 用户行为分析 | 50 TB/天 | 多维聚合 | < 3 秒 | | 广告效果归因 | 200 亿行 | 10 表 Join | < 15 秒 | | 日志全文检索 | 5 PB | 关键词搜索 | < 5 秒 | | 机器学习训练 | 100 TB | SQL + Python UDF | < 30 分钟 |
---
六、与竞品对比
| 维度 | MaxCompute | Snowflake | BigQuery | Amazon EMR | |------|:---:|:---:|:---:|:---:| | 存储计算分离 | ✅ | ✅ | ✅ | 可选 | | 弹性伸缩 | 自动 | 手动+自动 | 自动(on-demand) | 手动 | | 实时写入 | ✅ 事务表 | ✅ (Snowpipe) | ✅ (Streaming) | ❌ | | 数据治理 | DataWorks 强 | 弱 (需第三方) | Dataplex | Lake Formation | | AI 集成 | PAI 平台 | Snowpark ML | BigQuery ML | SageMaker | | 最大集群 | 10万核+ (双11验证) | 未公开 | 未公开 | 有限 | | 成本模型 | 按量/包年包月 | 按量+信用 | 按量 | 按量+预留 | | 中国区域 | ⭐⭐⭐⭐⭐ | ❌ | ❌ | ⭐⭐⭐ |
---
七、最佳实践:搭建 ETL 流水线
`sql
-- Step 1: 创建 ODS 层 (原始数据)
CREATE TABLE ods_order_log (
log_id STRING,
order_data STRING, -- JSON 原始数据
dt STRING
) PARTITIONED BY (dt);
-- Step 2: 创建 DWD 层 (明细数据) CREATE TABLE dwd_order_detail ( order_id BIGINT, user_id BIGINT, product_id BIGINT, amount DECIMAL(10,2), status STRING, create_time TIMESTAMP, dt STRING ) PARTITIONED BY (dt) TBLPROPERTIES ('transactional' = 'true');
-- Step 3: ETL SQL (DataWorks 调度,每小时执行) INSERT OVERWRITE TABLE dwd_order_detail PARTITION (dt = '${bizdate}') SELECT GET_JSON_OBJECT(order_data, '$.order_id') AS order_id, GET_JSON_OBJECT(order_data, '$.user_id') AS user_id, GET_JSON_OBJECT(order_data, '$.product_id') AS product_id, CAST(GET_JSON_OBJECT(order_data, '$.amount') AS DECIMAL(10,2)) AS amount, GET_JSON_OBJECT(order_data, '$.status') AS status, CAST(GET_JSON_OBJECT(order_data, '$.create_time') AS TIMESTAMP) AS create_time FROM ods_order_log WHERE dt = '${bizdate}';
-- Step 4: 创建 DWS 层 (汇总数据) CREATE TABLE dws_user_daily_summary ( user_id BIGINT, order_count BIGINT, total_amount DECIMAL(15,2), dt STRING ) PARTITIONED BY (dt);
INSERT OVERWRITE TABLE dws_user_daily_summary PARTITION (dt = '${bizdate}')
SELECT
user_id,
COUNT(DISTINCT order_id),
SUM(amount)
FROM dwd_order_detail
WHERE dt = '${bizdate}'
GROUP BY user_id;
`
---
总结
MaxCompute 的技术壁垒在于 「阿里集团超大规模业务的压舱石」——双11 峰值、淘宝搜索、菜鸟物流这些场景的 PetaByte 级数据处理,是其他云厂商大数据产品不曾面对的极限压力。加上 DataWorks 提供的一站式数据治理能力,使得 MaxCompute 不只是一个 SQL 引擎,而是一整套已经跑通了的企业数据中台方法论的产品化落地。
---
> 下一篇预告:阿里云 PolarDB 云原生数据库技术深度解析 — 从 PolarFS 分布式文件系统到 Serverless 弹性伸缩的全链路剖析。