Advanced Developer
OGGM 从一开始就被设计为可扩展的。它不是强制所有用户使用单一的建模流程, 而是在技术栈的每个层面都暴露了定义良好的扩展点:气候数据源、物质平衡模型、 冰床几何形态、流动求解器,甚至包括任务执行框架本身。本章将逐一说明每个扩展点, 提供可运行的示例,并讨论构建可维护、可测试的 OGGM 扩展的最佳实践。
扩展OGGM时不要只写一个函数。一个可维护扩展至少应包含:实现代码、一个最小示例冰川、一个能重复运行的entity task或脚本、一个检查输出文件的测试、以及写入diagnostics.json或独立结果文件的溯源信息。这样后续更换OGGM版本、RGI版本或气候数据时,才能判断差异来自模型还是来自输入。
OGGM 的可扩展性建立在以下几个核心设计模式之上:
| 扩展点 | 机制 | 稳定性 |
|---|---|---|
| 气候数据源 | Shop 模块模式:在 cfg.PARAMS['baseline_climate'] 中注册 download + process 函数 |
稳定 |
| 物质平衡模型 | 继承 MassBalanceModel;实现 get_monthly_mb() 和 get_annual_mb() |
稳定 |
| 冰床形状 / flowline | 继承 Flowline 或 MixedBedFlowline;实现几何方法 |
稳定 |
| 演化模型 | 继承 FlowlineModel;实现 step();在 cfg.PARAMS['evolution_model'] 中注册 |
稳定 |
| Entity Task | 使用 @entity_task 装饰器;通过 writes 声明依赖关系 |
稳定 |
| 配置参数 | 向 params.cfg 添加条目;使用前在 PARAMS 中编写文档 |
半稳定 |
| 输出文件类型 | 在 cfg.BASENAMES 中注册新的键名 |
稳定 |
"Shop" 模式是 OGGM 实现可插拔气候数据提供者的机制。每个 shop 模块实现统一的接口, 使得用户可以在不改变工作流程代码的情况下切换不同的气候数据集。
一个完整的 shop 模块需要两个具有标准化签名的函数:
def my_climate_download(gdirs=None, path=True, reset=False):
"""Download climate data for a set of glaciers.
Parameters
----------
gdirs : list of GlacierDirectory, optional
Glacier directories to process. If None, downloads all available.
path : bool or str
If True, return output path. If str, path to download directory.
reset : bool
If True, re-download existing files.
Returns
-------
str
Path to the downloaded data directory.
"""
@entity_task(logger, writes=['climate_historical'])
def process_my_climate(gdir, **kwargs):
"""Process downloaded climate data for a single glacier.
This function reads the raw downloaded data and writes a
climate_historical.nc file in the glacier directory.
Parameters
----------
gdir : GlacierDirectory
**kwargs : forwarded to implementation
"""
Shop 模块通过修改配置来注册自身:
# In your module's __init__ or at import time:
import oggm.cfg as cfg
# Register as an available climate data source
cfg.PARAMS['baseline_climate'].append('MY_CLIMATE_DATA')
# Optionally set as the active source
cfg.PARAMS['baseline_climate'] = 'MY_CLIMATE_DATA'
oggm.core.climate 中的核心函数 process_climate_data() 充当调度器的角色。
当用户调用 process_climate_data(gdir) 时,调度器会查询
cfg.PARAMS['baseline_climate'] 并将请求路由到对应的 shop 模块:
# Simplified dispatcher logic in oggm.core.climate
def process_climate_data(gdir, **kwargs):
source = cfg.PARAMS['baseline_climate']
if source == 'CRU':
from oggm.shop import cru
cru.process_cru_data(gdir, **kwargs)
elif source == 'HISTALP':
from oggm.shop import histalp
histalp.process_histalp_data(gdir, **kwargs)
elif source == 'MY_CLIMATE_DATA':
from my_package import my_shop
my_shop.process_my_climate(gdir, **kwargs)
else:
raise ValueError(f"Unknown climate source: {source}")
download_*() 函数。process_*() 函数,使用 @entity_task 装饰,写入 climate_historical。cfg.PARAMS['baseline_climate'] 中注册。gdir.write_climate() 或等效方法写入输出文件。oggm.core.climate.process_climate_data 的调度器中添加数据源。
OGGM 中所有物质平衡模型均继承自 oggm.core.massbalance.MassBalanceModel。
要创建自定义模型,需继承此基类并实现所需的接口。
class MassBalanceModel:
"""Base class for all mass balance models."""
def __init__(self):
self.valid_bounds = None # (min_h, max_h) for extrapolation limits
self.hemisphere = 'nh' # 'nh' or 'sh'
def get_monthly_mb(self, heights, year=None, fl_id=None, fl_widths=None):
"""Compute monthly mass balance at given heights.
Parameters
----------
heights : ndarray
Heights (m) at which to compute mass balance.
year : int, optional
The year for which to compute (not used by all models).
fl_id : int, optional
Flowline ID for multi-flowline glaciers.
fl_widths : ndarray, optional
Flowline widths at each grid point.
Returns
-------
ndarray
Monthly mass balance (kg m⁻²) for each height for each month.
Shape: (12, n_heights)
"""
raise NotImplementedError()
def get_annual_mb(self, heights, year=None, fl_id=None, fl_widths=None):
"""Compute annual mass balance.
Returns
-------
ndarray
Annual mass balance (kg m⁻² yr⁻¹) for each height.
Shape: (n_heights,)
"""
raise NotImplementedError()
def get_specific_mb(self, heights=None, widths=None, fls=None, year=None):
"""Compute glacier-wide specific mass balance."""
raise NotImplementedError()
def get_ela(self, year=None, **kwargs):
"""Compute the equilibrium line altitude."""
raise NotImplementedError()
import numpy as np
from oggm.core.massbalance import MassBalanceModel
class ELAMassBalance(MassBalanceModel):
"""Simple mass balance model defined by an ELA and gradients.
Parameterization:
b(z) = (z - ELA) * grad_above for z >= ELA
b(z) = (z - ELA) * grad_below for z < ELA
"""
def __init__(self, ela=2800, grad_above=-0.007, grad_below=-0.012,
max_mb=4.0):
super().__init__()
self.ela = ela
self.grad_above = grad_above # m w.e. per m elevation (above ELA)
self.grad_below = grad_below # m w.e. per m elevation (below ELA)
self.max_mb = max_mb # kg m⁻², maximum accumulation
self.valid_bounds = [0, 10000] # m a.s.l.
def get_monthly_mb(self, heights, year=None, fl_id=None, fl_widths=None):
# Annual model, distribute evenly across months
annual = self.get_annual_mb(heights, year, fl_id, fl_widths)
monthly = np.tile(annual, (12, 1)) / 12.0
return monthly
def get_annual_mb(self, heights, year=None, fl_id=None, fl_widths=None):
z = np.asarray(heights)
mb = np.where(
z >= self.ela,
(z - self.ela) * self.grad_above,
(z - self.ela) * self.grad_below
)
# Cap accumulation
mb = np.minimum(mb, self.max_mb * 1000) # convert to kg m⁻²
return mb
def get_ela(self, year=None, **kwargs):
return self.ela
在实际应用中,大多数用于 flowline 演化的物质平衡模型都被包装在
MultipleFlowlineMassBalance 中,该类负责将调用分派给各个 flowline
并管理多 flowline 的几何结构:
from oggm.core.massbalance import MultipleFlowlineMassBalance
# Wrap your custom model
mb_model = ELAMassBalance(ela=2900)
multi_mb = MultipleFlowlineMassBalance(
gdir, mb_model_class=ELAMassBalance,
ela=2900
)
如果你的物质平衡模型具有可标定的参数,可以通过实现 temp_bias 或 prcp_fac 接口,
或者提供自定义的标定函数来集成到 OGGM 的标定框架中。
oggm.core.climate.mb_calibration 中的标定循环也可以被子类化。
冰川的几何形态由 Flowline 对象表示。OGGM 提供了
MixedBedFlowline(用于处理部分基岩覆盖的情况),但你也可以为特定应用场景创建
自定义的冰床形状。
class Flowline:
"""Base class for 1D glacier flowlines."""
def __init__(self, line=None, dx=1, map_dx=None,
surface_h=None, bed_h=None):
# Core geometry arrays
self.dis_on_line = None # distance along the flowline (m)
self.surface_h = None # surface elevation (m a.s.l.)
self.bed_h = None # bed elevation (m a.s.l.)
self.widths = None # glacier width at each grid point (m)
self.widths_m = None # widths at staggered grid points (m)
self.dx = None # grid spacing (m)
self.dx_meter = None # grid spacing in meters
self.map_dx = None # map-projected dx (m)
def _add_attrs_to_save(self):
"""Return dict of attributes to persist."""
return {
'dis_on_line': self.dis_on_line,
'surface_h': self.surface_h,
'bed_h': self.bed_h,
'widths': self.widths,
'dx': self.dx,
}
import numpy as np
from oggm.core.flowline import Flowline
class ParabolicBedFlowline(Flowline):
"""Flowline with a parabolic bed profile.
z_b(x) = z_head - (z_head - z_terminus) * (x / L)^2
"""
def __init__(self, line=None, dx=1, map_dx=None,
surface_h=None, bed_h=None,
z_head=3000, z_terminus=1000):
super().__init__(line, dx, map_dx, surface_h, bed_h)
self.z_head = z_head
self.z_terminus = z_terminus
if line is not None:
self._set_parabolic_bed()
def _set_parabolic_bed(self):
L = self.dis_on_line[-1]
x = self.dis_on_line
self.bed_h = self.z_head - (self.z_head - self.z_terminus) * (x / L)**2
def _add_attrs_to_save(self):
attrs = super()._add_attrs_to_save()
attrs['z_head'] = self.z_head
attrs['z_terminus'] = self.z_terminus
return attrs
冰川演化求解器是 OGGM 动力学模型的计算核心。OGGM 支持多种求解器, 每种求解器针对 SIA 方程实现了特定的数值方案。
class FlowlineModel:
"""Base class for glacier evolution models."""
def __init__(self, flowlines, mb_model=None, y0=0,
glen_a=None, fs=None, **kwargs):
self.fls = flowlines
self.mb_model = mb_model
self.y0 = y0
self.glen_a = glen_a or cfg.PARAMS['glen_a']
self.fs = fs or cfg.PARAMS['fs']
def step(self, dt):
"""Advance the model by one time step.
Parameters
----------
dt : float
Time step in years.
Returns
-------
float
The actual time step used (may be smaller due to CFL).
"""
raise NotImplementedError()
def run_until(self, y1):
"""Run the model from current year to y1.
Parameters
----------
y1 : float
End year.
Returns
-------
dict
Model diagnostics at y1.
"""
raise NotImplementedError()
def run_until_and_store(self, y1, **kwargs):
"""Run and store diagnostics along the way."""
raise NotImplementedError()
def to_netcdf(self, path):
"""Save model state to NetCDF."""
raise NotImplementedError()
import numpy as np
from oggm.core.flowline import FlowlineModel
class ExplicitEulerModel(FlowlineModel):
"""Simplified glacier evolution using explicit Forward Euler.
This is for educational or small-scale use only — it does not
include the semi-implicit scheme or adaptive time-stepping of
the production FluxBasedModel.
"""
def __init__(self, flowlines, mb_model=None, y0=0,
glen_a=None, fs=None, fixed_dt=0.01):
super().__init__(flowlines, mb_model, y0, glen_a, fs)
self.fixed_dt = fixed_dt # years
def step(self, dt=None):
dt = dt or self.fixed_dt
for fl in self.fls:
# Compute ice flux via SIA
thick = fl.thick
slope = np.gradient(fl.surface_h, fl.dx_meter)
# Glen's flow law: u ~ A * tau^n * H^(n+2)
n = 3 # Glen's exponent
tau = 900 * 9.81 * thick * np.abs(slope) # density * g * H * sin(alpha)
flux = (2 * self.glen_a * self.fs * tau**n *
thick**(n + 2) * slope / (n + 2))
# Mass continuity
dHdt = -np.gradient(flux, fl.dx_meter) / fl.widths
mb = self.mb_model.get_annual_mb(
fl.surface_h, year=int(self.y0)
)
fl.thick += (dHdt + mb / 910) * dt # density conversion
fl.thick = np.maximum(fl.thick, 0) # no negative thickness
self.y0 += dt
return dt
def run_until(self, y1):
while self.y0 < y1:
self.step()
return {'year': self.y0}
# Register your model so it can be used by flowline_model_run()
cfg.PARAMS['evolution_model'] = 'explicit_euler'
@entity_task 装饰器是 OGGM 工作流系统的骨干。创建自定义 entity task
可以让你的处理逻辑无缝集成到 OGGM 的并行执行、日志记录和依赖跟踪体系中。
from oggm import entity_task
@entity_task(logger, writes=['custom_output'])
def my_custom_task(gdir, param_a=1.0, param_b='default'):
"""A custom per-glacier processing task.
Parameters
----------
gdir : GlacierDirectory
The glacier directory to process.
param_a : float
A numerical parameter.
param_b : str
A string parameter.
Returns
-------
dict
Results dictionary saved as gdir.custom_output.
"""
# Your processing logic here
result = {'param_a': param_a, 'param_b': param_b,
'rgi_id': gdir.rgi_id}
# Write output
gdir.write_json(result, 'custom_output')
return result
writes 参数对于依赖跟踪至关重要。它声明任务会生成哪些文件。
下游任务可以声明对这些文件的依赖:
@entity_task(logger, writes=['analysis_result'])
def analysis_task(gdir):
# This task needs the output of my_custom_task
# The workflow system ensures proper ordering
custom = gdir.read_json('custom_output') # written by my_custom_task
analysis = process(custom)
gdir.write_json(analysis, 'analysis_result')
return analysis
from oggm import workflow
gdirs = workflow.init_glacier_directories(['RGI60-11.00897'])
# Run your custom task on all glaciers (serial or parallel)
workflow.execute_entity_task(my_custom_task, gdirs,
param_a=2.5, param_b='custom')
# Results are now stored in each glacier directory
for gdir in gdirs:
result = gdir.read_json('custom_output')
print(f"{gdir.rgi_id}: {result}")
自定义参数应添加到本地的 params.cfg 文件中:
# my_extension_params.cfg
[my_extension]
# Calibration factor for my custom mass balance model
my_calib_factor = 1.0
# Switch to enable experimental ice-dynamic scheme
my_experimental_scheme = False
# Number of iterations for my iterative solver
my_max_iter = 100
在初始化时加载自定义参数:
import oggm.cfg as cfg
cfg.initialize()
# Load your custom parameters
cfg.PARAMS['my_calib_factor'] = 1.0
cfg.PARAMS['my_experimental_scheme'] = False
cfg.PARAMS['my_max_iter'] = 100
# Always document in the PARAMS OrderedDict
cfg.PARAMS.doc['my_calib_factor'] = (
'Calibration factor for the custom mass balance model.'
)
cfg.PARAMS.doc 中拥有文档条目。
该要求在初始化时强制执行。如果没有正确的文档,OGGM 将发出警告(在严格模式下则为错误)。
文档字符串应包含参数的用途、默认值、允许范围(如有)以及受影响的模块。
如果你的扩展会生成新类型的输出文件,请在 cfg.BASENAMES 中注册它们,
以便文件管理系统能够识别:
from oggm import cfg
# Register a new output file type
cfg.BASENAMES['my_extension_output'] = ('my_output.nc', 'My extension NetCDF output')
# Format: (filename, description)
# Now you can use it with gdir methods:
# gdir.get_filepath('my_extension_output')
# gdir.write_netcdf(data_dict, 'my_extension_output')
以下扩展示例将所有扩展机制串联在一起,构建一个完整、可复用的自定义 ELA 模型插件。
# File: custom_ela_model.py
"""Custom equilibrium line altitude model for OGGM.
This module demonstrates all OGGM extension patterns:
- A mass balance model subclass
- A custom entity task
- Parameter registration
- Integration with the workflow system
"""
import numpy as np
import oggm.cfg as cfg
from oggm import entity_task
from oggm.core.massbalance import MassBalanceModel, MultipleFlowlineMassBalance
from oggm import workflow
# ---- 1. Register custom parameters ----
cfg.PARAMS['custom_ela_default'] = 3000.0
cfg.PARAMS.doc['custom_ela_default'] = (
'Default ELA (m a.s.l.) for the CustomELAModel when no calibration '
'data is available.'
)
# ---- 2. Define the mass balance model ----
class CustomELAModel(MassBalanceModel):
"""Mass balance model with a user-specified ELA.
Monthly mass balance is derived from a sinusoidal annual cycle
with the ELA controlling the zero-crossing point.
"""
def __init__(self, ela=3000, amplitude=2000, melt_factor=0.005):
super().__init__()
self.ela = ela
self.amplitude = amplitude # kg m⁻², seasonal amplitude
self.melt_factor = melt_factor # m w.e. per m below ELA
self.valid_bounds = [-1000, 9000]
def get_monthly_mb(self, heights, year=None, fl_id=None, fl_widths=None):
z = np.asarray(heights)
# Sinusoidal annual cycle
months = np.arange(12)
seasonal = (self.amplitude / 1000 *
np.sin(2 * np.pi * (months - 3) / 12)) # max in July
# Elevation gradient
elev_effect = np.where(
z > self.ela,
-0.005 * (z - self.ela), # net accumulation above ELA
-0.010 * (self.ela - z) # net ablation below ELA
)
mb = np.outer(seasonal, np.ones_like(z)) + elev_effect[np.newaxis, :]
return mb * 1000 # to kg m⁻²
def get_annual_mb(self, heights, year=None, fl_id=None, fl_widths=None):
monthly = self.get_monthly_mb(heights, year, fl_id, fl_widths)
return monthly.sum(axis=0)
def get_ela(self, year=None, **kwargs):
return self.ela
# ---- 3. Create a custom entity task ----
@entity_task('logger', writes=['custom_ela_results'])
def compute_ela_statistics(gdir, ela_values=None):
"""Compute glacier statistics for different ELA values.
Parameters
----------
gdir : GlacierDirectory
ela_values : list of float, optional
ELAs to test. Defaults to [2500, 3000, 3500].
Returns
-------
dict
AAR, accumulation area, and total area for each ELA.
"""
if ela_values is None:
ela_values = [2500, 3000, 3500]
results = {}
for ela in ela_values:
mb_model = CustomELAModel(ela=ela)
heights = gdir.gridded_data['glacier_topo']
masks = gdir.gridded_data['glacier_mask'].astype(bool)
glacier_h = heights[masks]
if len(glacier_h) == 0:
results[ela] = {'aar': 0, 'acc_area_km2': 0, 'total_area_km2': 0}
continue
aar = (glacier_h >= ela).sum() / len(glacier_h)
dx = gdir.grid.dx
pixel_area_km2 = (dx ** 2) / 1e6
acc_area_km2 = (glacier_h >= ela).sum() * pixel_area_km2
total_area_km2 = len(glacier_h) * pixel_area_km2
results[ela] = {
'aar': round(aar, 4),
'acc_area_km2': round(acc_area_km2, 3),
'total_area_km2': round(total_area_km2, 3)
}
gdir.write_json(results, 'custom_ela_results')
return results
# ---- 4. Example workflow ----
if __name__ == '__main__':
# Standard pipeline
cfg.initialize()
gdirs = workflow.init_glacier_directories(['RGI60-11.00897'])
workflow.execute_entity_task(workflow.gis_prepro_tasks, gdirs)
# Run custom analysis
workflow.execute_entity_task(compute_ela_statistics, gdirs)
# Inspect results
for gdir in gdirs:
print(gdir.read_json('custom_ela_results'))
get_monthly_mb() 返回 shape 为 (12, n_heights) 的结果,以及
get_annual_mb() 返回 shape 为 (n_heights,) 的结果。get_demo_file('Hintereisferner_RGI5.shp'))。__version__ 属性中。