第20章 扩展OGGM:自定义模块与插件

Advanced Developer

OGGM 从一开始就被设计为可扩展的。它不是强制所有用户使用单一的建模流程, 而是在技术栈的每个层面都暴露了定义良好的扩展点:气候数据源、物质平衡模型、 冰床几何形态、流动求解器,甚至包括任务执行框架本身。本章将逐一说明每个扩展点, 提供可运行的示例,并讨论构建可维护、可测试的 OGGM 扩展的最佳实践。

开发者最小闭环

扩展OGGM时不要只写一个函数。一个可维护扩展至少应包含:实现代码、一个最小示例冰川、一个能重复运行的entity task或脚本、一个检查输出文件的测试、以及写入diagnostics.json或独立结果文件的溯源信息。这样后续更换OGGM版本、RGI版本或气候数据时,才能判断差异来自模型还是来自输入。

20.1 OGGM的扩展架构

OGGM 的可扩展性建立在以下几个核心设计模式之上:

扩展点机制稳定性
气候数据源 Shop 模块模式:在 cfg.PARAMS['baseline_climate'] 中注册 download + process 函数 稳定
物质平衡模型 继承 MassBalanceModel;实现 get_monthly_mb()get_annual_mb() 稳定
冰床形状 / flowline 继承 FlowlineMixedBedFlowline;实现几何方法 稳定
演化模型 继承 FlowlineModel;实现 step();在 cfg.PARAMS['evolution_model'] 中注册 稳定
Entity Task 使用 @entity_task 装饰器;通过 writes 声明依赖关系 稳定
配置参数 params.cfg 添加条目;使用前在 PARAMS 中编写文档 半稳定
输出文件类型 cfg.BASENAMES 中注册新的键名 稳定

20.2 Shop 模块模式

"Shop" 模式是 OGGM 实现可插拔气候数据提供者的机制。每个 shop 模块实现统一的接口, 使得用户可以在不改变工作流程代码的情况下切换不同的气候数据集。

20.2.1 Shop 模块的组成结构

一个完整的 shop 模块需要两个具有标准化签名的函数:

下载函数(Download Function)

def my_climate_download(gdirs=None, path=True, reset=False):
    """Download climate data for a set of glaciers.

    Parameters
    ----------
    gdirs : list of GlacierDirectory, optional
        Glacier directories to process. If None, downloads all available.
    path : bool or str
        If True, return output path. If str, path to download directory.
    reset : bool
        If True, re-download existing files.

    Returns
    -------
    str
        Path to the downloaded data directory.
    """

处理函数(Process Function)

@entity_task(logger, writes=['climate_historical'])
def process_my_climate(gdir, **kwargs):
    """Process downloaded climate data for a single glacier.

    This function reads the raw downloaded data and writes a
    climate_historical.nc file in the glacier directory.

    Parameters
    ----------
    gdir : GlacierDirectory
    **kwargs : forwarded to implementation
    """

20.2.2 注册

Shop 模块通过修改配置来注册自身:

# In your module's __init__ or at import time:
import oggm.cfg as cfg

# Register as an available climate data source
cfg.PARAMS['baseline_climate'].append('MY_CLIMATE_DATA')

# Optionally set as the active source
cfg.PARAMS['baseline_climate'] = 'MY_CLIMATE_DATA'

20.2.3 与调度器的集成

oggm.core.climate 中的核心函数 process_climate_data() 充当调度器的角色。 当用户调用 process_climate_data(gdir) 时,调度器会查询 cfg.PARAMS['baseline_climate'] 并将请求路由到对应的 shop 模块:

# Simplified dispatcher logic in oggm.core.climate
def process_climate_data(gdir, **kwargs):
    source = cfg.PARAMS['baseline_climate']
    if source == 'CRU':
        from oggm.shop import cru
        cru.process_cru_data(gdir, **kwargs)
    elif source == 'HISTALP':
        from oggm.shop import histalp
        histalp.process_histalp_data(gdir, **kwargs)
    elif source == 'MY_CLIMATE_DATA':
        from my_package import my_shop
        my_shop.process_my_climate(gdir, **kwargs)
    else:
        raise ValueError(f"Unknown climate source: {source}")
提示:Shop 模块开发清单
  1. 实现具有标准签名的 download_*() 函数。
  2. 实现 process_*() 函数,使用 @entity_task 装饰,写入 climate_historical
  3. cfg.PARAMS['baseline_climate'] 中注册。
  4. 使用 gdir.write_climate() 或等效方法写入输出文件。
  5. oggm.core.climate.process_climate_data 的调度器中添加数据源。
  6. 提供测试数据以及至少一个集成测试。

