高级 HPC
OGGM的MPI模块(mpi.py,约100行)是整个并行计算体系的顶层支柱。虽然在代码体量上远小于workflow.py,但它实现了一种经典且高效的master-worker并行模式,使得OGGM能够跨越多台计算节点的边界,处理数万条冰川。
OGGM的MPI设计遵循"够用即可"的原则。它不试图实现复杂的消息传递模式或分布式数据结构,而是聚焦于一个核心任务:将冰川处理任务高效地分发到多台节点上执行。这种设计让核心逻辑保持精简,但不适合更复杂的并行模式(如分布式参数优化)。对于OGGM的主要计算负载——独立冰川的并行处理——master-worker模式已经足够。
MPI在OGGM中的定位是替代multiprocessing提供多节点能力。在语义上,两者等价:都是将execute_entity_task的工作负载分发到多个worker上。区别在于multiprocessing受限于单节点的核心数,而MPI可以扩展到集群的数百核。
OGGM在包导入时就检测MPI的可用性(oggm/__init__.py第17行):
try:
from oggm.mpi import _init_oggm_mpi
_init_oggm_mpi()
except ImportError:
_have_ogmpi = False
_init_oggm_mpi函数定义在mpi.py中:
def _init_oggm_mpi():
"""Initialize MPI for OGGM if available.
Called at package import time. Sets global flags that
workflow.py uses to decide between MPI, multiprocessing,
and sequential execution.
"""
global _have_ogmpi, MPI, comm, rank, size
try:
from mpi4py import MPI as _MPI
MPI = _MPI
except ImportError:
_have_ogmpi = False
return
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
if size > 1:
_have_ogmpi = True
else:
_have_ogmpi = False
这段逻辑体现了OGGM对小规模使用的友好态度:只有真正启动了多进程MPI运行时(size > 1),才启用MPI模式。如果用户只是单进程运行(size == 1),即使安装了mpi4py,也不使用MPI路径。这避免了单进程情况下的通信开销。
在高性能计算环境中,mpi4py本身会读取SLURM提供的环境变量(如SLURM_JOBID、SLURM_NTASKS)来配置MPI通信器。OGGM不需要直接解析这些变量——它完全依赖mpi4py的抽象层。这使得OGGM在其他作业调度系统(如PBS/Torque、LSF)中也能正常工作。
这是MPI模块的核心函数,实现了master-worker工作窃取(work stealing)模式:
def mpi_master_spin_tasks(gdirs, task_list):
"""Master distributes tasks, workers execute and return results.
This is the MPI counterpart of pool.map() in the multiprocessing
path. The master (rank 0) sends work items to workers as they
become available, collects results, and assembles the final
ordered output.
Parameters
----------
gdirs : list of GlacierDirectory
All glacier directories to process.
task_list : list of (callable, dict) tuples
Normalized task-keyword pairs for each gdir.
Returns
-------
list
Results in the same order as gdirs.
"""
if rank == 0:
# Master: distribute and collect
results = [None] * len(gdirs)
next_task = 0
pending = {}
active_workers = size - 1 # rank 1..N-1 are workers
# Send initial batch to kickstart workers
for i in range(1, min(size, len(gdirs) + 1)):
if next_task < len(gdirs):
comm.send((next_task, gdirs[next_task],
task_list[next_task]),
dest=i, tag=MPI_TAG_WORK)
pending[i] = next_task
next_task += 1
# Spin loop: receive results, send new work
while next_task < len(gdirs) or pending:
status = MPI.Status()
result = comm.recv(
source=MPI.ANY_SOURCE,
tag=MPI.ANY_TAG,
status=status
)
worker = status.Get_source()
task_idx = pending.pop(worker)
results[task_idx] = result
if next_task < len(gdirs):
comm.send((next_task, gdirs[next_task],
task_list[next_task]),
dest=worker, tag=MPI_TAG_WORK)
pending[worker] = next_task
next_task += 1
return results
else:
# Worker: receive-execute-return loop
while True:
task_idx, gdir, (task, kwargs) = comm.recv(
source=0, tag=MPI_TAG_WORK)
result = task(gdir, **kwargs)
comm.send(result, dest=0, tag=MPI_TAG_RESULT)
在异构集群中(不同节点有不同CPU型号或者有不同程度的背景负载),各个worker的处理速度可能差异很大。"工作窃取"意味着master不会在开始时静态分配等量的工作给每个worker,而是在worker完成当前任务后立即发送下一个。这种动态负载均衡策略确保了所有worker始终保持忙碌,总体运行时间由最慢任务的平均值而非最慢worker决定。
OGGM使用简单的整数标记区分消息类型:
MPI_TAG_WORK = 1 # master发送给worker的工作项目 MPI_TAG_RESULT = 2 # worker返回给master的计算结果
这最小化了MPI通信器的复杂性。
在execute_entity_task中的三项选择(参考第15章)体现了OGGM的并行分层策略:
_have_ogmpi为True,使用MPI。这是唯一的多节点方案。这个优先级排序反映了OGGM的设计理念:在可用性允许的情况下使用最高级别的并行方案。用户不需要手动指定并行后端——系统自动检测并选择最佳路径。
| 角色 | Rank范围 | 职责 | 代码路径 |
|---|---|---|---|
| Master | 0 | 任务分发、结果收集、进度监控、日志聚合 | mpi_master_spin_tasks的if rank == 0分支 |
| Worker | 1至N-1 | 接收任务、执行单冰川处理、返回结果 | mpi_master_spin_tasks的else分支 |
Master本身不执行冰川处理任务——它纯粹是协调者。这个设计选择是刻意的:在master-worker模式中,让master也参与计算会引入额外的复杂性(如master必须在处理任务的同时处理来自worker的结果),而master-worker模式通常受限于master的处理能力。将master专用于协调使通信逻辑最简化。
对于少量冰川(例如少于worker数量),可以将master配置为不参与分发(通过调整初始批次大小),但这在当前的OGGM实现中未采用,因为常见的使用场景是gdirs数量远大于worker数量。
MPI行为通过params.cfg中的use_mpi参数控制。当设为False时,即使在多进程mpirun环境中也回退到multiprocessing或顺序模式。这在调试时非常有用——可以先用顺序模式验证工作流正确性,然后启用MPI进行大规模计算。
# params.cfg 相关内容 use_mpi: bool = True # 是否使用MPI(如果可用) mp_processes: int = -1 # multiprocessing进程数(-1 = CPU核心数)
mp_processes在MPI模式下通常设为1(每个MPI进程内部不再嵌套多进程),以避免进程/线程嵌套导致的资源竞争和调度复杂性。但在某些配置中(如每个MPI进程对应一个NUMA节点),可以设小值以处理I/O密集型任务。
MPI运行中,每个进程独立写日志导致的输出混乱是常见的调试痛点。OGGM通过rank-aware日志抑制来解决:
# 只允许rank 0输出INFO及以上的日志
# 其他rank仅输出WARNING及以上的日志
import logging
if rank != 0:
logging.getLogger('oggm').setLevel(logging.WARNING)
这确保了关键信息集中在master的输出中,而每个worker只在遇到问题时才会发出警告。
worker中抛出的异常必须传播回master。OGGM的worker在捕获异常后将其序列化为错误令牌并通过MPI发送给master,master在收到错误令牌后中止所有待处理任务并报告问题。
mpirun --output-filename logfile将每个rank的输出重定向到独立文件,然后逐一检查。
在MPI并行计算中,通信开销是决定可扩展性的关键因素。OGGM的冰川处理任务具有极高的计算-通信比:每个任务涉及数秒至数分钟的浮点运算,而通信仅涉及序列化一个结果对象(通常小于1MB)。这种"粗粒度"并行模式非常适合master-worker架构。
但在以下情况下通信可能成为瓶颈:
冰川处理的执行时间随冰川大小和复杂度变化显著——大型冰川的处理时间是小冰川的100-1000倍。虽然在gdir列表末尾集中出现大型冰川会导致"长尾"延迟,但OGGM的工作窃取机制会部分缓解这一问题:处理完小冰川的worker会立即接管下一个大任务。
最佳实践是按面积降序排列gdirs——先分配大冰川,最后让小冰川填充空闲时间。这可以最小化所有worker的总等待时间。
在理想情况下(所有gdir处理时间相等),OGGM的MPI伸缩性接近线性——N个worker将运行时间缩短到约1/N。实际上,由于通信开销和负载不均衡,伸缩性在近百个worker时开始退化。但给定的典型RGI区域(数千条冰川),20-50个worker是效率最佳的区间。
#!/bin/bash #SBATCH --job-name=oggm_mpi #SBATCH --output=oggm_%j.out #SBATCH --error=oggm_%j.err #SBATCH --nodes=4 #SBATCH --ntasks-per-node=32 #SBATCH --time=24:00:00 #SBATCH --partition=normal module load python/3.10 module load mpi4py # 确保每个MPI进程使用1个OpenMP线程 export OMP_NUM_THREADS=1 mpirun -np $SLURM_NTASKS python -c " from oggm import workflow, tasks, cfg cfg.initialize() cfg.PARAMS['mp_processes'] = 1 # 不在MPI内部嵌套多进程 # 执行工作流 gdirs = workflow.init_glacier_directories(rgi_df) workflow.gis_prepro_tasks(gdirs) workflow.climate_tasks(gdirs) workflow.inversion_tasks(gdirs) "
对于全球19个RGI区域的处理,有两种并行模式:
| 策略 | 并行粒度 | 优点 | 缺点 |
|---|---|---|---|
| MPI跨区域 | 单个gdir | 完美负载均衡 | 需要将所有数据放在共享文件系统中 |
| Job Array分区域 | 整个区域 | 无需共享文件系统,容错性更好 | 负载不均衡(区域大小差异大) |
| 混合方案 | 区域内MPI + 区域间Job Array | 最佳性能 | 调度复杂度最高 |
混合方案(区域间用job array + 区域内用MPI)常用于大规模预处理。对于大型区域(01 Alaska、02 Western Canada),可部署4-8个节点的MPI;对于小型区域(如区域12 Caucasus),单个节点通常即可。
OGGM的MPI模块代码量不大(约100行),但覆盖了区域冰川批处理最需要的并行模式:冰川级任务相互独立,适合master-worker式分发;当MPI不可用时,又可以回退到multiprocessing或顺序执行。这一设计使同一套工作流能在笔记本电脑和计算集群之间迁移。