高级 核心架构
本章深入解析OGGM的工作流编排层,涵盖workflow.py(约1336行)、tasks.py和global_tasks.py三个核心模块。工作流管线是OGGM的"调度层"——它定义了冰川模拟从原始数据到最终输出的完整执行路径,并提供了灵活的并行计算基础设施。
| 研究目标 | 最低任务链 | 主要输出 | 优先读 |
|---|---|---|---|
| 估算冰储量 | GIS预处理 → 气候处理 → 物质平衡校准 → 反演 | inversion_output.pkl | 第5-9章 |
| 历史面积/体积变化 | 反演 → 初始化流线 → run_from_climate_data | model_diagnostics.nc | 第10、15、18章 |
| 未来情景 | 历史工作流 → GCM气候处理 → 正演模拟 | 带filesuffix的诊断文件 | 第7、8、16章 |
| 入海型冰川 | 崩解反演 → 含崩解的正演 → 诊断检查 | calving_m3, calving_rate | 第12、13章 |
OGGM的工作流系统遵循"任务即函数"的哲学。每个科学计算步骤被封装为一个独立的Python函数(称为entity task),工作流管线负责将这些函数按序编排并分发到多个冰川实体上。这种设计的核心优势在于:
workflow.py是这套体系的核心文件。它不直接执行冰川物理计算,而是作为总指挥,调用tasks.py中的具体科学任务。理解workflow.py的调用关系,就等于掌握了OGGM全部功能的入口地图。
位于workflow.py第126行的execute_entity_task是整个OGGM中调用频率最高的函数。几乎所有的并行工作流都经过它。其核心职责是:接收一个task函数和一组冰川目录对象(GlacierDirectory, gdir),将task应用到每个gdir上,并以并行方式收集结果。
def execute_entity_task(task, gdirs, **kwargs):
"""Central parallel execution engine.
Parameters
----------
task : callable
The task function to apply. Must accept a gdir as first argument.
gdirs : list of GlacierDirectory
Glacier directories to process.
**kwargs
Additional keyword arguments passed to each task invocation.
Returns
-------
list
Results collected from all invocations, in the same order as gdirs.
"""
task参数有两种传入方式:
| 传入方式 | 示例 | 说明 |
|---|---|---|
| 单个可调用对象 | tasks.compute_centerlines | 最常见的方式,所有gdir共享同一task和kwargs |
| 列表 | [(task1, kw1), (task2, kw2), ...] | 每个gdir可拥有不同的task和/或不同的kwargs |
在函数内部,task首先被规范化为统一的列表格式。单个函数被自动复制为与gdirs等长的列表:
if isinstance(task, (list, tuple)):
# 用户提供了每gdir的(task, kwargs)列表
task_list = task
else:
# 统一task,复制N份
task_list = [(task, kwargs) for _ in gdirs]
这是OGGM增量计算的关键机制。在将任务分发到worker之前,系统检查每个任务是否已在目标gdir上完成。已完成的任务被替换为_dummy_task(一个返回None的空操作),从而避免浪费计算资源。
判断依据是global_tasks.py中定义的任务完成标志文件。每个冰川目录(GlacierDirectory)维护一组标志文件,记录哪些处理步骤已经完成。这种设计与make的增量构建理念一致——只处理需要更新的部分。
Python标准库的functools.partial在multiprocessing环境下有一个致命缺陷:其__reduce__方法不完整,导致pickle序列化失败。OGGM为此设计了一个精巧的替代品:
class _pickle_copier(object):
"""A pickleable alternative to functools.partial.
Multiprocessing needs to pickle the task and its arguments.
functools.partial objects are not reliably pickleable across
all Python versions, so we use this custom class instead.
"""
def __init__(self, func, *args, **kwargs):
self.func = func
self.args = args
self.kwargs = kwargs
def __call__(self, gdir):
return self.func(gdir, *self.args, **self.kwargs)
这个类的对象可以被pickle(因为func、args、kwargs都是简单属性),从而安全地跨进程边界传递。
_pickle_copier模式可以直接复用。但请注意:被包装的函数必须是模块级的(不能是lambda或嵌套函数),否则仍然无法pickle。
经过规范化和全局任务(global task)检查后,实际执行路径根据可用并行后端选择:
if _have_ogmpi:
# MPI模式:master分发,worker执行
out = ogmpi.mpi_master_spin_tasks(gdirs, task_list)
elif len(gdirs) > 1:
# 多进程模式:创建进程池
pool = init_mp_pool()
out = pool.map(run_task_on_gdir, zip(gdirs, task_list))
else:
# 顺序模式:单核回退
log.warning("OGGM is running sequentially!")
out = [run_task_on_gdir((g, t)) for g, t in zip(gdirs, task_list)]
顺序回退时会输出一条警告,帮助用户在排查性能问题时快速定位配置错误。
workflow.py第197行的execute_parallel_tasks解决了一个不同于execute_entity_task的问题:对同一个冰川目录(GlacierDirectory)执行多个可以并行运行的任务。这在高分辨率冰川处理或复杂的后处理流程中非常有用。
其实现相对简单:利用multiprocessing.pool.ThreadPool(线程池,而非进程池),因为任务之间共享同一个gdir的内存视图,使用线程可以避免数据复制的开销。
init_mp_pool(第41行)负责创建、复用、重置和销毁multiprocessing进程池。它维护了一个模块级变量_mp_pool和一个标志CONFIG_MODIFIED:
def init_mp_pool(reset=False):
"""Get or create the global multiprocessing pool.
If OGGM configuration has changed since the pool was created,
the old pool is automatically terminated and a fresh one is
created with the new settings.
"""
global _mp_pool, CONFIG_MODIFIED
if reset or CONFIG_MODIFIED:
if _mp_pool is not None:
_mp_pool.terminate()
_mp_pool = None
CONFIG_MODIFIED = False
if _mp_pool is None:
_mp_pool = Pool(processes=cfg.PARAMS['mp_processes'],
initializer=_init_pool_globals)
return _mp_pool
CONFIG_MODIFIED标志是一个巧妙的设计:只要cfg.PARAMS被修改,该标志就被置为True,下一次调用init_mp_pool时自动重建进程池。这避免了配置变更后pool中的worker仍使用旧配置的问题。
每个multiprocessing worker进程在启动时调用_init_pool_globals,将OGGM配置解包到进程的全局命名空间。这确保了所有worker共享一致的参数设置。
在Windows上,Python的multiprocessing默认使用spawn(而非Linux的fork)创建子进程。spawn模式下,子进程不继承父进程的内存,因此需要重新导入模块。OGGM通过use_mp_spawn配置项处理这一差异,并在必要时回退到顺序执行。
mp_processes=1(即关闭多进程),或使用--mpi后端。
init_glacier_directories(第325行)是任何OGGM运行的第一步。它接受一个pandas GeoDataFrame(通常来自RGI shapefile),为每一行创建对应的GlacierDirectory(gdir)对象:
gdirs = workflow.init_glacier_directories(rgi_df)
该函数调用_check_rgi_input进行输入验证,包括:
OGGM能自动区分RGI v6和v7格式。通过检查DataFrame的列名和元数据,系统选择合适的读取器。v7版本引入了更多的属性列和不同的坐标系参数。
RGI intersects文件记录了每条冰川与气候网格和历史数据产品的空间交集。如果该文件在默认位置存在,init_glacier_directories会自动加载;否则输出警告并跳过需要交集信息的后续步骤。
| 初始化方式 | 函数/参数 | 适用场景 |
|---|---|---|
| 从RGI新运行 | init_glacier_directories(rgi_df) | 首次处理 |
| 续跑 | init_glacier_directories(rgi_df, from_prepro_level=N) | 从预处理级别(prepro level)N恢复 |
| 从tar恢复 | init_glacier_directories(rgi_df, from_tar='url') | 从远程存档恢复 |
续跑模式下,OGGM加载已有gdir的状态并跳过已完成的步骤,仅执行未完成的任务。从tar恢复则是从预处理的压缩包中解压完整的gdir目录。
workflow.py提供了一系列快捷函数,每个函数封装了一条完整的子管线。这些函数是用户最常直接调用的API。
位于第487行,这是OGGM最长的子管线,共10个任务:
def gis_prepro_tasks(gdirs, **kwargs):
task_list = [
tasks.define_glacier_region, # 1. 定义冰川区域
tasks.glacier_masks, # 2. 冰川掩膜
tasks.compute_centerlines, # 3. 计算中心线
tasks.initialize_flowlines, # 4. 初始化流线
tasks.compute_downstream_line, # 5. 计算下游延长线
tasks.compute_downstream_bedshape, # 6. 下游冰床形态
tasks.catchment_area, # 7. 集水区面积
tasks.catchment_intersections, # 8. 集水区交叉
tasks.catchment_width_geom, # 9. 几何宽度计算
tasks.catchment_width_correction, # 10. 宽度修正
]
for task, kwargs in task_list:
execute_entity_task(task, gdirs, **kwargs)
这条管线从原始DEM出发,逐步构建出冰川流线网络和集水区几何。每个步骤的输出是下一步的输入,形成严密的数据依赖链。初学者通常应使用gis_prepro_tasks保持标准顺序;只有在调试或开发新方法时,才建议逐步拆开运行。
位于第513行,包含3个核心任务:
process_climate_data → local_mustar → apparent_mb_from_any_mb
local_mustar(即mb_calibration_from_geodetic_mb)是校准的核心——它利用大地测量数据调整温度敏感度参数mu*,使模型物质平衡与观测一致。
位于第531行。这是OGGM最复杂的子管线,因为需要区分是否有冰川崩解(calving):
if gdir.is_tidewater:
# 崩解冰川路径
tasks.prepare_for_inversion(gdir, ...)
tasks.mass_conservation_inversion(gdir, ...)
tasks.filter_inversion_output(gdir, ...)
else:
# 陆地冰川路径
tasks.prepare_for_inversion(gdir, ...)
tasks.mass_conservation_inversion(gdir, ...)
tasks.filter_inversion_output(gdir, ...)
两条路径共享相同的函数名,但内部算法因冰川类型不同而有所差异。mass_conservation_inversion实现了基于冰厚-冰流通量关系的物质守恒反演方法。
位于第586行,是OGGM最关键的全局校准步骤。它通过优化Glen流动律参数A,使OGGM反演的全球冰川冰体积与Farinotti等人(2019)的共识估计相匹配:
def calibrate_inversion_from_consensus(gdirs, ...):
# 搜索Glen A,使总体积收敛到全球共识值
for A in glen_a_values:
execute_entity_task(tasks.mass_conservation_inversion, gdirs, glen_a=A)
total_volume = sum(gdir.read_pickle('inversion_output')['volume'] for gdir in gdirs)
if abs(total_volume - consensus_volume) < tolerance:
break
calibrate_inversion_from_volume(第783行)是该函数的单冰川变体,invert_from_params(第725行)则使用预计算的参数跳过迭代搜索过程。
位于第901行,处理支冰川与主冰川的合并。在自然冰川系统中,多条冰流汇入同一冰川,OGGM需要将这些支流整合到单一流线网络中:
def merge_glacier_tasks(gdirs, **kwargs):
# 步骤1:检测支流关系
execute_entity_task(tasks.intersect_downstream_lines, gdirs)
# 步骤2:递归合并
gdirs = _recursive_merging(gdirs)
# 步骤3:清理合并后的流线
execute_entity_task(tasks.clean_merged_flowlines, gdirs)
return gdirs
_recursive_merging是核心算法:它识别RGI连接级别2中定义的支流关系,从末端支流开始,逐步向上游合并,最终每条冰川只有一个主gdir。合并过程中需要重新计算流线拓扑、宽度分布和集水区边界。
plot_domain可视化检查合并前后的流线网络。
位于第1025行,处理GriddedNcdfFile格式的2D输出数据组合。当多条冰川的输出格网需要合并为区域数据集时使用:
def merge_gridded_data(gdirs, variables=None, combine_plots=None, ...):
# 创建统一的输出格网
target_grid = create_target_grid(gdirs)
# 逐个变量重投影和合并
for var in variables:
merged = reproject_and_merge(gdirs, var, target_grid)
# 高斯平滑减少边缘伪影
merged_smooth = gaussian_filter(merged, sigma=1.0)
return merged_dataset
格网合并面临三个主要挑战:不同冰川可能使用不同分辨率和投影、合并边界处可能出现不连续、以及总面积和总体积等全局量需要在重投影中保持守恒。OGGM通过保守重投影和高斯平滑来应对。
预处理级别(prepro level)系统(定义于cli/prepro_levels.py)是OGGM工业化运行的核心。它将完整工作流分为5个渐进级别:
| 级别 | 名称 | 包含步骤 | 输出 |
|---|---|---|---|
| L0 | 初始化 | init_glacier_directories | gdir骨架 |
| L1 | DEM处理 | define_glacier_region + glacier_masks | 冰川掩膜 |
| L2 | 流线 | compute_centerlines → catchment_width_correction | 完整流线网络 |
| L3 | 气候+反演 | climate_tasks + inversion_tasks + 校准 | 冰厚+mu* |
| L4 | 历史模拟 | historical_run | 时间序列输出 |
| L5 | 清理 | 删除中间文件 | 精简gdir |
每个级别完成后,系统将结果打包为tar存档。下一级别可以通过--start-base-url加载前一级别的存档,实现增量恢复。这种设计带来了两大好处:
workflow.py是OGGM的工作流编排层。它以不到1400行Python代码连接单冰川任务、全局任务(global task)、错误处理和并行执行。其核心设计是将科学任务与调度基础设施解耦,通过任务状态记录支持增量计算,并提供从单行Python脚本到集群批处理的统一API。理解workflow.py的调用模式后,自定义处理管线通常只需要编写遵循(gdir, **kwargs)签名的函数,再交给execute_entity_task调度。