20.3 自定义物质平衡模型

OGGM 中所有物质平衡模型均继承自 oggm.core.massbalance.MassBalanceModel。 要创建自定义模型,需继承此基类并实现所需的接口。

20.3.1 MassBalanceModel 接口

class MassBalanceModel:
    """Base class for all mass balance models."""

    def __init__(self):
        self.valid_bounds = None  # (min_h, max_h) for extrapolation limits
        self.hemisphere = 'nh'     # 'nh' or 'sh'

    def get_monthly_mb(self, heights, year=None, fl_id=None, fl_widths=None):
        """Compute monthly mass balance at given heights.

        Parameters
        ----------
        heights : ndarray
            Heights (m) at which to compute mass balance.
        year : int, optional
            The year for which to compute (not used by all models).
        fl_id : int, optional
            Flowline ID for multi-flowline glaciers.
        fl_widths : ndarray, optional
            Flowline widths at each grid point.

        Returns
        -------
        ndarray
            Monthly mass balance (kg m⁻²) for each height for each month.
            Shape: (12, n_heights)
        """
        raise NotImplementedError()

    def get_annual_mb(self, heights, year=None, fl_id=None, fl_widths=None):
        """Compute annual mass balance.

        Returns
        -------
        ndarray
            Annual mass balance (kg m⁻² yr⁻¹) for each height.
            Shape: (n_heights,)
        """
        raise NotImplementedError()

    def get_specific_mb(self, heights=None, widths=None, fls=None, year=None):
        """Compute glacier-wide specific mass balance."""
        raise NotImplementedError()

    def get_ela(self, year=None, **kwargs):
        """Compute the equilibrium line altitude."""
        raise NotImplementedError()

20.3.2 示例:基于 ELA 的自定义物质平衡模型

import numpy as np
from oggm.core.massbalance import MassBalanceModel

class ELAMassBalance(MassBalanceModel):
    """Simple mass balance model defined by an ELA and gradients.

    Parameterization:
        b(z) = (z - ELA) * grad_above   for z >= ELA
        b(z) = (z - ELA) * grad_below   for z < ELA
    """

    def __init__(self, ela=2800, grad_above=-0.007, grad_below=-0.012,
                 max_mb=4.0):
        super().__init__()
        self.ela = ela
        self.grad_above = grad_above   # m w.e. per m elevation (above ELA)
        self.grad_below = grad_below   # m w.e. per m elevation (below ELA)
        self.max_mb = max_mb           # kg m⁻², maximum accumulation
        self.valid_bounds = [0, 10000]  # m a.s.l.

    def get_monthly_mb(self, heights, year=None, fl_id=None, fl_widths=None):
        # Annual model, distribute evenly across months
        annual = self.get_annual_mb(heights, year, fl_id, fl_widths)
        monthly = np.tile(annual, (12, 1)) / 12.0
        return monthly

    def get_annual_mb(self, heights, year=None, fl_id=None, fl_widths=None):
        z = np.asarray(heights)
        mb = np.where(
            z >= self.ela,
            (z - self.ela) * self.grad_above,
            (z - self.ela) * self.grad_below
        )
        # Cap accumulation
        mb = np.minimum(mb, self.max_mb * 1000)  # convert to kg m⁻²
        return mb

    def get_ela(self, year=None, **kwargs):
        return self.ela

20.3.3 与 MultipleFlowlineMassBalance 的协作

在实际应用中,大多数用于 flowline 演化的物质平衡模型都被包装在 MultipleFlowlineMassBalance 中,该类负责将调用分派给各个 flowline 并管理多 flowline 的几何结构:

from oggm.core.massbalance import MultipleFlowlineMassBalance

# Wrap your custom model
mb_model = ELAMassBalance(ela=2900)
multi_mb = MultipleFlowlineMassBalance(
    gdir, mb_model_class=ELAMassBalance,
    ela=2900
)

20.3.4 标定集成

如果你的物质平衡模型具有可标定的参数,可以通过实现 temp_biasprcp_fac 接口, 或者提供自定义的标定函数来集成到 OGGM 的标定框架中。 oggm.core.climate.mb_calibration 中的标定循环也可以被子类化。

20.4 自定义冰床形状:继承 Flowline

冰川的几何形态由 Flowline 对象表示。OGGM 提供了 MixedBedFlowline(用于处理部分基岩覆盖的情况),但你也可以为特定应用场景创建 自定义的冰床形状。

20.4.1 Flowline 接口

