Source code for hydroravens.hydrograph_separation

"""
hydroravens.hydrograph_separation
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Estimate reservoir timescales and initial storage depths from an observed
discharge time series, without running the hydrological model.

The pipeline has two stages:

1. **Spectral stage** — fit a cascade of linear-reservoir transfer functions
   (product of Lorentzians) to the power spectral density of Q.  AIC selects
   the number of fast reservoirs.  Timescales and initial storage for the fast
   components come from this stage.

2. **Recession stage** — remove the fast components with a recursive digital
   filter, leaving a slowly varying residual.  Fit log(Q_residual) vs. time
   on dry-season recession segments to extract τ_karst and its initial storage.

The primary output is physically informed initial conditions and calibration
parameter priors for hydroRaVENS, reducing spin-up requirements and improving
optimizer starting points.

References
----------
Maillet (1905) : multi-exponential recession decomposition of spring flows.
Young (1984)   : recursive estimation and modelling of nonstationary time series.
Jakeman & Hornberger (1993) : IHACRES transfer-function identification.
Kirchner (2009): catchments as simple dynamical systems — spectral approach.
Eckhardt (2005): recursive digital baseflow separation filter.
"""

import numpy as np
from scipy.optimize import curve_fit
from scipy.signal import csd, welch
from scipy.stats import linregress


