"""
hydroravens.hydrograph_separation
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Estimate reservoir timescales and initial storage depths from an observed
discharge time series, without running the hydrological model.
The pipeline has two stages:
1. **Spectral stage** — fit a cascade of linear-reservoir transfer functions
(product of Lorentzians) to the power spectral density of Q. AIC selects
the number of fast reservoirs. Timescales and initial storage for the fast
components come from this stage.
2. **Recession stage** — remove the fast components with a recursive digital
filter, leaving a slowly varying residual. Fit log(Q_residual) vs. time
on dry-season recession segments to extract τ_karst and its initial storage.
The primary output is physically informed initial conditions and calibration
parameter priors for hydroRaVENS, reducing spin-up requirements and improving
optimizer starting points.
References
----------
Maillet (1905) : multi-exponential recession decomposition of spring flows.
Young (1984) : recursive estimation and modelling of nonstationary time series.
Jakeman & Hornberger (1993) : IHACRES transfer-function identification.
Kirchner (2009): catchments as simple dynamical systems — spectral approach.
Eckhardt (2005): recursive digital baseflow separation filter.
"""
import numpy as np
from scipy.optimize import curve_fit
from scipy.signal import csd, welch
from scipy.stats import linregress
[docs]
class HydrographSeparation:
"""
Separate observed discharge into reservoir components and estimate
timescales and initial storage depths.
Parameters
----------
Q : array-like
Observed specific discharge [mm/day].
dt : float
Timestep [days]. Default 1.0.
n_reservoirs : int or None
Number of reservoirs. If None, selected by AIC on the spectral fit.
The slowest reservoir (karst/deep) is always fitted by recession
analysis regardless of this setting.
max_reservoirs : int
Maximum number of reservoirs to test when n_reservoirs is None.
Default 4.
precip : array-like or None
Daily precipitation [mm/day], same length as Q. Used to mask
recession periods; if None, only discharge decline is used.
recession_min_duration : int
Minimum qualifying recession length [days]. Default 10.
recession_precip_threshold : float
Maximum daily precipitation within a recession segment [mm/day].
Default 0.5.
recession_antecedent_days : int
Number of days immediately before a recession that must also be dry.
Default 3.
tau_deep : float or None
Externally supplied timescale [days] for the deep (karst) reservoir.
When provided, recession fitting is skipped for τ_karst and this
value is used directly. Recommended when τ_karst >> T_record, since
a decay too slow to observe cannot be reliably estimated from the
discharge record alone.
tau_fast : array-like or None
Externally supplied timescales [days] for the fast reservoirs (all
except the deepest), e.g. from a prior calibration run. When
provided, spectral fitting is skipped. Note: spectral corner periods
are 2πτ, not τ — a 29-day reservoir has its -3 dB corner at 182 days,
and a 466-day soil reservoir at ~8 years, which may lie near or
beyond the record length and be unreliable from spectral fitting alone.
Attributes (populated after fit())
------------------------------------
tau : ndarray
Reservoir timescales [days], ordered slowest to fastest.
H0 : ndarray
Initial storage depths [mm], same order as tau.
n_reservoirs_fitted : int
Number of reservoirs selected (spectral fast + 1 karst).
aic_scores : list of float
AIC values for k = 1 … max_reservoirs-1 spectral fits.
tau_karst : float
Deep-reservoir timescale [days] from recession fitting.
"""
def __init__(self, Q, dt=1.0, n_reservoirs=None, max_reservoirs=4,
precip=None, recession_min_duration=10,
recession_precip_threshold=0.5, recession_antecedent_days=3,
tau_deep=None, tau_fast=None):
self.Q = np.asarray(Q, dtype=float)
self.dt = float(dt)
self.n_reservoirs = n_reservoirs
self.max_reservoirs = max_reservoirs
self.precip = (np.asarray(precip, dtype=float)
if precip is not None else None)
self.recession_min_duration = recession_min_duration
self.recession_precip_threshold = recession_precip_threshold
self.recession_antecedent_days = recession_antecedent_days
self.tau_deep = float(tau_deep) if tau_deep is not None else None
# Externally supplied fast timescales [days], shortest first.
# When provided, skip spectral fitting and use these directly.
self.tau_fast = (np.sort(np.asarray(tau_fast, dtype=float))
if tau_fast is not None else None)
# Results
self.tau = None
self.H0 = None
self.n_reservoirs_fitted = None
self.aic_scores = None
self.tau_karst = None
self.tau_karst_longterm = None # long-term absolute-time estimate (comparison)
self._tau_spectral = None # fast timescales only (fastest first)
self._Q_residual = None # Q after removing fast components
self._Q_components_fast = None # list of fast-component time series
self._psd_freqs = None
self._psd = None
self._tau_shallow_td = None # time-domain τ_shallow estimate
self._tau_soil_td = None # time-domain τ_soil estimate
# ------------------------------------------------------------------
# Public interface
# ------------------------------------------------------------------
[docs]
def fit(self):
"""Run the full separation pipeline. Returns self."""
self._compute_psd()
self._fit_spectral_model()
self._fit_timescales_td()
self._separate_components()
self._fit_recession()
self._compute_initial_conditions()
return self
[docs]
def get_initial_conditions(self):
"""
Initial reservoir storage depths, ordered fastest to slowest
(shallow first, karst last), matching the hydroRaVENS reservoir
index convention.
Returns
-------
dict
``{'H0': [H0_shallow, H0_soil, ..., H0_karst]}`` in mm.
"""
self._check_fitted()
return {'H0': list(self.H0[::-1])}
[docs]
def get_parameter_priors(self):
"""
Suggested initial values and bounds (in params.yml units) for the
hydroRaVENS log__t_efold_* calibration parameters.
Bounds are ±0.5 in log10 around the estimated timescale — wide enough
to allow the optimizer freedom, tight enough to stay physical.
Returns
-------
dict
Keyed by hydroRaVENS parameter name. Each value is a dict with
``'initial'``, ``'lower'``, ``'upper'`` in log10(days).
"""
self._check_fitted()
all_names = ['log__t_efold_shallow', 'log__t_efold_soil', 'log__t_efold_karst']
tau_fast_to_slow = self.tau[::-1] # shallow first, karst last
n = len(tau_fast_to_slow)
# Always end with karst; fill fast slots from the front.
# For 2 reservoirs: ['shallow', 'karst'] — skip soil.
names = all_names[:n - 1] + [all_names[-1]]
priors = {}
for name, tau_i in zip(names, tau_fast_to_slow):
if not np.isfinite(tau_i):
# For τ_soil, fall back to the time-domain rolling-minimum
# estimate (not used for LP separation but good enough as a
# prior hint — still biased low, so bounds are left wide).
if name == 'log__t_efold_soil' and self._tau_soil_td is not None:
tau_i = self._tau_soil_td
else:
priors[name] = None # could not be estimated; keep params.yml defaults
continue
log_tau = np.log10(tau_i)
priors[name] = {
'initial': round(float(log_tau), 3),
'lower': round(float(log_tau) - 0.5, 3),
'upper': round(float(log_tau) + 0.5, 3),
}
return priors
[docs]
def summary(self):
"""Print a brief summary of fitted timescales and initial conditions."""
self._check_fitted()
tau_f2s = self.tau[::-1]
H0_f2s = self.H0[::-1]
labels = ['shallow', 'soil', 'karst'] + [f'res{i}' for i in range(3, len(tau_f2s))]
print('HydrographSeparation results')
print(f' n_reservoirs : {self.n_reservoirs_fitted}')
print(f' {"Reservoir":<10} {"τ (days)":>12} {"H₀ (mm)":>12}')
print(f' {"-"*10} {"-"*12} {"-"*12}')
for label, tau_i, H0_i in zip(labels, tau_f2s, H0_f2s):
print(f' {label:<10} {tau_i:>12.1f} {H0_i:>12.1f}')
if self.aic_scores is not None:
print(f'\n AIC by fast-reservoir order: '
f'{[round(a, 1) for a in self.aic_scores]}')
print(f'\n Time-domain estimates:')
td_sh = (f'{self._tau_shallow_td:.1f} d'
if self._tau_shallow_td is not None else 'not estimated')
print(f' τ_shallow : {td_sh}')
if self.n_reservoirs_fitted >= 3 or self._tau_soil_td is not None:
td_so = (f'{self._tau_soil_td:.1f} d'
if self._tau_soil_td is not None
else 'not estimated (insufficient long dry-period recessions)')
print(f' τ_soil : {td_so}')
fallback_tau = 10.0 * len(self.Q) * self.dt
is_fallback = abs(self.tau_karst / fallback_tau - 1.0) < 0.01
fb_note = ' ← fallback: no declining trend detected' if is_fallback else ''
print(f'\n τ_karst (segment minima): {self.tau_karst:.1f} days{fb_note}')
if self.tau_karst_longterm is not None and not is_fallback:
agree = abs(np.log(self.tau_karst / self.tau_karst_longterm)) < 0.5
print(f' τ_karst (long-term slope): {self.tau_karst_longterm:.1f} days'
f' {"← agree" if agree else "← disagree — check interannual trend"}')
# ------------------------------------------------------------------
# Stage 1a: time-domain τ estimation (primary for fast reservoirs)
# ------------------------------------------------------------------
def _fit_recession_scale(self, Q_signal, min_dur, smooth_days, antecedent_days,
precip_threshold=None, min_tau=1.0, max_tau=None,
mode='within'):
"""
Estimate a recession timescale from recession-segment analysis.
Both current callers (τ_shallow and τ_soil in :meth:`_fit_timescales_td`)
use ``mode='within'``. τ_karst has its own dedicated pipeline in
:meth:`_fit_recession`, which uses the same per-segment-minimum approach
as ``mode='cross'`` but adds a long-term cross-check and reconciliation
step not available here.
Two modes:
``'within'`` (default) — for fast/intermediate timescales:
Fit log(Q) vs relative time within each qualifying segment; take
the median slope. Captures the LOCAL recession rate, which is
dominated by the fastest reservoir still active over that duration.
Use short segments (5-20 d) for τ_shallow; longer segments (≥15 d)
on the rolling-minimum envelope for τ_soil.
``'cross'`` — for inter-annual timescales:
Record the minimum Q within each segment and fit log(Q_min) vs
absolute time across all segments. Captures the INTER-ANNUAL
drainage trend that persists after fast reservoirs empty.
Not called internally; available for external or diagnostic use.
Parameters
----------
Q_signal : ndarray
Discharge signal to analyse [mm/day].
min_dur : int
Minimum qualifying segment length [days].
smooth_days : float
Moving-average window [days] for the declining test only.
antecedent_days : int
Days before segment start that must also satisfy the precipitation
gate.
precip_threshold : float or None
Precipitation gate [mm/day]. Defaults to
self.recession_precip_threshold.
min_tau : float
Reject fits with τ < min_tau [days].
max_tau : float or None
Reject fits with τ > max_tau [days].
mode : {'within', 'cross'}
Regression strategy (see above).
Returns
-------
tau : float or None
Estimated timescale [days], or None if fewer than three segments
qualify or the slope is non-negative.
n_segs : int
Number of qualifying segments used.
"""
n = len(Q_signal)
threshold = (precip_threshold if precip_threshold is not None
else self.recession_precip_threshold)
# Smooth signal for declining test only
w = max(1, int(round(smooth_days / self.dt)))
if w > 1:
kernel = np.ones(w) / w
Q_smooth = np.convolve(Q_signal, kernel, mode='same')
else:
Q_smooth = Q_signal.copy()
Q_smooth = np.maximum(Q_smooth, 0.0)
# Precipitation gate
if self.precip is not None:
dry = self.precip < threshold
for lag in range(1, antecedent_days + 1):
dry[lag:] &= dry[:-lag]
else:
dry = np.ones(n, dtype=bool)
# Declining test on smoothed signal
declining = np.zeros(n, dtype=bool)
declining[:-1] = Q_smooth[1:] < Q_smooth[:-1]
candidate = dry & declining
# Collect qualifying segments
mask = np.zeros(n, dtype=bool)
i = 0
while i < n:
if candidate[i]:
j = i + 1
while j < n and candidate[j]:
j += 1
if j - i >= min_dur:
mask[i:j] = True
i = j
else:
i += 1
if not np.any(mask):
return None, 0
idx = np.where(mask)[0]
breaks = np.where(np.diff(idx) > 1)[0] + 1
segs = np.split(idx, breaks)
min_valid = max(3, min_dur // 3)
if mode == 'within':
slopes = []
for seg in segs:
Q_seg = Q_signal[seg]
valid = Q_seg > 0
if valid.sum() < min_valid:
continue
t_rel = np.arange(valid.sum()) * self.dt
s, _, _, _, _ = linregress(t_rel, np.log(Q_seg[valid]))
if s < -1e-12:
slopes.append(s)
n_segs = len(slopes)
if n_segs < 3:
return None, n_segs
slope = float(np.median(slopes))
else: # 'cross': per-segment minimum vs absolute time
t_min_list = []
Q_min_list = []
for seg in segs:
Q_seg = Q_signal[seg]
valid = Q_seg > 0
if valid.sum() < min_valid:
continue
min_pos = np.argmin(Q_seg[valid])
abs_idx = seg[valid][min_pos]
t_min_list.append(abs_idx * self.dt)
Q_min_list.append(float(Q_seg[valid][min_pos]))
n_segs = len(t_min_list)
if n_segs < 3:
return None, n_segs
slope, _, _, _, _ = linregress(np.array(t_min_list),
np.log(np.array(Q_min_list)))
if slope >= -1e-12:
return None, n_segs
tau = -1.0 / slope
if tau < min_tau:
return None, n_segs
if max_tau is not None and tau > max_tau:
return None, n_segs
return tau, n_segs
def _fit_timescales_td(self):
"""
Estimate fast-reservoir timescales via time-domain recession analysis
and update self._tau_spectral with the results.
Two stages of sequential peeling:
1. **τ_shallow** — within-segment log(Q) slope on raw Q with short (≥5 day)
segments and a 1-day smooth. Accepts τ in [2, 200] days.
2. **τ_soil** — per-segment minimum on Q after removing the shallow
component (low-pass at τ_shallow). Uses a moving-average window
equal to τ_shallow to suppress residual fast noise, a relaxed
precipitation threshold (2 mm/day — small rain barely recharges the
soil), and an antecedent window ≈ τ_shallow to wait for the shallow
reservoir to drain. Only attempted when n_reservoirs ≥ 3.
Time-domain estimates replace the corresponding spectral values in
self._tau_spectral when available; spectral values serve as fallbacks
only if they are well-separated from the shallow scale (> 5×τ_shallow).
Diagnostic attributes _tau_shallow_td and _tau_soil_td are set
regardless of whether they replace spectral values.
"""
if self.tau_fast is not None:
self._tau_shallow_td = None
self._tau_soil_td = None
return
n_fast_target = (self.n_reservoirs - 1 if self.n_reservoirs is not None
else self.max_reservoirs - 1)
# ---- τ_shallow -------------------------------------------------------
# Within-segment median slope on short (5-15 d) recession segments.
# Short segments capture the fast shallow reservoir; deeper reservoirs
# act as a near-constant baseline and barely affect the slope.
tau_sh, _ = self._fit_recession_scale(
self.Q,
min_dur=5,
smooth_days=1.0,
antecedent_days=self.recession_antecedent_days,
min_tau=2.0,
max_tau=200.0,
mode='within',
)
self._tau_shallow_td = tau_sh
# ---- τ_soil (only when ≥ 3 reservoirs expected) ---------------------
# Problem with LP filtering for τ_soil estimation: applying a low-pass
# filter at τ_shallow creates a signal (Q_slow) whose FILTER IMPULSE
# RESPONSE decays with timescale τ_shallow. Recession segments in Q_slow
# that start within ~3×τ_shallow days of a rain event are still in the
# filter-decay regime, so within-segment slopes give τ_eff ≈ τ_shallow,
# not τ_soil. With Minnesota dry periods of only 20-35 days, most
# segments start before the filter has fully settled.
#
# Fix: use a ROLLING MINIMUM over a 2×τ_shallow window. The rolling
# minimum extracts the lower envelope of Q (slow baseflow), cleanly
# suppressing storm pulses without creating a lingering exponential tail.
# The resulting Q_rmin ≈ Q_soil + Q_karst essentially everywhere it is
# declining, with no multi-week filter-decay artefact.
# Within-segment slopes on Q_rmin give
# τ_eff = (Q_soil/Q_rmin)/τ_soil + (Q_karst/Q_rmin)/τ_karst
# Theoretically τ_eff > τ_soil (karst floor slows the apparent decay),
# but in practice residual shallow contamination in early recession
# segments biases τ_eff short. The min_tau = 8×τ_shallow guard rejects
# the worst-contaminated segments; the result is a rough lower bound on
# τ_soil, useful as a prior but not a precise estimate.
self._tau_soil_td = None
if n_fast_target >= 2 and tau_sh is not None:
n_sig = len(self.Q)
w_rmin = max(1, int(round(2.0 * tau_sh / self.dt)))
Q_rmin = np.array([
float(np.min(self.Q[max(0, t - w_rmin + 1):t + 1]))
for t in range(n_sig)
])
tau_so, _ = self._fit_recession_scale(
Q_rmin,
min_dur=15,
smooth_days=tau_sh,
antecedent_days=5,
precip_threshold=2.0,
min_tau=8.0 * tau_sh,
mode='within',
)
self._tau_soil_td = tau_so
# ---- update _tau_spectral --------------------------------------------
# NOTE: _tau_soil_td is intentionally NOT added to _tau_spectral here.
# The rolling-minimum estimate is still biased low (transition regime),
# so including it in the LP separation would corrupt Q_residual and
# degrade τ_karst. It is used only in get_parameter_priors().
n_sp = len(self._tau_spectral)
td_taus = []
# Shallow position (fastest)
if tau_sh is not None:
td_taus.append(tau_sh)
elif n_sp >= 1:
td_taus.append(float(self._tau_spectral[0]))
# Soil position (second fastest), only when ≥ 3 reservoirs requested
if n_fast_target >= 2:
if n_sp >= 2:
# Accept spectral τ_soil only when it is well-separated from
# τ_shallow — if spectral fitting collapsed both to the same
# corner (common when seasonal peaks dominate the PSD), discard
ref = td_taus[0] if td_taus else None
sp_soil = float(self._tau_spectral[1])
if ref is not None and sp_soil > 5.0 * ref:
td_taus.append(sp_soil)
# else: drop to 2-reservoir initialisation
if td_taus:
self._tau_spectral = np.sort(np.array(td_taus)) # shortest first
self.n_reservoirs_fitted = len(td_taus) + 1 # +1 for karst
# ------------------------------------------------------------------
# Stage 1b: spectral fitting (fallback / diagnostic)
# ------------------------------------------------------------------
def _compute_psd(self):
"""
Estimate the spectral target for Lorentzian fitting.
When precipitation is provided, compute the transfer function magnitude
|H(f)|² = |S_QP(f)|² / S_PP(f) (cross-spectrum / input PSD). This
cancels the colour of the precipitation spectrum and gives a clean
estimate of the linear-reservoir cascade response regardless of whether
the input is white noise, intermittent, or seasonal.
Without precipitation, fall back to the output PSD S_QQ(f), which
conflates input colour with reservoir structure but is still useful
when timescale separation is large.
"""
Q = self.Q.copy()
Q[~np.isfinite(Q)] = np.nanmean(Q)
nperseg = min(len(Q) // 4, 1024)
kw = dict(fs=1.0 / self.dt, nperseg=nperseg,
window='hann', detrend='constant')
if self.precip is not None:
P = self.precip.copy()
P[~np.isfinite(P)] = 0.0
freqs, S_PP = welch(P, **kw)
freqs, S_QP = csd(Q, P, **kw)
# Transfer function magnitude squared; guard against near-zero S_PP
safe_spp = np.where(S_PP > 0, S_PP, np.nan)
H2 = np.abs(S_QP) ** 2 / safe_spp
spectral = H2
else:
freqs, spectral = welch(Q, **kw)
# The above uses nperseg = T_record/4, which may not resolve the
# slow-reservoir corner. Supplement with a full-record periodogram
# (nperseg = N) for the low-frequency bins that are missing. The
# periodogram is noisier but gives the only spectral estimates below
# 4/T_record, where the soil or karst corners may sit.
n = len(Q)
if nperseg < n:
kw_full = dict(fs=1.0 / self.dt, nperseg=n,
window='hann', detrend='constant')
if self.precip is not None:
f2, S_PP2 = welch(P, **kw_full)
f2, S_QP2 = csd(Q, P, **kw_full)
safe2 = np.where(S_PP2 > 0, S_PP2, np.nan)
spec2 = np.abs(S_QP2) ** 2 / safe2
else:
f2, spec2 = welch(Q, **kw_full)
# Use full-record estimate for frequencies below the Welch resolution
f_cutover = 4.0 / (n * self.dt)
low_mask = (f2 > 0) & (f2 < f_cutover) & np.isfinite(spec2) & (spec2 > 0)
freqs = np.concatenate([f2[low_mask], freqs])
spectral = np.concatenate([spec2[low_mask], spectral])
# Drop DC and any non-finite bins
mask = (freqs > 0) & np.isfinite(spectral) & (spectral > 0)
self._psd_freqs = freqs[mask]
self._psd = spectral[mask]
@staticmethod
def _log_cascade_psd(log_freqs, log_A, *log_taus):
"""
log PSD of a cascade of k linear reservoirs driven by white noise:
log S(f) = log_A - Σ_i log(1 + (2π f τ_i)²)
Parameters passed as (log_A, log_τ_1, ..., log_τ_k) so that
scipy.optimize.curve_fit can treat them as a flat vector.
"""
freqs = np.exp(log_freqs)
log_S = np.full_like(freqs, log_A)
for log_tau in log_taus:
tau = np.exp(log_tau)
log_S -= np.log(1.0 + (2.0 * np.pi * freqs * tau) ** 2)
return log_S
def _fit_k_reservoirs(self, k):
"""
Fit k Lorentzians to log PSD.
Returns
-------
taus : ndarray or None
Fitted timescales [days], sorted longest first.
aic : float
AIC of the fit (lower is better).
"""
log_f = np.log(self._psd_freqs)
log_S = np.log(self._psd)
# Spread initial τ guesses log-uniformly across the observable range
T_record = len(self.Q) * self.dt
tau_init = np.logspace(0, np.log10(T_record / 4.0), k)
p0 = [float(np.mean(log_S))] + list(np.log(tau_init))
tau_lo = np.log(0.5)
tau_hi = np.log(5.0 * T_record)
bounds = ([-np.inf] + [tau_lo] * k,
[ np.inf] + [tau_hi] * k)
try:
popt, _ = curve_fit(self._log_cascade_psd, log_f, log_S,
p0=p0, bounds=bounds, maxfev=20000)
except (RuntimeError, ValueError):
return None, np.inf
taus = np.sort(np.exp(popt[1:]))[::-1] # longest first
resid = log_S - self._log_cascade_psd(log_f, *popt)
n = len(log_S)
rss = np.dot(resid, resid)
aic = n * np.log(rss / n) + 2.0 * (k + 1) # k taus + amplitude
return taus, aic
def _fit_spectral_model(self):
"""
Determine fast-reservoir timescales (all reservoirs except the deepest).
If tau_fast was supplied at construction, use it directly and skip
spectral fitting. Spectral fitting works well when the system is a
nearly pure cascade (small exfiltration fractions so most water
reaches the deep reservoir) and when the corner periods 2πτ_i fall
within the observable frequency band. For real data with seasonal
forcing and short records relative to τ_soil, externally supplied
values from a prior calibration run are more reliable.
"""
if self.tau_fast is not None:
self._tau_spectral = self.tau_fast # already sorted
self.n_reservoirs_fitted = len(self.tau_fast) + 1 # +1 for deep
self.aic_scores = None
return
if self.n_reservoirs is not None:
n_fast_max = self.n_reservoirs - 1
else:
n_fast_max = self.max_reservoirs - 1
aic_scores = []
tau_by_k = []
for k in range(1, n_fast_max + 1):
taus, aic = self._fit_k_reservoirs(k)
aic_scores.append(aic)
tau_by_k.append(taus)
self.aic_scores = aic_scores
if self.n_reservoirs is not None:
best_k = self.n_reservoirs - 1
else:
best_k = int(np.argmin(aic_scores)) + 1
taus_best = tau_by_k[best_k - 1]
if taus_best is None:
# curve_fit failed for the selected k; fall back to 1 fast reservoir
for k_try in range(1, n_fast_max + 1):
if tau_by_k[k_try - 1] is not None:
taus_best = tau_by_k[k_try - 1]
best_k = k_try
break
else:
taus_best = np.array([1.0])
best_k = 1
self._tau_spectral = np.sort(taus_best) # fastest first
self.n_reservoirs_fitted = best_k + 1 # +1 for karst
# ------------------------------------------------------------------
# Stage 2: component separation
# ------------------------------------------------------------------
def _apply_lowpass(self, signal, tau):
"""
Forward recursive low-pass filter with timescale τ [days].
Returns the slowly varying component of signal.
"""
alpha = np.exp(-self.dt / tau)
result = np.empty_like(signal)
result[0] = signal[0]
for t in range(1, len(signal)):
result[t] = alpha * result[t - 1] + (1.0 - alpha) * signal[t]
return result
def _separate_components(self):
"""
Peel fast components from Q one by one (fastest first).
Each pass: apply low-pass filter at τ_i, subtract → fast component.
Residual after all passes is the slow (karst) signal.
"""
Q = self.Q.copy()
Q[~np.isfinite(Q)] = np.nanmean(Q)
components = []
for tau_i in self._tau_spectral: # shortest τ first
Q_slow = self._apply_lowpass(Q, tau_i)
components.append(Q - Q_slow) # fast component at this τ
Q = Q_slow
self._Q_components_fast = components # list: fastest component first
self._Q_residual = np.maximum(Q, 0.0) # slow residual for recession fit
# ------------------------------------------------------------------
# Stage 3: recession fitting for τ_karst
# ------------------------------------------------------------------
def _recession_mask(self):
"""
Boolean mask of timesteps within qualifying recession segments:
Q_residual monotonically declining, precipitation absent, minimum
duration satisfied.
"""
n = len(self._Q_residual)
# Precipitation gate
if self.precip is not None:
dry = self.precip < self.recession_precip_threshold
for lag in range(1, self.recession_antecedent_days + 1):
dry[lag:] &= dry[:-lag]
else:
dry = np.ones(n, dtype=bool)
# Discharge declining
declining = np.zeros(n, dtype=bool)
declining[:-1] = self._Q_residual[1:] < self._Q_residual[:-1]
candidate = dry & declining
# Enforce minimum run length
mask = np.zeros(n, dtype=bool)
i = 0
while i < n:
if candidate[i]:
j = i + 1
while j < n and candidate[j]:
j += 1
if j - i >= self.recession_min_duration:
mask[i:j] = True
i = j
else:
i += 1
return mask
def _fit_recession(self):
"""
Estimate τ_karst using per-segment minimum-flow analysis, cross-checked
against a long-term absolute-time regression on all recession timesteps.
Primary method — per-segment minima:
For each qualifying recession segment, record the minimum
Q_residual value and its absolute time position. The minimum
represents the most-depleted state within that segment, after fast
reservoirs have substantially drained. OLS on log(Q_min) vs
absolute time gives a slope -1/τ_karst.
Cross-check — long-term slope:
Fit log(Q_residual) vs absolute time across all recession timesteps.
This averages over more data points and is more robust to soil
contamination of the per-segment estimate (unremoved soil components
in Q_residual shorten the per-segment estimate because per-segment
minima track the seasonal soil cycle rather than the interannual
karst trend).
Reconciliation:
When both estimates are available and they disagree by more than
factor 2 (|log ratio| > log 2 ≈ 0.69), the LONGER τ is adopted as
the final tau_karst. Physically, soil contamination always biases
the per-segment estimate SHORT, so the longer of the two is a
better lower-bound on τ_karst.
Both raw estimates are stored (tau_karst and tau_karst_longterm).
If tau_deep was supplied at construction, skip fitting and use it.
"""
if self.tau_deep is not None:
self.tau_karst = self.tau_deep
self.tau_karst_longterm = self.tau_deep
return
mask = self._recession_mask()
fallback_tau = 10.0 * len(self.Q) * self.dt
# ---- long-term slope (always computed for comparison) ----
if np.any(mask):
idx = np.where(mask)[0]
Q_dry = self._Q_residual[idx]
pos = Q_dry > 0
if pos.sum() >= 5:
slope_lt, _, _, _, _ = linregress(idx[pos] * self.dt,
np.log(Q_dry[pos]))
self.tau_karst_longterm = (fallback_tau if slope_lt >= -1e-12
else -1.0 / slope_lt)
else:
self.tau_karst_longterm = fallback_tau
else:
self.tau_karst_longterm = fallback_tau
if not np.any(mask):
self.tau_karst = fallback_tau
return
# ---- per-segment minima ----
idx = np.where(mask)[0]
breaks = np.where(np.diff(idx) > 1)[0] + 1
segs = np.split(idx, breaks)
t_min_list = []
Q_min_list = []
for seg in segs:
Q_seg = self._Q_residual[seg]
valid = Q_seg > 0
if valid.sum() < self.recession_min_duration:
continue
min_pos = np.argmin(Q_seg[valid])
abs_idx = seg[valid][min_pos]
t_min_list.append(abs_idx * self.dt)
Q_min_list.append(Q_seg[valid][min_pos])
if len(t_min_list) < 3:
self.tau_karst = self.tau_karst_longterm
return
t_min = np.array(t_min_list)
logQ_m = np.log(np.array(Q_min_list))
slope, _, _, _, _ = linregress(t_min, logQ_m)
tau_segmin = fallback_tau if slope >= -1e-12 else -1.0 / slope
# Reconcile: prefer the LONGER estimate when there is strong disagreement.
# Per-segment minimum is biased short when Q_residual still contains a
# soil component whose seasonal drainage dominates the segment minima.
tau_lt = self.tau_karst_longterm
if (tau_lt < fallback_tau and
abs(np.log(tau_segmin / tau_lt)) > np.log(2.0)):
self.tau_karst = max(tau_segmin, tau_lt)
else:
self.tau_karst = tau_segmin
# ------------------------------------------------------------------
# Stage 4: initial conditions
# ------------------------------------------------------------------
def _compute_initial_conditions(self):
"""
H₀_i = Q_i(t = 0) × τ_i for each reservoir.
tau and H0 are stored slowest-first (karst, …, shallow).
When fewer fast reservoirs were estimated than n_reservoirs specifies
(e.g., τ_soil not identifiable from recession analysis), the missing
intermediate reservoirs are padded with H₀ = 0 and τ = NaN so that
get_initial_conditions() returns the expected number of entries.
"""
H0_karst = float(self._Q_residual[0]) * self.tau_karst
H0_fast = [max(float(Q_c[0]), 0.0) * tau_i
for Q_c, tau_i in zip(self._Q_components_fast,
self._tau_spectral)]
# Pad with NaN/0 for any fast reservoirs that could not be estimated
n_fast_needed = (self.n_reservoirs - 1 if self.n_reservoirs is not None
else len(self._tau_spectral))
n_missing = n_fast_needed - len(H0_fast)
if n_missing > 0:
# Insert unestimated reservoirs between the fastest and the karst.
# tau = NaN signals "unknown"; H0 = 0 is a conservative start.
H0_fast = [0.0] * n_missing + H0_fast
tau_pad = [float('nan')] * n_missing + list(self._tau_spectral[::-1])
else:
tau_pad = list(self._tau_spectral[::-1])
# Slowest first
self.tau = np.array([self.tau_karst] + tau_pad)
self.H0 = np.array([H0_karst] + H0_fast)
# ------------------------------------------------------------------
# Utility
# ------------------------------------------------------------------
def _check_fitted(self):
if self.tau is None:
raise RuntimeError('Call fit() before accessing results.')