class Flowline:
    """Base class for 1D glacier flowlines."""

    def __init__(self, line=None, dx=1, map_dx=None,
                 surface_h=None, bed_h=None):
        # Core geometry arrays
        self.dis_on_line = None   # distance along the flowline (m)
        self.surface_h = None     # surface elevation (m a.s.l.)
        self.bed_h = None         # bed elevation (m a.s.l.)
        self.widths = None        # glacier width at each grid point (m)
        self.widths_m = None      # widths at staggered grid points (m)
        self.dx = None            # grid spacing (m)
        self.dx_meter = None      # grid spacing in meters
        self.map_dx = None        # map-projected dx (m)

    def _add_attrs_to_save(self):
        """Return dict of attributes to persist."""
        return {
            'dis_on_line': self.dis_on_line,
            'surface_h': self.surface_h,
            'bed_h': self.bed_h,
            'widths': self.widths,
            'dx': self.dx,
        }

20.4.2 示例:抛物线形冰床 Flowline

import numpy as np
from oggm.core.flowline import Flowline

class ParabolicBedFlowline(Flowline):
    """Flowline with a parabolic bed profile.

    z_b(x) = z_head - (z_head - z_terminus) * (x / L)^2
    """

    def __init__(self, line=None, dx=1, map_dx=None,
                 surface_h=None, bed_h=None,
                 z_head=3000, z_terminus=1000):
        super().__init__(line, dx, map_dx, surface_h, bed_h)
        self.z_head = z_head
        self.z_terminus = z_terminus
        if line is not None:
            self._set_parabolic_bed()

    def _set_parabolic_bed(self):
        L = self.dis_on_line[-1]
        x = self.dis_on_line
        self.bed_h = self.z_head - (self.z_head - self.z_terminus) * (x / L)**2

    def _add_attrs_to_save(self):
        attrs = super()._add_attrs_to_save()
        attrs['z_head'] = self.z_head
        attrs['z_terminus'] = self.z_terminus
        return attrs

20.5 自定义演化模型

冰川演化求解器是 OGGM 动力学模型的计算核心。OGGM 支持多种求解器, 每种求解器针对 SIA 方程实现了特定的数值方案。

20.5.1 FlowlineModel 接口

class FlowlineModel:
    """Base class for glacier evolution models."""

    def __init__(self, flowlines, mb_model=None, y0=0,
                 glen_a=None, fs=None, **kwargs):
        self.fls = flowlines
        self.mb_model = mb_model
        self.y0 = y0
        self.glen_a = glen_a or cfg.PARAMS['glen_a']
        self.fs = fs or cfg.PARAMS['fs']

    def step(self, dt):
        """Advance the model by one time step.

        Parameters
        ----------
        dt : float
            Time step in years.

        Returns
        -------
        float
            The actual time step used (may be smaller due to CFL).
        """
        raise NotImplementedError()

    def run_until(self, y1):
        """Run the model from current year to y1.

        Parameters
        ----------
        y1 : float
            End year.

        Returns
        -------
        dict
            Model diagnostics at y1.
        """
        raise NotImplementedError()

    def run_until_and_store(self, y1, **kwargs):
        """Run and store diagnostics along the way."""
        raise NotImplementedError()

    def to_netcdf(self, path):
        """Save model state to NetCDF."""
        raise NotImplementedError()

20.5.2 示例:显式前向欧拉模型

import numpy as np
from oggm.core.flowline import FlowlineModel

class ExplicitEulerModel(FlowlineModel):
    """Simplified glacier evolution using explicit Forward Euler.

    This is for educational or small-scale use only — it does not
    include the semi-implicit scheme or adaptive time-stepping of
    the production FluxBasedModel.
    """

    def __init__(self, flowlines, mb_model=None, y0=0,
                 glen_a=None, fs=None, fixed_dt=0.01):
        super().__init__(flowlines, mb_model, y0, glen_a, fs)
        self.fixed_dt = fixed_dt  # years

    def step(self, dt=None):
        dt = dt or self.fixed_dt
        for fl in self.fls:
            # Compute ice flux via SIA
            thick = fl.thick
            slope = np.gradient(fl.surface_h, fl.dx_meter)
            # Glen's flow law: u ~ A * tau^n * H^(n+2)
            n = 3  # Glen's exponent
            tau = 900 * 9.81 * thick * np.abs(slope)  # density * g * H * sin(alpha)
            flux = (2 * self.glen_a * self.fs * tau**n *
                    thick**(n + 2) * slope / (n + 2))
            # Mass continuity
            dHdt = -np.gradient(flux, fl.dx_meter) / fl.widths
            mb = self.mb_model.get_annual_mb(
                fl.surface_h, year=int(self.y0)
            )
            fl.thick += (dHdt + mb / 910) * dt  # density conversion
            fl.thick = np.maximum(fl.thick, 0)  # no negative thickness
        self.y0 += dt
        return dt

    def run_until(self, y1):
        while self.y0 < y1:
            self.step()
        return {'year': self.y0}

