第15章 OGGM工作流管线

高级 核心架构

本章深入解析OGGM的工作流编排层,涵盖workflow.py(约1336行)、tasks.pyglobal_tasks.py三个核心模块。工作流管线是OGGM的"调度层"——它定义了冰川模拟从原始数据到最终输出的完整执行路径,并提供了灵活的并行计算基础设施。

从研究问题选择工作流
研究目标最低任务链主要输出优先读
估算冰储量GIS预处理 → 气候处理 → 物质平衡校准 → 反演inversion_output.pkl第5-9章
历史面积/体积变化反演 → 初始化流线 → run_from_climate_datamodel_diagnostics.nc第10、15、18章
未来情景历史工作流 → GCM气候处理 → 正演模拟带filesuffix的诊断文件第7、8、16章
入海型冰川崩解反演 → 含崩解的正演 → 诊断检查calving_m3, calving_rate第12、13章

15.1 架构概览:编排层设计哲学

OGGM的工作流系统遵循"任务即函数"的哲学。每个科学计算步骤被封装为一个独立的Python函数(称为entity task),工作流管线负责将这些函数按序编排并分发到多个冰川实体上。这种设计的核心优势在于:

workflow.py是这套体系的核心文件。它不直接执行冰川物理计算,而是作为总指挥,调用tasks.py中的具体科学任务。理解workflow.py的调用关系,就等于掌握了OGGM全部功能的入口地图。

15.2 execute_entity_task:中央并行执行引擎

15.2.1 函数签名与职责

位于workflow.py第126行的execute_entity_task是整个OGGM中调用频率最高的函数。几乎所有的并行工作流都经过它。其核心职责是:接收一个task函数和一组冰川目录对象(GlacierDirectory, gdir),将task应用到每个gdir上,并以并行方式收集结果

def execute_entity_task(task, gdirs, **kwargs):
    """Central parallel execution engine.

    Parameters
    ----------
    task : callable
        The task function to apply. Must accept a gdir as first argument.
    gdirs : list of GlacierDirectory
        Glacier directories to process.
    **kwargs
        Additional keyword arguments passed to each task invocation.

    Returns
    -------
    list
        Results collected from all invocations, in the same order as gdirs.
    """

15.2.2 任务规范化

task参数有两种传入方式:

传入方式示例说明
单个可调用对象tasks.compute_centerlines最常见的方式,所有gdir共享同一task和kwargs
列表[(task1, kw1), (task2, kw2), ...]每个gdir可拥有不同的task和/或不同的kwargs

在函数内部,task首先被规范化为统一的列表格式。单个函数被自动复制为与gdirs等长的列表:

if isinstance(task, (list, tuple)):
    # 用户提供了每gdir的(task, kwargs)列表
    task_list = task
else:
    # 统一task,复制N份
    task_list = [(task, kwargs) for _ in gdirs]

15.2.3 全局任务(global task)拒绝机制

这是OGGM增量计算的关键机制。在将任务分发到worker之前,系统检查每个任务是否已在目标gdir上完成。已完成的任务被替换为_dummy_task(一个返回None的空操作),从而避免浪费计算资源。

判断依据是global_tasks.py中定义的任务完成标志文件。每个冰川目录(GlacierDirectory)维护一组标志文件,记录哪些处理步骤已经完成。这种设计与make的增量构建理念一致——只处理需要更新的部分。

15.2.4 _pickle_copier:替代functools.partial

Python标准库的functools.partial在multiprocessing环境下有一个致命缺陷:其__reduce__方法不完整,导致pickle序列化失败。OGGM为此设计了一个精巧的替代品:

class _pickle_copier(object):
    """A pickleable alternative to functools.partial.

    Multiprocessing needs to pickle the task and its arguments.
    functools.partial objects are not reliably pickleable across
    all Python versions, so we use this custom class instead.
    """
    def __init__(self, func, *args, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def __call__(self, gdir):
        return self.func(gdir, *self.args, **self.kwargs)

这个类的对象可以被pickle(因为funcargskwargs都是简单属性),从而安全地跨进程边界传递。

设计笔记 如果你在自己的项目中遇到multiprocessing的pickle问题,_pickle_copier模式可以直接复用。但请注意:被包装的函数必须是模块级的(不能是lambda或嵌套函数),否则仍然无法pickle。

15.2.5 MPI与Multiprocessing的分派

经过规范化和全局任务(global task)检查后,实际执行路径根据可用并行后端选择:

if _have_ogmpi:
    # MPI模式:master分发,worker执行
    out = ogmpi.mpi_master_spin_tasks(gdirs, task_list)
elif len(gdirs) > 1:
    # 多进程模式:创建进程池
    pool = init_mp_pool()
    out = pool.map(run_task_on_gdir, zip(gdirs, task_list))
else:
    # 顺序模式:单核回退
    log.warning("OGGM is running sequentially!")
    out = [run_task_on_gdir((g, t)) for g, t in zip(gdirs, task_list)]

顺序回退时会输出一条警告,帮助用户在排查性能问题时快速定位配置错误。

15.3 execute_parallel_tasks:单冰川并行

workflow.py第197行的execute_parallel_tasks解决了一个不同于execute_entity_task的问题:对同一个冰川目录(GlacierDirectory)执行多个可以并行运行的任务。这在高分辨率冰川处理或复杂的后处理流程中非常有用。

其实现相对简单:利用multiprocessing.pool.ThreadPool(线程池,而非进程池),因为任务之间共享同一个gdir的内存视图,使用线程可以避免数据复制的开销。

15.4 Multiprocessing基础设施

15.4.1 init_mp_pool:进程池生命周期管理

init_mp_pool(第41行)负责创建、复用、重置和销毁multiprocessing进程池。它维护了一个模块级变量_mp_pool和一个标志CONFIG_MODIFIED

def init_mp_pool(reset=False):
    """Get or create the global multiprocessing pool.

    If OGGM configuration has changed since the pool was created,
    the old pool is automatically terminated and a fresh one is
    created with the new settings.
    """
    global _mp_pool, CONFIG_MODIFIED
    if reset or CONFIG_MODIFIED:
        if _mp_pool is not None:
            _mp_pool.terminate()
            _mp_pool = None
        CONFIG_MODIFIED = False

    if _mp_pool is None:
        _mp_pool = Pool(processes=cfg.PARAMS['mp_processes'],
                        initializer=_init_pool_globals)
    return _mp_pool

CONFIG_MODIFIED标志是一个巧妙的设计:只要cfg.PARAMS被修改,该标志就被置为True,下一次调用init_mp_pool时自动重建进程池。这避免了配置变更后pool中的worker仍使用旧配置的问题。

15.4.2 _init_pool_globals:Worker初始化

每个multiprocessing worker进程在启动时调用_init_pool_globals,将OGGM配置解包到进程的全局命名空间。这确保了所有worker共享一致的参数设置。

15.4.3 use_mp_spawn与跨平台兼容

在Windows上,Python的multiprocessing默认使用spawn(而非Linux的fork)创建子进程。spawn模式下,子进程不继承父进程的内存,因此需要重新导入模块。OGGM通过use_mp_spawn配置项处理这一差异,并在必要时回退到顺序执行。

关键限制 在Jupyter Notebook中使用multiprocessing时,spawn模式可能导致无限递归导入。OGGM建议在notebook环境中始终设置mp_processes=1(即关闭多进程),或使用--mpi后端。

15.5 init_glacier_directories:工作流的起点

15.5.1 从RGI GeoDataFrame初始化

init_glacier_directories(第325行)是任何OGGM运行的第一步。它接受一个pandas GeoDataFrame(通常来自RGI shapefile),为每一行创建对应的GlacierDirectory(gdir)对象:

gdirs = workflow.init_glacier_directories(rgi_df)

该函数调用_check_rgi_input进行输入验证,包括:

15.5.2 RGI版本自动检测

OGGM能自动区分RGI v6和v7格式。通过检查DataFrame的列名和元数据,系统选择合适的读取器。v7版本引入了更多的属性列和不同的坐标系参数。

15.5.3 Intersects数据库自动配置

RGI intersects文件记录了每条冰川与气候网格和历史数据产品的空间交集。如果该文件在默认位置存在,init_glacier_directories会自动加载;否则输出警告并跳过需要交集信息的后续步骤。

15.5.4 恢复模式与预计算存档

初始化方式函数/参数适用场景
从RGI新运行init_glacier_directories(rgi_df)首次处理
续跑init_glacier_directories(rgi_df, from_prepro_level=N)从预处理级别(prepro level)N恢复
从tar恢复init_glacier_directories(rgi_df, from_tar='url')从远程存档恢复

续跑模式下,OGGM加载已有gdir的状态并跳过已完成的步骤,仅执行未完成的任务。从tar恢复则是从预处理的压缩包中解压完整的gdir目录。

15.6 工作流快捷函数

workflow.py提供了一系列快捷函数,每个函数封装了一条完整的子管线。这些函数是用户最常直接调用的API。

15.6.1 gis_prepro_tasks:流线预处理链

位于第487行,这是OGGM最长的子管线,共10个任务:

def gis_prepro_tasks(gdirs, **kwargs):
    task_list = [
        tasks.define_glacier_region,        # 1. 定义冰川区域
        tasks.glacier_masks,                # 2. 冰川掩膜
        tasks.compute_centerlines,          # 3. 计算中心线
        tasks.initialize_flowlines,         # 4. 初始化流线
        tasks.compute_downstream_line,      # 5. 计算下游延长线
        tasks.compute_downstream_bedshape,  # 6. 下游冰床形态
        tasks.catchment_area,               # 7. 集水区面积
        tasks.catchment_intersections,      # 8. 集水区交叉
        tasks.catchment_width_geom,         # 9. 几何宽度计算
        tasks.catchment_width_correction,   # 10. 宽度修正
    ]
    for task, kwargs in task_list:
        execute_entity_task(task, gdirs, **kwargs)

这条管线从原始DEM出发,逐步构建出冰川流线网络和集水区几何。每个步骤的输出是下一步的输入,形成严密的数据依赖链。初学者通常应使用gis_prepro_tasks保持标准顺序;只有在调试或开发新方法时,才建议逐步拆开运行。

15.6.2 climate_tasks:气候与物质平衡校准链

位于第513行,包含3个核心任务:

process_climate_data → local_mustar → apparent_mb_from_any_mb

local_mustar(即mb_calibration_from_geodetic_mb)是校准的核心——它利用大地测量数据调整温度敏感度参数mu*,使模型物质平衡与观测一致。

15.6.3 inversion_tasks:冰厚反演链

位于第531行。这是OGGM最复杂的子管线,因为需要区分是否有冰川崩解(calving):

if gdir.is_tidewater:
    # 崩解冰川路径
    tasks.prepare_for_inversion(gdir, ...)
    tasks.mass_conservation_inversion(gdir, ...)
    tasks.filter_inversion_output(gdir, ...)
else:
    # 陆地冰川路径
    tasks.prepare_for_inversion(gdir, ...)
    tasks.mass_conservation_inversion(gdir, ...)
    tasks.filter_inversion_output(gdir, ...)

两条路径共享相同的函数名,但内部算法因冰川类型不同而有所差异。mass_conservation_inversion实现了基于冰厚-冰流通量关系的物质守恒反演方法。

15.6.4 calibrate_inversion_from_consensus

位于第586行,是OGGM最关键的全局校准步骤。它通过优化Glen流动律参数A,使OGGM反演的全球冰川冰体积与Farinotti等人(2019)的共识估计相匹配:

def calibrate_inversion_from_consensus(gdirs, ...):
    # 搜索Glen A,使总体积收敛到全球共识值
    for A in glen_a_values:
        execute_entity_task(tasks.mass_conservation_inversion, gdirs, glen_a=A)
        total_volume = sum(gdir.read_pickle('inversion_output')['volume'] for gdir in gdirs)
        if abs(total_volume - consensus_volume) < tolerance:
            break

calibrate_inversion_from_volume(第783行)是该函数的单冰川变体,invert_from_params(第725行)则使用预计算的参数跳过迭代搜索过程。

15.7 merge_glacier_tasks:支流合并工作流

位于第901行,处理支冰川与主冰川的合并。在自然冰川系统中,多条冰流汇入同一冰川,OGGM需要将这些支流整合到单一流线网络中:

def merge_glacier_tasks(gdirs, **kwargs):
    # 步骤1:检测支流关系
    execute_entity_task(tasks.intersect_downstream_lines, gdirs)
    # 步骤2:递归合并
    gdirs = _recursive_merging(gdirs)
    # 步骤3:清理合并后的流线
    execute_entity_task(tasks.clean_merged_flowlines, gdirs)
    return gdirs

_recursive_merging是核心算法:它识别RGI连接级别2中定义的支流关系,从末端支流开始,逐步向上游合并,最终每条冰川只有一个主gdir。合并过程中需要重新计算流线拓扑、宽度分布和集水区边界。

专家提示 支流合并是整个工作流中最容易出现问题的环节之一。常见的错误包括:流线相交拓扑不一致、合并后宽度突变导致的反演不稳定、以及支流编号丢失。调试时建议先用plot_domain可视化检查合并前后的流线网络。

15.8 merge_gridded_data:2D格网数据(gridded data)合并

位于第1025行,处理GriddedNcdfFile格式的2D输出数据组合。当多条冰川的输出格网需要合并为区域数据集时使用:

def merge_gridded_data(gdirs, variables=None, combine_plots=None, ...):
    # 创建统一的输出格网
    target_grid = create_target_grid(gdirs)
    # 逐个变量重投影和合并
    for var in variables:
        merged = reproject_and_merge(gdirs, var, target_grid)
        # 高斯平滑减少边缘伪影
        merged_smooth = gaussian_filter(merged, sigma=1.0)
    return merged_dataset

格网合并面临三个主要挑战:不同冰川可能使用不同分辨率和投影、合并边界处可能出现不连续、以及总面积和总体积等全局量需要在重投影中保持守恒。OGGM通过保守重投影和高斯平滑来应对。

15.9 预处理级别(prepro level)系统

预处理级别(prepro level)系统(定义于cli/prepro_levels.py)是OGGM工业化运行的核心。它将完整工作流分为5个渐进级别:

级别名称包含步骤输出
L0初始化init_glacier_directoriesgdir骨架
L1DEM处理define_glacier_region + glacier_masks冰川掩膜
L2流线compute_centerlines → catchment_width_correction完整流线网络
L3气候+反演climate_tasks + inversion_tasks + 校准冰厚+mu*
L4历史模拟historical_run时间序列输出
L5清理删除中间文件精简gdir

每个级别完成后,系统将结果打包为tar存档。下一级别可以通过--start-base-url加载前一级别的存档,实现增量恢复。这种设计带来了两大好处:

  1. 故障隔离:如果L3失败,不需要重新运行L0-L2。只需从L2存档恢复。
  2. 分布式处理:不同RGI区域可以在不同节点上独立运行各个级别,存档是节点间传递数据的标准格式。

15.10 小结

workflow.py是OGGM的工作流编排层。它以不到1400行Python代码连接单冰川任务、全局任务(global task)、错误处理和并行执行。其核心设计是将科学任务与调度基础设施解耦,通过任务状态记录支持增量计算,并提供从单行Python脚本到集群批处理的统一API。理解workflow.py的调用模式后,自定义处理管线通常只需要编写遵循(gdir, **kwargs)签名的函数,再交给execute_entity_task调度。

上一章:第14章 下一章:第16章 命令行界面