[docs] class HydrographSeparation: """ Separate observed discharge into reservoir components and estimate timescales and initial storage depths. Parameters ---------- Q : array-like Observed specific discharge [mm/day]. dt : float Timestep [days]. Default 1.0. n_reservoirs : int or None Number of reservoirs. If None, selected by AIC on the spectral fit. The slowest reservoir (karst/deep) is always fitted by recession analysis regardless of this setting. max_reservoirs : int Maximum number of reservoirs to test when n_reservoirs is None. Default 4. precip : array-like or None Daily precipitation [mm/day], same length as Q. Used to mask recession periods; if None, only discharge decline is used. recession_min_duration : int Minimum qualifying recession length [days]. Default 10. recession_precip_threshold : float Maximum daily precipitation within a recession segment [mm/day]. Default 0.5. recession_antecedent_days : int Number of days immediately before a recession that must also be dry. Default 3. tau_deep : float or None Externally supplied timescale [days] for the deep (karst) reservoir. When provided, recession fitting is skipped for τ_karst and this value is used directly. Recommended when τ_karst >> T_record, since a decay too slow to observe cannot be reliably estimated from the discharge record alone. tau_fast : array-like or None Externally supplied timescales [days] for the fast reservoirs (all except the deepest), e.g. from a prior calibration run. When provided, spectral fitting is skipped. Note: spectral corner periods are 2πτ, not τ — a 29-day reservoir has its -3 dB corner at 182 days, and a 466-day soil reservoir at ~8 years, which may lie near or beyond the record length and be unreliable from spectral fitting alone. Attributes (populated after fit()) ------------------------------------ tau : ndarray Reservoir timescales [days], ordered slowest to fastest. H0 : ndarray Initial storage depths [mm], same order as tau. n_reservoirs_fitted : int Number of reservoirs selected (spectral fast + 1 karst). aic_scores : list of float AIC values for k = 1 … max_reservoirs-1 spectral fits. tau_karst : float Deep-reservoir timescale [days] from recession fitting. """ def __init__(self, Q, dt=1.0, n_reservoirs=None, max_reservoirs=4, precip=None, recession_min_duration=10, recession_precip_threshold=0.5, recession_antecedent_days=3, tau_deep=None, tau_fast=None): self.Q = np.asarray(Q, dtype=float) self.dt = float(dt) self.n_reservoirs = n_reservoirs self.max_reservoirs = max_reservoirs self.precip = (np.asarray(precip, dtype=float) if precip is not None else None) self.recession_min_duration = recession_min_duration self.recession_precip_threshold = recession_precip_threshold self.recession_antecedent_days = recession_antecedent_days self.tau_deep = float(tau_deep) if tau_deep is not None else None # Externally supplied fast timescales [days], shortest first. # When provided, skip spectral fitting and use these directly. self.tau_fast = (np.sort(np.asarray(tau_fast, dtype=float)) if tau_fast is not None else None) # Results self.tau = None self.H0 = None self.n_reservoirs_fitted = None self.aic_scores = None self.tau_karst = None self.tau_karst_longterm = None # long-term absolute-time estimate (comparison) self._tau_spectral = None # fast timescales only (fastest first) self._Q_residual = None # Q after removing fast components self._Q_components_fast = None # list of fast-component time series self._psd_freqs = None self._psd = None self._tau_shallow_td = None # time-domain τ_shallow estimate self._tau_soil_td = None # time-domain τ_soil estimate # ------------------------------------------------------------------ # Public interface # ------------------------------------------------------------------
[docs] def fit(self): """Run the full separation pipeline. Returns self.""" self._compute_psd() self._fit_spectral_model() self._fit_timescales_td() self._separate_components() self._fit_recession() self._compute_initial_conditions() return self
[docs] def get_initial_conditions(self): """ Initial reservoir storage depths, ordered fastest to slowest (shallow first, karst last), matching the hydroRaVENS reservoir index convention. Returns ------- dict ``{'H0': [H0_shallow, H0_soil, ..., H0_karst]}`` in mm. """ self._check_fitted() return {'H0': list(self.H0[::-1])}
[docs] def get_parameter_priors(self): """ Suggested initial values and bounds (in params.yml units) for the hydroRaVENS log__t_efold_* calibration parameters. Bounds are ±0.5 in log10 around the estimated timescale — wide enough to allow the optimizer freedom, tight enough to stay physical. Returns ------- dict Keyed by hydroRaVENS parameter name. Each value is a dict with ``'initial'``, ``'lower'``, ``'upper'`` in log10(days). """ self._check_fitted() all_names = ['log__t_efold_shallow', 'log__t_efold_soil', 'log__t_efold_karst'] tau_fast_to_slow = self.tau[::-1] # shallow first, karst last n = len(tau_fast_to_slow) # Always end with karst; fill fast slots from the front. # For 2 reservoirs: ['shallow', 'karst'] — skip soil. names = all_names[:n - 1] + [all_names[-1]] priors = {} for name, tau_i in zip(names, tau_fast_to_slow): if not np.isfinite(tau_i): # For τ_soil, fall back to the time-domain rolling-minimum # estimate (not used for LP separation but good enough as a # prior hint — still biased low, so bounds are left wide). if name == 'log__t_efold_soil' and self._tau_soil_td is not None: tau_i = self._tau_soil_td else: priors[name] = None # could not be estimated; keep params.yml defaults continue log_tau = np.log10(tau_i) priors[name] = { 'initial': round(float(log_tau), 3), 'lower': round(float(log_tau) - 0.5, 3), 'upper': round(float(log_tau) + 0.5, 3), } return priors
[docs] def summary(self): """Print a brief summary of fitted timescales and initial conditions.""" self._check_fitted() tau_f2s = self.tau[::-1] H0_f2s = self.H0[::-1] labels = ['shallow', 'soil', 'karst'] + [f'res{i}' for i in range(3, len(tau_f2s))] print('HydrographSeparation results') print(f' n_reservoirs : {self.n_reservoirs_fitted}') print(f' {"Reservoir":<10} {"τ (days)":>12} {"H₀ (mm)":>12}') print(f' {"-"*10} {"-"*12} {"-"*12}') for label, tau_i, H0_i in zip(labels, tau_f2s, H0_f2s): print(f' {label:<10} {tau_i:>12.1f} {H0_i:>12.1f}') if self.aic_scores is not None: print(f'\n AIC by fast-reservoir order: ' f'{[round(a, 1) for a in self.aic_scores]}') print(f'\n Time-domain estimates:') td_sh = (f'{self._tau_shallow_td:.1f} d' if self._tau_shallow_td is not None else 'not estimated') print(f' τ_shallow : {td_sh}') if self.n_reservoirs_fitted >= 3 or self._tau_soil_td is not None: td_so = (f'{self._tau_soil_td:.1f} d' if self._tau_soil_td is not None else 'not estimated (insufficient long dry-period recessions)') print(f' τ_soil : {td_so}') fallback_tau = 10.0 * len(self.Q) * self.dt is_fallback = abs(self.tau_karst / fallback_tau - 1.0) < 0.01 fb_note = ' ← fallback: no declining trend detected' if is_fallback else '' print(f'\n τ_karst (segment minima): {self.tau_karst:.1f} days{fb_note}') if self.tau_karst_longterm is not None and not is_fallback: agree = abs(np.log(self.tau_karst / self.tau_karst_longterm)) < 0.5 print(f' τ_karst (long-term slope): {self.tau_karst_longterm:.1f} days' f' {"← agree" if agree else "← disagree — check interannual trend"}')
# ------------------------------------------------------------------ # Stage 1a: time-domain τ estimation (primary for fast reservoirs) # ------------------------------------------------------------------ def _fit_recession_scale(self, Q_signal, min_dur, smooth_days, antecedent_days, precip_threshold=None, min_tau=1.0, max_tau=None, mode='within'): """ Estimate a recession timescale from recession-segment analysis. Both current callers (τ_shallow and τ_soil in :meth:`_fit_timescales_td`) use ``mode='within'``. τ_karst has its own dedicated pipeline in :meth:`_fit_recession`, which uses the same per-segment-minimum approach as ``mode='cross'`` but adds a long-term cross-check and reconciliation step not available here. Two modes: ``'within'`` (default) — for fast/intermediate timescales: Fit log(Q) vs relative time within each qualifying segment; take the median slope. Captures the LOCAL recession rate, which is dominated by the fastest reservoir still active over that duration. Use short segments (5-20 d) for τ_shallow; longer segments (≥15 d) on the rolling-minimum envelope for τ_soil. ``'cross'`` — for inter-annual timescales: Record the minimum Q within each segment and fit log(Q_min) vs absolute time across all segments. Captures the INTER-ANNUAL drainage trend that persists after fast reservoirs empty. Not called internally; available for external or diagnostic use. Parameters ---------- Q_signal : ndarray Discharge signal to analyse [mm/day]. min_dur : int Minimum qualifying segment length [days]. smooth_days : float Moving-average window [days] for the declining test only. antecedent_days : int Days before segment start that must also satisfy the precipitation gate. precip_threshold : float or None Precipitation gate [mm/day]. Defaults to self.recession_precip_threshold. min_tau : float Reject fits with τ < min_tau [days]. max_tau : float or None Reject fits with τ > max_tau [days]. mode : {'within', 'cross'} Regression strategy (see above). Returns ------- tau : float or None Estimated timescale [days], or None if fewer than three segments qualify or the slope is non-negative. n_segs : int Number of qualifying segments used. """ n = len(Q_signal) threshold = (precip_threshold if precip_threshold is not None else self.recession_precip_threshold) # Smooth signal for declining test only w = max(1, int(round(smooth_days / self.dt))) if w > 1: kernel = np.ones(w) / w Q_smooth = np.convolve(Q_signal, kernel, mode='same') else: Q_smooth = Q_signal.copy() Q_smooth = np.maximum(Q_smooth, 0.0) # Precipitation gate if self.precip is not None: dry = self.precip < threshold for lag in range(1, antecedent_days + 1): dry[lag:] &= dry[:-lag] else: dry = np.ones(n, dtype=bool) # Declining test on smoothed signal declining = np.zeros(n, dtype=bool) declining[:-1] = Q_smooth[1:] < Q_smooth[:-1] candidate = dry & declining # Collect qualifying segments mask = np.zeros(n, dtype=bool) i = 0 while i < n: if candidate[i]: j = i + 1 while j < n and candidate[j]: j += 1 if j - i >= min_dur: mask[i:j] = True i = j else: i += 1 if not np.any(mask): return None, 0 idx = np.where(mask)[0] breaks = np.where(np.diff(idx) > 1)[0] + 1 segs = np.split(idx, breaks) min_valid = max(3, min_dur // 3) if mode == 'within': slopes = [] for seg in segs: Q_seg = Q_signal[seg] valid = Q_seg > 0 if valid.sum() < min_valid: continue t_rel = np.arange(valid.sum()) * self.dt s, _, _, _, _ = linregress(t_rel, np.log(Q_seg[valid])) if s < -1e-12: slopes.append(s) n_segs = len(slopes) if n_segs < 3: return None, n_segs slope = float(np.median(slopes)) else: # 'cross': per-segment minimum vs absolute time t_min_list = [] Q_min_list = [] for seg in segs: Q_seg = Q_signal[seg] valid = Q_seg > 0 if valid.sum() < min_valid: continue min_pos = np.argmin(Q_seg[valid]) abs_idx = seg[valid][min_pos] t_min_list.append(abs_idx * self.dt) Q_min_list.append(float(Q_seg[valid][min_pos])) n_segs = len(t_min_list) if n_segs < 3: return None, n_segs slope, _, _, _, _ = linregress(np.array(t_min_list), np.log(np.array(Q_min_list))) if slope >= -1e-12: return None, n_segs tau = -1.0 / slope if tau < min_tau: return None, n_segs if max_tau is not None and tau > max_tau: return None, n_segs return tau, n_segs def _fit_timescales_td(self): """ Estimate fast-reservoir timescales via time-domain recession analysis and update self._tau_spectral with the results. Two stages of sequential peeling: 1. **τ_shallow** — within-segment log(Q) slope on raw Q with short (≥5 day) segments and a 1-day smooth. Accepts τ in [2, 200] days. 2. **τ_soil** — per-segment minimum on Q after removing the shallow component (low-pass at τ_shallow). Uses a moving-average window equal to τ_shallow to suppress residual fast noise, a relaxed precipitation threshold (2 mm/day — small rain barely recharges the soil), and an antecedent window ≈ τ_shallow to wait for the shallow reservoir to drain. Only attempted when n_reservoirs ≥ 3. Time-domain estimates replace the corresponding spectral values in self._tau_spectral when available; spectral values serve as fallbacks only if they are well-separated from the shallow scale (> 5×τ_shallow). Diagnostic attributes _tau_shallow_td and _tau_soil_td are set regardless of whether they replace spectral values. """ if self.tau_fast is not None: self._tau_shallow_td = None self._tau_soil_td = None return n_fast_target = (self.n_reservoirs - 1 if self.n_reservoirs is not None else self.max_reservoirs - 1) # ---- τ_shallow ------------------------------------------------------- # Within-segment median slope on short (5-15 d) recession segments. # Short segments capture the fast shallow reservoir; deeper reservoirs # act as a near-constant baseline and barely affect the slope. tau_sh, _ = self._fit_recession_scale( self.Q, min_dur=5, smooth_days=1.0, antecedent_days=self.recession_antecedent_days, min_tau=2.0, max_tau=200.0, mode='within', ) self._tau_shallow_td = tau_sh # ---- τ_soil (only when ≥ 3 reservoirs expected) --------------------- # Problem with LP filtering for τ_soil estimation: applying a low-pass # filter at τ_shallow creates a signal (Q_slow) whose FILTER IMPULSE # RESPONSE decays with timescale τ_shallow. Recession segments in Q_slow # that start within ~3×τ_shallow days of a rain event are still in the # filter-decay regime, so within-segment slopes give τ_eff ≈ τ_shallow, # not τ_soil. With Minnesota dry periods of only 20-35 days, most # segments start before the filter has fully settled. # # Fix: use a ROLLING MINIMUM over a 2×τ_shallow window. The rolling # minimum extracts the lower envelope of Q (slow baseflow), cleanly # suppressing storm pulses without creating a lingering exponential tail. # The resulting Q_rmin ≈ Q_soil + Q_karst essentially everywhere it is # declining, with no multi-week filter-decay artefact. # Within-segment slopes on Q_rmin give # τ_eff = (Q_soil/Q_rmin)/τ_soil + (Q_karst/Q_rmin)/τ_karst # Theoretically τ_eff > τ_soil (karst floor slows the apparent decay), # but in practice residual shallow contamination in early recession # segments biases τ_eff short. The min_tau = 8×τ_shallow guard rejects # the worst-contaminated segments; the result is a rough lower bound on # τ_soil, useful as a prior but not a precise estimate. self._tau_soil_td = None if n_fast_target >= 2 and tau_sh is not None: n_sig = len(self.Q) w_rmin = max(1, int(round(2.0 * tau_sh / self.dt))) Q_rmin = np.array([ float(np.min(self.Q[max(0, t - w_rmin + 1):t + 1])) for t in range(n_sig) ]) tau_so, _ = self._fit_recession_scale( Q_rmin, min_dur=15, smooth_days=tau_sh, antecedent_days=5, precip_threshold=2.0, min_tau=8.0 * tau_sh, mode='within', ) self._tau_soil_td = tau_so # ---- update _tau_spectral -------------------------------------------- # NOTE: _tau_soil_td is intentionally NOT added to _tau_spectral here. # The rolling-minimum estimate is still biased low (transition regime), # so including it in the LP separation would corrupt Q_residual and # degrade τ_karst. It is used only in get_parameter_priors(). n_sp = len(self._tau_spectral) td_taus = [] # Shallow position (fastest) if tau_sh is not None: td_taus.append(tau_sh) elif n_sp >= 1: td_taus.append(float(self._tau_spectral[0])) # Soil position (second fastest), only when ≥ 3 reservoirs requested if n_fast_target >= 2: if n_sp >= 2: # Accept spectral τ_soil only when it is well-separated from # τ_shallow — if spectral fitting collapsed both to the same # corner (common when seasonal peaks dominate the PSD), discard ref = td_taus[0] if td_taus else None sp_soil = float(self._tau_spectral[1]) if ref is not None and sp_soil > 5.0 * ref: td_taus.append(sp_soil) # else: drop to 2-reservoir initialisation if td_taus: self._tau_spectral = np.sort(np.array(td_taus)) # shortest first self.n_reservoirs_fitted = len(td_taus) + 1 # +1 for karst # ------------------------------------------------------------------ # Stage 1b: spectral fitting (fallback / diagnostic) # ------------------------------------------------------------------ def _compute_psd(self): """ Estimate the spectral target for Lorentzian fitting. When precipitation is provided, compute the transfer function magnitude |H(f)|² = |S_QP(f)|² / S_PP(f) (cross-spectrum / input PSD). This cancels the colour of the precipitation spectrum and gives a clean estimate of the linear-reservoir cascade response regardless of whether the input is white noise, intermittent, or seasonal. Without precipitation, fall back to the output PSD S_QQ(f), which conflates input colour with reservoir structure but is still useful when timescale separation is large. """ Q = self.Q.copy() Q[~np.isfinite(Q)] = np.nanmean(Q) nperseg = min(len(Q) // 4, 1024) kw = dict(fs=1.0 / self.dt, nperseg=nperseg, window='hann', detrend='constant') if self.precip is not None: P = self.precip.copy() P[~np.isfinite(P)] = 0.0 freqs, S_PP = welch(P, **kw) freqs, S_QP = csd(Q, P, **kw) # Transfer function magnitude squared; guard against near-zero S_PP safe_spp = np.where(S_PP > 0, S_PP, np.nan) H2 = np.abs(S_QP) ** 2 / safe_spp spectral = H2 else: freqs, spectral = welch(Q, **kw) # The above uses nperseg = T_record/4, which may not resolve the # slow-reservoir corner. Supplement with a full-record periodogram # (nperseg = N) for the low-frequency bins that are missing. The # periodogram is noisier but gives the only spectral estimates below # 4/T_record, where the soil or karst corners may sit. n = len(Q) if nperseg < n: kw_full = dict(fs=1.0 / self.dt, nperseg=n, window='hann', detrend='constant') if self.precip is not None: f2, S_PP2 = welch(P, **kw_full) f2, S_QP2 = csd(Q, P, **kw_full) safe2 = np.where(S_PP2 > 0, S_PP2, np.nan) spec2 = np.abs(S_QP2) ** 2 / safe2 else: f2, spec2 = welch(Q, **kw_full) # Use full-record estimate for frequencies below the Welch resolution f_cutover = 4.0 / (n * self.dt) low_mask = (f2 > 0) & (f2 < f_cutover) & np.isfinite(spec2) & (spec2 > 0) freqs = np.concatenate([f2[low_mask], freqs]) spectral = np.concatenate([spec2[low_mask], spectral]) # Drop DC and any non-finite bins mask = (freqs > 0) & np.isfinite(spectral) & (spectral > 0) self._psd_freqs = freqs[mask] self._psd = spectral[mask] @staticmethod def _log_cascade_psd(log_freqs, log_A, *log_taus): """ log PSD of a cascade of k linear reservoirs driven by white noise: log S(f) = log_A - Σ_i log(1 + (2π f τ_i)²) Parameters passed as (log_A, log_τ_1, ..., log_τ_k) so that scipy.optimize.curve_fit can treat them as a flat vector. """ freqs = np.exp(log_freqs) log_S = np.full_like(freqs, log_A) for log_tau in log_taus: tau = np.exp(log_tau) log_S -= np.log(1.0 + (2.0 * np.pi * freqs * tau) ** 2) return log_S def _fit_k_reservoirs(self, k): """ Fit k Lorentzians to log PSD. Returns ------- taus : ndarray or None Fitted timescales [days], sorted longest first. aic : float AIC of the fit (lower is better). """ log_f = np.log(self._psd_freqs) log_S = np.log(self._psd) # Spread initial τ guesses log-uniformly across the observable range T_record = len(self.Q) * self.dt tau_init = np.logspace(0, np.log10(T_record / 4.0), k) p0 = [float(np.mean(log_S))] + list(np.log(tau_init)) tau_lo = np.log(0.5) tau_hi = np.log(5.0 * T_record) bounds = ([-np.inf] + [tau_lo] * k, [ np.inf] + [tau_hi] * k) try: popt, _ = curve_fit(self._log_cascade_psd, log_f, log_S, p0=p0, bounds=bounds, maxfev=20000) except (RuntimeError, ValueError): return None, np.inf taus = np.sort(np.exp(popt[1:]))[::-1] # longest first resid = log_S - self._log_cascade_psd(log_f, *popt) n = len(log_S) rss = np.dot(resid, resid) aic = n * np.log(rss / n) + 2.0 * (k + 1) # k taus + amplitude return taus, aic def _fit_spectral_model(self): """ Determine fast-reservoir timescales (all reservoirs except the deepest). If tau_fast was supplied at construction, use it directly and skip spectral fitting. Spectral fitting works well when the system is a nearly pure cascade (small exfiltration fractions so most water reaches the deep reservoir) and when the corner periods 2πτ_i fall within the observable frequency band. For real data with seasonal forcing and short records relative to τ_soil, externally supplied values from a prior calibration run are more reliable. """ if self.tau_fast is not None: self._tau_spectral = self.tau_fast # already sorted self.n_reservoirs_fitted = len(self.tau_fast) + 1 # +1 for deep self.aic_scores = None return if self.n_reservoirs is not None: n_fast_max = self.n_reservoirs - 1 else: n_fast_max = self.max_reservoirs - 1 aic_scores = [] tau_by_k = [] for k in range(1, n_fast_max + 1): taus, aic = self._fit_k_reservoirs(k) aic_scores.append(aic) tau_by_k.append(taus) self.aic_scores = aic_scores if self.n_reservoirs is not None: best_k = self.n_reservoirs - 1 else: best_k = int(np.argmin(aic_scores)) + 1 taus_best = tau_by_k[best_k - 1] if taus_best is None: # curve_fit failed for the selected k; fall back to 1 fast reservoir for k_try in range(1, n_fast_max + 1): if tau_by_k[k_try - 1] is not None: taus_best = tau_by_k[k_try - 1] best_k = k_try break else: taus_best = np.array([1.0]) best_k = 1 self._tau_spectral = np.sort(taus_best) # fastest first self.n_reservoirs_fitted = best_k + 1 # +1 for karst # ------------------------------------------------------------------ # Stage 2: component separation # ------------------------------------------------------------------ def _apply_lowpass(self, signal, tau): """ Forward recursive low-pass filter with timescale τ [days]. Returns the slowly varying component of signal. """ alpha = np.exp(-self.dt / tau) result = np.empty_like(signal) result[0] = signal[0] for t in range(1, len(signal)): result[t] = alpha * result[t - 1] + (1.0 - alpha) * signal[t] return result def _separate_components(self): """ Peel fast components from Q one by one (fastest first). Each pass: apply low-pass filter at τ_i, subtract → fast component. Residual after all passes is the slow (karst) signal. """ Q = self.Q.copy() Q[~np.isfinite(Q)] = np.nanmean(Q) components = [] for tau_i in self._tau_spectral: # shortest τ first Q_slow = self._apply_lowpass(Q, tau_i) components.append(Q - Q_slow) # fast component at this τ Q = Q_slow self._Q_components_fast = components # list: fastest component first self._Q_residual = np.maximum(Q, 0.0) # slow residual for recession fit # ------------------------------------------------------------------ # Stage 3: recession fitting for τ_karst # ------------------------------------------------------------------ def _recession_mask(self): """ Boolean mask of timesteps within qualifying recession segments: Q_residual monotonically declining, precipitation absent, minimum duration satisfied. """ n = len(self._Q_residual) # Precipitation gate if self.precip is not None: dry = self.precip < self.recession_precip_threshold for lag in range(1, self.recession_antecedent_days + 1): dry[lag:] &= dry[:-lag] else: dry = np.ones(n, dtype=bool) # Discharge declining declining = np.zeros(n, dtype=bool) declining[:-1] = self._Q_residual[1:] < self._Q_residual[:-1] candidate = dry & declining # Enforce minimum run length mask = np.zeros(n, dtype=bool) i = 0 while i < n: if candidate[i]: j = i + 1 while j < n and candidate[j]: j += 1 if j - i >= self.recession_min_duration: mask[i:j] = True i = j else: i += 1 return mask def _fit_recession(self): """ Estimate τ_karst using per-segment minimum-flow analysis, cross-checked against a long-term absolute-time regression on all recession timesteps. Primary method — per-segment minima: For each qualifying recession segment, record the minimum Q_residual value and its absolute time position. The minimum represents the most-depleted state within that segment, after fast reservoirs have substantially drained. OLS on log(Q_min) vs absolute time gives a slope -1/τ_karst. Cross-check — long-term slope: Fit log(Q_residual) vs absolute time across all recession timesteps. This averages over more data points and is more robust to soil contamination of the per-segment estimate (unremoved soil components in Q_residual shorten the per-segment estimate because per-segment minima track the seasonal soil cycle rather than the interannual karst trend). Reconciliation: When both estimates are available and they disagree by more than factor 2 (|log ratio| > log 2 ≈ 0.69), the LONGER τ is adopted as the final tau_karst. Physically, soil contamination always biases the per-segment estimate SHORT, so the longer of the two is a better lower-bound on τ_karst. Both raw estimates are stored (tau_karst and tau_karst_longterm). If tau_deep was supplied at construction, skip fitting and use it. """ if self.tau_deep is not None: self.tau_karst = self.tau_deep self.tau_karst_longterm = self.tau_deep return mask = self._recession_mask() fallback_tau = 10.0 * len(self.Q) * self.dt # ---- long-term slope (always computed for comparison) ---- if np.any(mask): idx = np.where(mask)[0] Q_dry = self._Q_residual[idx] pos = Q_dry > 0 if pos.sum() >= 5: slope_lt, _, _, _, _ = linregress(idx[pos] * self.dt, np.log(Q_dry[pos])) self.tau_karst_longterm = (fallback_tau if slope_lt >= -1e-12 else -1.0 / slope_lt) else: self.tau_karst_longterm = fallback_tau else: self.tau_karst_longterm = fallback_tau if not np.any(mask): self.tau_karst = fallback_tau return # ---- per-segment minima ---- idx = np.where(mask)[0] breaks = np.where(np.diff(idx) > 1)[0] + 1 segs = np.split(idx, breaks) t_min_list = [] Q_min_list = [] for seg in segs: Q_seg = self._Q_residual[seg] valid = Q_seg > 0 if valid.sum() < self.recession_min_duration: continue min_pos = np.argmin(Q_seg[valid]) abs_idx = seg[valid][min_pos] t_min_list.append(abs_idx * self.dt) Q_min_list.append(Q_seg[valid][min_pos]) if len(t_min_list) < 3: self.tau_karst = self.tau_karst_longterm return t_min = np.array(t_min_list) logQ_m = np.log(np.array(Q_min_list)) slope, _, _, _, _ = linregress(t_min, logQ_m) tau_segmin = fallback_tau if slope >= -1e-12 else -1.0 / slope # Reconcile: prefer the LONGER estimate when there is strong disagreement. # Per-segment minimum is biased short when Q_residual still contains a # soil component whose seasonal drainage dominates the segment minima. tau_lt = self.tau_karst_longterm if (tau_lt < fallback_tau and abs(np.log(tau_segmin / tau_lt)) > np.log(2.0)): self.tau_karst = max(tau_segmin, tau_lt) else: self.tau_karst = tau_segmin # ------------------------------------------------------------------ # Stage 4: initial conditions # ------------------------------------------------------------------ def _compute_initial_conditions(self): """ H₀_i = Q_i(t = 0) × τ_i for each reservoir. tau and H0 are stored slowest-first (karst, …, shallow). When fewer fast reservoirs were estimated than n_reservoirs specifies (e.g., τ_soil not identifiable from recession analysis), the missing intermediate reservoirs are padded with H₀ = 0 and τ = NaN so that get_initial_conditions() returns the expected number of entries. """ H0_karst = float(self._Q_residual[0]) * self.tau_karst H0_fast = [max(float(Q_c[0]), 0.0) * tau_i for Q_c, tau_i in zip(self._Q_components_fast, self._tau_spectral)] # Pad with NaN/0 for any fast reservoirs that could not be estimated n_fast_needed = (self.n_reservoirs - 1 if self.n_reservoirs is not None else len(self._tau_spectral)) n_missing = n_fast_needed - len(H0_fast) if n_missing > 0: # Insert unestimated reservoirs between the fastest and the karst. # tau = NaN signals "unknown"; H0 = 0 is a conservative start. H0_fast = [0.0] * n_missing + H0_fast tau_pad = [float('nan')] * n_missing + list(self._tau_spectral[::-1]) else: tau_pad = list(self._tau_spectral[::-1]) # Slowest first self.tau = np.array([self.tau_karst] + tau_pad) self.H0 = np.array([H0_karst] + H0_fast) # ------------------------------------------------------------------ # Utility # ------------------------------------------------------------------ def _check_fitted(self): if self.tau is None: raise RuntimeError('Call fit() before accessing results.')