20.5.3 注册

# Register your model so it can be used by flowline_model_run()
cfg.PARAMS['evolution_model'] = 'explicit_euler'
警告:数值稳定性 编写正确的冰川演化求解器并非易事。SIA 方程产生的是一个对薄冰高度刚性的非线性扩散问题。 常用求解器通常采用半隐式或全隐式格式,并配合自适应时间步长。上述显式欧拉示例仅用于 教学演示,在实际参数值下将是不稳定的。

20.6 自定义 Entity Task

@entity_task 装饰器是 OGGM 工作流系统的骨干。创建自定义 entity task 可以让你的处理逻辑无缝集成到 OGGM 的并行执行、日志记录和依赖跟踪体系中。

20.6.1 @entity_task 装饰器

from oggm import entity_task

@entity_task(logger, writes=['custom_output'])
def my_custom_task(gdir, param_a=1.0, param_b='default'):
    """A custom per-glacier processing task.

    Parameters
    ----------
    gdir : GlacierDirectory
        The glacier directory to process.
    param_a : float
        A numerical parameter.
    param_b : str
        A string parameter.

    Returns
    -------
    dict
        Results dictionary saved as gdir.custom_output.
    """
    # Your processing logic here
    result = {'param_a': param_a, 'param_b': param_b,
              'rgi_id': gdir.rgi_id}
    # Write output
    gdir.write_json(result, 'custom_output')
    return result

20.6.2 writes 参数

writes 参数对于依赖跟踪至关重要。它声明任务会生成哪些文件。 下游任务可以声明对这些文件的依赖:

@entity_task(logger, writes=['analysis_result'])
def analysis_task(gdir):
    # This task needs the output of my_custom_task
    # The workflow system ensures proper ordering
    custom = gdir.read_json('custom_output')  # written by my_custom_task
    analysis = process(custom)
    gdir.write_json(analysis, 'analysis_result')
    return analysis

20.6.3 与 execute_entity_task 的集成

from oggm import workflow

gdirs = workflow.init_glacier_directories(['RGI60-11.00897'])

# Run your custom task on all glaciers (serial or parallel)
workflow.execute_entity_task(my_custom_task, gdirs,
                             param_a=2.5, param_b='custom')

# Results are now stored in each glacier directory
for gdir in gdirs:
    result = gdir.read_json('custom_output')
    print(f"{gdir.rgi_id}: {result}")

20.7 添加自定义参数

20.7.1 扩展 params.cfg

自定义参数应添加到本地的 params.cfg 文件中:

# my_extension_params.cfg
[my_extension]
# Calibration factor for my custom mass balance model
my_calib_factor = 1.0
# Switch to enable experimental ice-dynamic scheme
my_experimental_scheme = False
# Number of iterations for my iterative solver
my_max_iter = 100

在初始化时加载自定义参数:

import oggm.cfg as cfg
cfg.initialize()

# Load your custom parameters
cfg.PARAMS['my_calib_factor'] = 1.0
cfg.PARAMS['my_experimental_scheme'] = False
cfg.PARAMS['my_max_iter'] = 100

# Always document in the PARAMS OrderedDict
cfg.PARAMS.doc['my_calib_factor'] = (
    'Calibration factor for the custom mass balance model.'
)
专家提示:PARAMS 文档编写规范 OGGM 要求每个参数都必须在 cfg.PARAMS.doc 中拥有文档条目。 该要求在初始化时强制执行。如果没有正确的文档,OGGM 将发出警告(在严格模式下则为错误)。 文档字符串应包含参数的用途、默认值、允许范围(如有)以及受影响的模块。

20.8 为新文件类型注册 BASENAMES

如果你的扩展会生成新类型的输出文件,请在 cfg.BASENAMES 中注册它们, 以便文件管理系统能够识别:

from oggm import cfg

# Register a new output file type
cfg.BASENAMES['my_extension_output'] = ('my_output.nc', 'My extension NetCDF output')
# Format: (filename, description)

# Now you can use it with gdir methods:
# gdir.get_filepath('my_extension_output')
# gdir.write_netcdf(data_dict, 'my_extension_output')

20.9 完整示例:自定义 ELA 模型扩展

以下扩展示例将所有扩展机制串联在一起,构建一个完整、可复用的自定义 ELA 模型插件。

# File: custom_ela_model.py
"""Custom equilibrium line altitude model for OGGM.

This module demonstrates all OGGM extension patterns:
- A mass balance model subclass
- A custom entity task
- Parameter registration
- Integration with the workflow system
"""

import numpy as np
import oggm.cfg as cfg
from oggm import entity_task
from oggm.core.massbalance import MassBalanceModel, MultipleFlowlineMassBalance
from oggm import workflow

# ---- 1. Register custom parameters ----
cfg.PARAMS['custom_ela_default'] = 3000.0
cfg.PARAMS.doc['custom_ela_default'] = (
    'Default ELA (m a.s.l.) for the CustomELAModel when no calibration '
    'data is available.'
)

# ---- 2. Define the mass balance model ----
class CustomELAModel(MassBalanceModel):
    """Mass balance model with a user-specified ELA.

    Monthly mass balance is derived from a sinusoidal annual cycle
    with the ELA controlling the zero-crossing point.
    """

    def __init__(self, ela=3000, amplitude=2000, melt_factor=0.005):
        super().__init__()
        self.ela = ela
        self.amplitude = amplitude  # kg m⁻², seasonal amplitude
        self.melt_factor = melt_factor  # m w.e. per m below ELA
        self.valid_bounds = [-1000, 9000]

    def get_monthly_mb(self, heights, year=None, fl_id=None, fl_widths=None):
        z = np.asarray(heights)
        # Sinusoidal annual cycle
        months = np.arange(12)
        seasonal = (self.amplitude / 1000 *
                    np.sin(2 * np.pi * (months - 3) / 12))  # max in July
        # Elevation gradient
        elev_effect = np.where(
            z > self.ela,
            -0.005 * (z - self.ela),   # net accumulation above ELA
            -0.010 * (self.ela - z)     # net ablation below ELA
        )
        mb = np.outer(seasonal, np.ones_like(z)) + elev_effect[np.newaxis, :]
        return mb * 1000  # to kg m⁻²

    def get_annual_mb(self, heights, year=None, fl_id=None, fl_widths=None):
        monthly = self.get_monthly_mb(heights, year, fl_id, fl_widths)
        return monthly.sum(axis=0)

    def get_ela(self, year=None, **kwargs):
        return self.ela

# ---- 3. Create a custom entity task ----
@entity_task('logger', writes=['custom_ela_results'])
def compute_ela_statistics(gdir, ela_values=None):
    """Compute glacier statistics for different ELA values.

    Parameters
    ----------
    gdir : GlacierDirectory
    ela_values : list of float, optional
        ELAs to test. Defaults to [2500, 3000, 3500].

    Returns
    -------
    dict
        AAR, accumulation area, and total area for each ELA.
    """
    if ela_values is None:
        ela_values = [2500, 3000, 3500]

    results = {}
    for ela in ela_values:
        mb_model = CustomELAModel(ela=ela)
        heights = gdir.gridded_data['glacier_topo']
        masks = gdir.gridded_data['glacier_mask'].astype(bool)
        glacier_h = heights[masks]
        if len(glacier_h) == 0:
            results[ela] = {'aar': 0, 'acc_area_km2': 0, 'total_area_km2': 0}
            continue
        aar = (glacier_h >= ela).sum() / len(glacier_h)
        dx = gdir.grid.dx
        pixel_area_km2 = (dx ** 2) / 1e6
        acc_area_km2 = (glacier_h >= ela).sum() * pixel_area_km2
        total_area_km2 = len(glacier_h) * pixel_area_km2
        results[ela] = {
            'aar': round(aar, 4),
            'acc_area_km2': round(acc_area_km2, 3),
            'total_area_km2': round(total_area_km2, 3)
        }

    gdir.write_json(results, 'custom_ela_results')
    return results

# ---- 4. Example workflow ----
if __name__ == '__main__':
    # Standard pipeline
    cfg.initialize()
    gdirs = workflow.init_glacier_directories(['RGI60-11.00897'])
    workflow.execute_entity_task(workflow.gis_prepro_tasks, gdirs)
    # Run custom analysis
    workflow.execute_entity_task(compute_ela_statistics, gdirs)
    # Inspect results
    for gdir in gdirs:
        print(gdir.read_json('custom_ela_results'))

20.10 最佳实践

20.10.1 测试

20.10.2 文档

20.10.3 版本管理

← 第19章 Sandbox与实验模块 第21章 测试框架与质量保障 →