Source code for pomata.indicators.volume

"""
Volume indicators.
"""

import polars as pl

from pomata._expr import float64_expr, validate_window, validate_window_order
from pomata.indicators.moving_average import ema
from pomata.indicators.price_transform import price_typical

__all__ = (
    "accumulation_distribution",
    "accumulation_distribution_oscillator",
    "chaikin_money_flow",
    "money_flow_index",
    "obv",
    "vwap",
)


def accumulation_distribution(
    high: pl.Expr,
    low: pl.Expr,
    close: pl.Expr,
    volume: pl.Expr,
) -> pl.Expr:
    r"""
    Accumulation/Distribution Line (AD), also known as the Accumulation/Distribution Index or the Chaikin A/D Line.

    A cumulative volume-flow indicator popularized by Marc Chaikin that gauges the net flow of money into and out of
    an instrument. Each bar contributes a fraction of its ``volume`` weighted by where the ``close`` sits inside the
    bar's high-low range. The per-bar weight is the **Money Flow Multiplier** (``MFM``), bounded in :math:`[-1, +1]`
    (``+1`` at the high, ``-1`` at the low); multiplying it by ``volume`` gives the **Money Flow Volume** (``MFV``);
    the line is the running cumulative sum of ``MFV``:

    .. math::

        \mathrm{MFM}_t &= \frac{(\mathrm{close}_t - \mathrm{low}_t) - (\mathrm{high}_t - \mathrm{close}_t)}
                               {\mathrm{high}_t - \mathrm{low}_t}, \\
        \mathrm{MFV}_t &= \mathrm{MFM}_t \cdot \mathrm{volume}_t, \\
        \mathrm{AD}_t &= \sum_{i=0}^{t} \mathrm{MFV}_i = \mathrm{AD}_{t-1} + \mathrm{MFV}_t.

    The line is unbounded and its absolute level is arbitrary (it depends on where the cumulative sum starts); only
    its slope and divergences from price are interpreted. There is no moving window: every bar from the start of the
    series contributes to the running total.

    Args:
        high: High-price series (e.g. ``pl.col("high")``).
        low: Low-price series (e.g. ``pl.col("low")``).
        close: Close-price series (e.g. ``pl.col("close")``).
        volume: Traded-volume series (e.g. ``pl.col("volume")``).

    Returns:
        The Accumulation/Distribution Line for each row, the same length as the inputs. There is no warm-up: the
        first row already carries the first bar's Money Flow Volume, and the line is the running cumulative sum from
        there.

    Raises:
        TypeError: If any input is not a ``pl.Expr``.

    Note:
        **Precision** -- agrees with its independent reference oracle to ten significant figures (a ``1e-10`` band)
        on any finite input within a sane dynamic range; ``CORRECTNESS.md`` gives the method and the
        float-conditioning limit beyond it.

        **Zero-range bars:** On a doji bar (``high == low``) the Money Flow Multiplier is ``0`` by convention, so the
        division never evaluates ``0 / 0`` and ``close`` does not enter the bar's contribution. The convention
        applies only to a genuine equal-range bar: a ``null`` or ``NaN`` in ``high`` or ``low`` leaves the range
        ``null`` or ``NaN`` (never ``== 0``), so missing data propagates rather than being silently zeroed.

        **Edge-case behavior:**

        - **Null** -- a row in which ``high``, ``low``, ``close``, or ``volume`` is ``null`` yields ``null`` at that
          position and leaves the running total untouched (the cumulative sum skips it and continues from the prior
          total). The one exception is a genuine doji bar (``high == low``, both finite): the multiplier is ``0`` and
          ``close`` is irrelevant, so a ``null`` in ``close`` on such a bar still contributes ``0`` rather than
          ``null``.
        - **NaN** -- a ``NaN`` in any operand that reaches the cumulative sum latches: once present, every later
          non-null row of the line is ``NaN``. A bar whose ``high`` and ``low`` are both ``NaN`` does **not** take
          the doji branch (``NaN - NaN`` is ``NaN``, never ``== 0``), so the ``NaN`` poisons the line rather than
          contributing ``0``.
        - **Partitioning** -- wrap the call in ``.over(...)`` for a multi-series panel so the cumulative sum restarts
          per series and never spans series boundaries, e.g.
          ``accumulation_distribution(pl.col("high"), pl.col("low"), pl.col("close"), pl.col("volume")).over("ticker")``

    See Also:
        - :func:`accumulation_distribution_oscillator`: The Chaikin oscillator -- fast minus slow EMA of this line.
        - :func:`chaikin_money_flow`: The windowed money-flow ratio over the same multiplier.
        - :func:`obv`: Another cumulative volume-flow line.

    References:
        - Chaikin, Marc. "Accumulation/Distribution Line".
        - https://en.wikipedia.org/wiki/Accumulation/distribution_index
        - https://www.investopedia.com/terms/a/accumulationdistribution.asp

    Examples:
        >>> import polars as pl
        >>> from pomata.indicators import accumulation_distribution
        >>>
        >>> frame = pl.DataFrame(
        ...     {
        ...         "high": [10.0, 11.0, 12.0, 13.0, 14.0],
        ...         "low": [8.0, 9.0, 10.0, 11.0, 12.0],
        ...         "close": [9.0, 10.5, 10.0, 13.0, 12.5],
        ...         "volume": [100.0, 200.0, 300.0, 400.0, 500.0],
        ...     }
        ... )
        >>> frame.select(
        ...     accumulation_distribution(
        ...         pl.col("high"), pl.col("low"), pl.col("close"), pl.col("volume")
        ...     ).round(4).alias("ad")
        ... )["ad"].to_list()
        [0.0, 100.0, -200.0, 200.0, -50.0]

        On a multi-ticker panel, wrap the call in ``.over`` so the cumulative sum restarts for each ticker:

        >>> frame = pl.DataFrame(
        ...     {
        ...         "ticker": ["A"] * 4 + ["B"] * 4,
        ...         "high": [12.0, 13.0, 12.5, 14.0, 22.0, 24.0, 23.0, 25.0],
        ...         "low": [10.0, 11.0, 11.0, 12.0, 20.0, 21.0, 21.0, 23.0],
        ...         "close": [11.0, 12.5, 11.5, 13.5, 21.5, 21.5, 22.5, 24.0],
        ...         "volume": [100.0, 120.0, 90.0, 110.0, 100.0, 120.0, 90.0, 110.0],
        ...     }
        ... )
        >>> expr = accumulation_distribution(
        ...     pl.col("high"), pl.col("low"), pl.col("close"), pl.col("volume")
        ... ).over("ticker").round(4)
        >>> frame.with_columns(expr.alias("accumulation_distribution"))["accumulation_distribution"].to_list()
        [0.0, 60.0, 30.0, 85.0, 50.0, -30.0, 15.0, 15.0]

        A ``null`` (skipped, the running total carrying across it) and a ``NaN`` (which propagates) make the exact
        handling visible at a glance:

        >>> frame = pl.DataFrame(
        ...     {
        ...         "high": [12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0],
        ...         "low": [10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0],
        ...         "close": [11.5, 12.5, 13.0, 14.5, None, 16.0, float("nan"), 18.0],
        ...         "volume": [100.0, 120.0, 90.0, 110.0, 130.0, 100.0, 95.0, 140.0],
        ...     }
        ... )
        >>> expr = accumulation_distribution(pl.col("high"), pl.col("low"), pl.col("close"), pl.col("volume")).round(4)
        >>> frame.select(expr.alias("accumulation_distribution"))["accumulation_distribution"].to_list()
        [50.0, 110.0, 110.0, 165.0, None, 165.0, nan, nan]
    """
    high = float64_expr(high)
    low = float64_expr(low)
    close = float64_expr(close)
    volume = float64_expr(volume)

    # Doji guard: high - low == 0 pins MFM to 0 (avoids 0/0). A null/NaN input makes the range null/NaN (not == 0), so
    # missing data propagates to a null/NaN money-flow volume rather than being silently zeroed.
    high_low_range = high - low
    money_flow_multiplier = (
        pl.when(high_low_range == 0).then(pl.lit(0.0)).otherwise(((close - low) - (high - close)) / high_low_range)
    )
    money_flow_volume = money_flow_multiplier * volume
    return money_flow_volume.cum_sum()
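The arithmetic of `accumulation_distribution` can be cross-checked without Polars. What follows is a minimal pure-Python sketch, not part of pomata: `ad_line_reference` is a hypothetical helper name, and it covers finite inputs only (none of the `null`/`NaN` semantics described in the docstring). It applies the same doji guard and should reproduce the first docstring example.

```python
from itertools import accumulate


def ad_line_reference(high, low, close, volume):
    """Pure-Python A/D line for finite inputs (illustrative helper, not pomata API)."""
    money_flow_volume = []
    for h, l, c, v in zip(high, low, close, volume):
        bar_range = h - l
        # Doji guard: a zero-range bar gets multiplier 0 instead of evaluating 0 / 0.
        multiplier = 0.0 if bar_range == 0 else ((c - l) - (h - c)) / bar_range
        money_flow_volume.append(multiplier * v)
    # The line is the running cumulative sum of the per-bar money-flow volume.
    return list(accumulate(money_flow_volume))


ad = ad_line_reference(
    high=[10.0, 11.0, 12.0, 13.0, 14.0],
    low=[8.0, 9.0, 10.0, 11.0, 12.0],
    close=[9.0, 10.5, 10.0, 13.0, 12.5],
    volume=[100.0, 200.0, 300.0, 400.0, 500.0],
)
print(ad)  # [0.0, 100.0, -200.0, 200.0, -50.0]
```

The per-bar multipliers here are `0, 0.5, -1, 1, -0.5`, so the bar that closes at its low (`-1`) removes its whole volume from the running total.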

def accumulation_distribution_oscillator(
    high: pl.Expr,
    low: pl.Expr,
    close: pl.Expr,
    volume: pl.Expr,
    *,
    window_fast: int,
    window_slow: int,
) -> pl.Expr:
    r"""
    Accumulation/Distribution Oscillator, also known as the Chaikin Oscillator.

    Marc Chaikin's volume-momentum oscillator: the gap between a fast and a slow :func:`ema` of the
    :func:`accumulation_distribution` line, so it measures the momentum of accumulation rather than its level and
    oscillates around zero. With :math:`\mathrm{AD}` the accumulation/distribution line and :math:`n_f`, :math:`n_s`
    the fast and slow spans:

    .. math::

        \mathrm{ADOSC}_t = \mathrm{EMA}(\mathrm{AD}, n_f)_t - \mathrm{EMA}(\mathrm{AD}, n_s)_t.

    Args:
        high: High-price series (e.g. ``pl.col("high")``).
        low: Low-price series (e.g. ``pl.col("low")``).
        close: Close-price series (e.g. ``pl.col("close")``).
        volume: Traded-volume series (e.g. ``pl.col("volume")``).
        window_fast: Span of the fast EMA (canonically ``3``). Must be ``>= 1``.
        window_slow: Span of the slow EMA (canonically ``10``). Must be ``>= 1`` and ``>= window_fast``.

    Returns:
        The oscillator for each row, the same length as the inputs. The first ``window_slow - 1`` values are ``null``
        (warm-up), inherited from the slow :func:`ema` of the accumulation/distribution line, the later of the two to
        warm up.

    Raises:
        TypeError: If any input is not a ``pl.Expr``.
        ValueError: If ``window_fast < 1``, ``window_slow < 1``, or ``window_fast > window_slow``.

    Note:
        **Precision** -- agrees with its independent reference oracle to ten significant figures (a ``1e-10`` band)
        on any finite input within a sane dynamic range; ``CORRECTNESS.md`` gives the method and the
        float-conditioning limit beyond it.

        **Scaling:** homogeneous of degree ``1`` -- the accumulation/distribution multiplier is scale-invariant in
        price while the line scales with ``volume``, so multiplying all four inputs by ``k`` scales the oscillator by
        ``k``.

        **Edge-case behavior:**

        - **Null** -- a ``null`` in any input voids the accumulation/distribution line, and with it both EMAs, at
          that row, yielding ``null`` there.
        - **NaN** -- a ``NaN`` propagates through the line and the EMAs, yielding ``NaN``.
        - **Partitioning** -- wrap the call in ``.over(...)`` for a multi-series panel so the running sum and the
          EMAs do not span series boundaries.

    See Also:
        - :func:`accumulation_distribution`: The line this oscillates.
        - :func:`ema`: The exponential moving average of the two smoothings.
        - :func:`macd`: The same fast-minus-slow-EMA oscillator, applied to price.

    References:
        - Chaikin, Marc. "Chaikin Oscillator".
        - https://www.investopedia.com/terms/c/chaikinoscillator.asp

    Examples:
        >>> import polars as pl
        >>> from pomata.indicators import accumulation_distribution_oscillator
        >>>
        >>> frame = pl.DataFrame(
        ...     {
        ...         "high": [10.2, 10.5, 10.7, 10.3, 10.8],
        ...         "low": [9.8, 10.0, 10.2, 9.9, 10.3],
        ...         "close": [10.0, 10.3, 10.5, 10.1, 10.6],
        ...         "volume": [100.0, 150.0, 120.0, 200.0, 180.0],
        ...     }
        ... )
        >>> expr = accumulation_distribution_oscillator(
        ...     pl.col("high"), pl.col("low"), pl.col("close"), pl.col("volume"), window_fast=2, window_slow=3
        ... ).round(4)
        >>> frame.select(expr.alias("adosc"))["adosc"].to_list()
        [None, None, 13.0, 8.6667, 11.0556]

        On a multi-ticker panel, wrap the call in ``.over`` so each ticker warms up independently:

        >>> frame = pl.DataFrame(
        ...     {
        ...         "ticker": ["A"] * 4 + ["B"] * 4,
        ...         "high": [12.0, 13.0, 12.5, 14.0, 22.0, 24.0, 23.0, 25.0],
        ...         "low": [10.0, 11.0, 11.0, 12.0, 20.0, 21.0, 21.0, 23.0],
        ...         "close": [11.0, 12.5, 11.5, 13.5, 21.5, 21.5, 22.5, 24.0],
        ...         "volume": [100.0, 120.0, 90.0, 110.0, 100.0, 120.0, 90.0, 110.0],
        ...     }
        ... )
        >>> expr = (
        ...     accumulation_distribution_oscillator(
        ...         pl.col("high"), pl.col("low"), pl.col("close"), pl.col("volume"), window_fast=2, window_slow=3
        ...     )
        ...     .over("ticker")
        ...     .round(4)
        ... )
        >>> frame.with_columns(expr.alias("adosc"))["adosc"].to_list()
        [None, None, 0.0, 9.1667, None, None, 1.6667, 1.1111]

        A ``null`` (which voids the line and its EMAs at that row) and a ``NaN`` (which propagates) make the exact
        handling visible at a glance:

        >>> frame = pl.DataFrame(
        ...     {
        ...         "high": [12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0],
        ...         "low": [10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0],
        ...         "close": [11.5, 12.5, 13.0, 14.5, None, 16.0, float("nan"), 18.0],
        ...         "volume": [100.0, 120.0, 90.0, 110.0, 130.0, 100.0, 95.0, 140.0],
        ...     }
        ... )
        >>> expr = accumulation_distribution_oscillator(
        ...     pl.col("high"), pl.col("low"), pl.col("close"), pl.col("volume"), window_fast=2, window_slow=3
        ... ).round(4)
        >>> frame.select(expr.alias("adosc"))["adosc"].to_list()
        [None, None, 10.0, 15.8333, None, 9.4048, nan, nan]
    """
    high = float64_expr(high)
    low = float64_expr(low)
    close = float64_expr(close)
    volume = float64_expr(volume)
    validate_window(window_fast, name="window_fast")
    validate_window(window_slow, name="window_slow")
    validate_window_order(window_fast, window_slow)

    line = accumulation_distribution(high, low, close, volume)
    return ema(line, window_fast) - ema(line, window_slow)
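The fast-minus-slow construction can be sketched in plain Python. The EMA convention below is an assumption: the source of `ema` lives in another module and is not shown on this page, so the sketch uses smoothing factor `2 / (span + 1)` seeded with the simple mean of the first `span` values, a convention that happens to reproduce the first docstring example. Both helper names are hypothetical, and only finite inputs are handled.

```python
def sma_seeded_ema(values, span):
    """EMA with alpha = 2 / (span + 1), seeded by the mean of the first `span` values (assumed convention)."""
    out = [None] * len(values)
    if len(values) < span:
        return out
    alpha = 2.0 / (span + 1)
    out[span - 1] = sum(values[:span]) / span  # seed: simple average over the warm-up bars
    for i in range(span, len(values)):
        out[i] = out[i - 1] + alpha * (values[i] - out[i - 1])
    return out


def chaikin_oscillator_reference(high, low, close, volume, window_fast, window_slow):
    """Fast EMA minus slow EMA of the A/D line (illustrative helper, not pomata API)."""
    line, total = [], 0.0
    for h, l, c, v in zip(high, low, close, volume):
        bar_range = h - l
        multiplier = 0.0 if bar_range == 0 else ((c - l) - (h - c)) / bar_range
        total += multiplier * v
        line.append(total)
    fast = sma_seeded_ema(line, window_fast)
    slow = sma_seeded_ema(line, window_slow)
    # The oscillator is defined only where both EMAs have finished warming up.
    return [None if f is None or s is None else f - s for f, s in zip(fast, slow)]


adosc = chaikin_oscillator_reference(
    high=[10.2, 10.5, 10.7, 10.3, 10.8],
    low=[9.8, 10.0, 10.2, 9.9, 10.3],
    close=[10.0, 10.3, 10.5, 10.1, 10.6],
    volume=[100.0, 150.0, 120.0, 200.0, 180.0],
    window_fast=2,
    window_slow=3,
)
rounded = [None if x is None else round(x, 4) for x in adosc]
print(rounded)  # [None, None, 13.0, 8.6667, 11.0556]
```

The first two rows are `None` because the slow EMA (span 3) is the later of the two to warm up, matching the `window_slow - 1` warm-up stated in the docstring.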

def chaikin_money_flow(
    high: pl.Expr,
    low: pl.Expr,
    close: pl.Expr,
    volume: pl.Expr,
    window: int,
) -> pl.Expr:
    r"""
    Chaikin Money Flow (CMF).

    A volume-weighted breadth oscillator (Marc Chaikin) that gauges buying versus selling pressure over a rolling
    window. Each bar is first scored by where its ``close`` sits inside the bar's range via the Money Flow Multiplier
    :math:`\mathrm{MFM}`, scaled by ``volume`` into the Money Flow Volume :math:`\mathrm{MFV}`, and the CMF is the
    ratio of the windowed sums of money-flow volume to raw volume:

    .. math::

        \mathrm{MFM}_t &= \frac{(C_t - L_t) - (H_t - C_t)}{H_t - L_t}, \\
        \mathrm{MFV}_t &= \mathrm{MFM}_t \, V_t, \\
        \mathrm{CMF}_t &= \frac{\sum_{i=0}^{n-1} \mathrm{MFV}_{t-i}}{\sum_{i=0}^{n-1} V_{t-i}},
        \qquad n = \text{window},

    where :math:`H`, :math:`L`, :math:`C`, :math:`V` are ``high``, ``low``, ``close``, ``volume``. The multiplier
    lives in :math:`[-1, +1]`: it is :math:`+1` when the close prints at the high (maximum buying pressure),
    :math:`-1` at the low (maximum selling pressure), and :math:`0` at the midpoint. A zero-range bar
    (:math:`H_t = L_t`) has an undefined multiplier, so by convention :math:`\mathrm{MFM}_t = 0` there -- that bar
    contributes nothing to the numerator while its volume still counts in the denominator.

    Because the CMF is a volume-weighted average of multipliers, it is itself bounded in :math:`[-1, +1]`.

    Args:
        high: High-price series (e.g. ``pl.col("high")``).
        low: Low-price series (e.g. ``pl.col("low")``).
        close: Close-price series (e.g. ``pl.col("close")``).
        volume: Traded-volume series (e.g. ``pl.col("volume")``).
        window: Number of observations in the moving window. Must be ``>= 1``.

    Returns:
        The CMF for each row, the same length as the inputs. The first ``window - 1`` values are ``null`` (warm-up):
        the value is defined only once a full window of bars is available.

    Raises:
        TypeError: If any input is not a ``pl.Expr``.
        ValueError: If ``window < 1``.

    Note:
        **Precision** -- agrees with its independent reference oracle to ten significant figures (a ``1e-10`` band)
        on any finite input within a sane dynamic range; ``CORRECTNESS.md`` gives the method and the
        float-conditioning limit beyond it.

        **Zero-range bars:** The zero-range convention applies only to a genuine equal-range bar (``high == low``),
        where the multiplier is ``0`` (it adds ``0`` to the numerator while its volume still counts in the
        denominator). A ``null`` or ``NaN`` in any input instead leaves that bar's money-flow volume ``null`` or
        ``NaN``, so missing data propagates rather than being silently zeroed.

        **Edge-case behavior:**

        - **Null** -- a window in which any of ``high`` / ``low`` / ``close`` / ``volume`` contains a ``null`` yields
          ``null``; ``null`` takes precedence over ``NaN``.
        - **NaN** -- a window containing a ``NaN`` (and no ``null``) yields ``NaN``.
        - **Zero volume** -- a window whose volume is all zero is the ``0 / 0`` degenerate; the window is detected
          exactly (the rolling maximum of the absolute volume is zero) and the result is ``NaN``, not the rounding
          noise a sub-ULP residual in the rolling-sum denominator would otherwise produce. With non-negative volume
          this is the only reachable division-by-zero case, since an all-zero volume window also zeroes the
          numerator.
        - **Partitioning** -- wrap the call in ``.over(...)`` for a multi-series panel so neither rolling sum spans
          series boundaries, e.g.
          ``chaikin_money_flow(pl.col("high"), pl.col("low"), pl.col("close"), pl.col("volume"), 20).over("ticker")``.

    See Also:
        - :func:`accumulation_distribution`: The cumulative (window-less) money-flow line.
        - :func:`money_flow_index`: A bounded windowed money-flow oscillator.
        - :func:`accumulation_distribution_oscillator`: Chaikin's momentum oscillator over the same line.

    References:
        - Chaikin, Marc. "Chaikin Money Flow".
        - https://en.wikipedia.org/wiki/Chaikin_Analytics#Chaikin_Money_Flow
        - https://www.investopedia.com/terms/c/chaikinmoneyflow.asp

    Examples:
        >>> import polars as pl
        >>> from pomata.indicators import chaikin_money_flow
        >>>
        >>> frame = pl.DataFrame(
        ...     {
        ...         "high": [10.0, 12.0, 11.0, 13.0, 14.0],
        ...         "low": [8.0, 9.0, 9.0, 10.0, 11.0],
        ...         "close": [9.0, 11.0, 10.0, 12.0, 13.0],
        ...         "volume": [100.0, 200.0, 150.0, 300.0, 250.0],
        ...     }
        ... )
        >>> frame.select(
        ...     chaikin_money_flow(
        ...         pl.col("high"), pl.col("low"), pl.col("close"), pl.col("volume"), window=3
        ...     ).round(4).alias("cmf_3")
        ... )["cmf_3"].to_list()
        [None, None, 0.1481, 0.2564, 0.2619]

        On a multi-ticker panel, wrap the call in ``.over`` so each ticker warms up independently:

        >>> frame = pl.DataFrame(
        ...     {
        ...         "ticker": ["A"] * 4 + ["B"] * 4,
        ...         "high": [12.0, 13.0, 12.5, 14.0, 22.0, 24.0, 23.0, 25.0],
        ...         "low": [10.0, 11.0, 11.0, 12.0, 20.0, 21.0, 21.0, 23.0],
        ...         "close": [11.0, 12.5, 11.5, 13.5, 21.5, 21.5, 22.5, 24.0],
        ...         "volume": [100.0, 120.0, 90.0, 110.0, 100.0, 120.0, 90.0, 110.0],
        ...     }
        ... )
        >>> expr = chaikin_money_flow(
        ...     pl.col("high"), pl.col("low"), pl.col("close"), pl.col("volume"), 2
        ... ).over("ticker").round(4)
        >>> frame.with_columns(expr.alias("chaikin_money_flow"))["chaikin_money_flow"].to_list()
        [None, 0.2727, 0.1429, 0.125, None, -0.1364, -0.1667, 0.225]

        A ``null`` (which voids every window it touches) and a ``NaN`` (which propagates) make the exact handling
        visible at a glance:

        >>> frame = pl.DataFrame(
        ...     {
        ...         "high": [12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0],
        ...         "low": [10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0],
        ...         "close": [11.5, 12.5, 13.0, 14.5, None, 16.0, float("nan"), 18.0],
        ...         "volume": [100.0, 120.0, 90.0, 110.0, 130.0, 100.0, 95.0, 140.0],
        ...     }
        ... )
        >>> expr = chaikin_money_flow(pl.col("high"), pl.col("low"), pl.col("close"), pl.col("volume"), 2).round(4)
        >>> frame.select(expr.alias("chaikin_money_flow"))["chaikin_money_flow"].to_list()
        [None, 0.5, 0.2857, 0.275, None, None, nan, nan]
    """
    high = float64_expr(high)
    low = float64_expr(low)
    close = float64_expr(close)
    volume = float64_expr(volume)
    validate_window(window)

    # Doji guard: high - low == 0 pins MFM to 0 (avoids 0/0). A null/NaN input makes the range null/NaN (not == 0),
    # so missing data propagates to a null/NaN money-flow volume rather than being silently zeroed.
    high_low_range = high - low
    money_flow_multiplier = (
        pl.when(high_low_range == 0).then(0.0).otherwise(((close - low) - (high - close)) / high_low_range)
    )
    money_flow_volume = money_flow_multiplier * volume
    weighted_sum = money_flow_volume.rolling_sum(window_size=window)
    raw = weighted_sum / volume.rolling_sum(window_size=window)

    # A window whose volume is all zero is the 0/0 degenerate: detect it exactly via the rolling maximum of the absolute
    # volume (which is exactly 0 only when every volume in the window is 0), so a sub-ULP residual left in the rolling
    # sum cannot fake a finite or infinite reading, and return NaN as documented. The ``& weighted_sum.is_not_null()``
    # gate preserves null precedence: a ``null`` in a price input nulls the weighted sum even though the all-zero
    # volume's rolling maximum is still ``0``, so such a window stays ``null`` rather than taking the guard's ``NaN``.
    # The clip pins the bound: the ratio is mathematically in [-1, 1] (so is the multiplier), so past a sane dynamic
    # range a residual-dominated near-zero-volume window degrades but stays in range (see CORRECTNESS.md).
    is_zero_volume = (volume.abs().rolling_max(window_size=window) == 0) & weighted_sum.is_not_null()
    return pl.when(is_zero_volume).then(float("nan")).otherwise(raw.clip(-1.0, 1.0))
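The windowed ratio and its all-zero-volume guard can be illustrated with a small pure-Python sketch. `chaikin_money_flow_reference` is a hypothetical helper, not pomata API. It assumes finite inputs and non-negative volume, and recomputes each window directly rather than maintaining rolling sums, so the sub-ULP residual that the comments above guard against cannot arise here.

```python
import math


def chaikin_money_flow_reference(high, low, close, volume, window):
    """Pure-Python CMF for finite inputs and non-negative volume (illustrative helper)."""
    money_flow_volume = []
    for h, l, c, v in zip(high, low, close, volume):
        bar_range = h - l
        # Zero-range bar: multiplier 0, so it adds nothing to the numerator.
        multiplier = 0.0 if bar_range == 0 else ((c - l) - (h - c)) / bar_range
        money_flow_volume.append(multiplier * v)

    out = []
    for t in range(len(volume)):
        if t < window - 1:
            out.append(None)  # warm-up: fewer than `window` bars so far
            continue
        window_volume = volume[t - window + 1 : t + 1]
        if all(v == 0 for v in window_volume):
            out.append(math.nan)  # the 0 / 0 degenerate window
            continue
        ratio = sum(money_flow_volume[t - window + 1 : t + 1]) / sum(window_volume)
        out.append(max(-1.0, min(1.0, ratio)))  # pin the mathematical [-1, 1] bound
    return out


cmf = chaikin_money_flow_reference(
    high=[10.0, 12.0, 11.0, 13.0, 14.0],
    low=[8.0, 9.0, 9.0, 10.0, 11.0],
    close=[9.0, 11.0, 10.0, 12.0, 13.0],
    volume=[100.0, 200.0, 150.0, 300.0, 250.0],
    window=3,
)
rounded = [None if x is None else round(x, 4) for x in cmf]
print(rounded)  # [None, None, 0.1481, 0.2564, 0.2619]
```

Recomputing every window costs `O(n * window)`; the library's rolling sums are cheaper, and that efficiency is what makes the exact zero-volume detection necessary there.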

def money_flow_index(
    high: pl.Expr,
    low: pl.Expr,
    close: pl.Expr,
    volume: pl.Expr,
    window: int,
) -> pl.Expr:
    r"""
    Money Flow Index (MFI), also known as the volume-weighted Relative Strength Index.

    A bounded ``[0, 100]`` momentum oscillator (Quong & Soudack, 1989) that grades buying versus selling pressure by
    weighting each bar's typical price by its traded volume. It is the volume-aware analogue of the RSI: where the RSI
    accumulates price gains and losses, the MFI accumulates the *raw money flow* (typical price times volume) on
    up-days versus down-days. With :math:`n = \text{window}`, typical price
    :math:`\mathrm{TP}_t = (H_t + L_t + C_t) / 3` and raw money flow :math:`\mathrm{RMF}_t = \mathrm{TP}_t \cdot V_t`:

    .. math::

        \mathrm{PF}_t &= \mathrm{RMF}_t \,\mathbf{1}\!\left[\mathrm{TP}_t > \mathrm{TP}_{t-1}\right], \qquad
        \mathrm{NF}_t = \mathrm{RMF}_t \,\mathbf{1}\!\left[\mathrm{TP}_t < \mathrm{TP}_{t-1}\right], \\[4pt]
        \mathrm{MR}_t &= \frac{\sum_{i=0}^{n-1} \mathrm{PF}_{t-i}}{\sum_{i=0}^{n-1} \mathrm{NF}_{t-i}}, \qquad
        \mathrm{MFI}_t = 100 - \frac{100}{1 + \mathrm{MR}_t}.

    A bar whose typical price is unchanged (:math:`\mathrm{TP}_t = \mathrm{TP}_{t-1}`) contributes to neither the
    positive nor the negative money flow. The first row has no predecessor and so contributes no money flow, which is
    why a full ``window`` of *changes* (and therefore ``window + 1`` price bars) is needed before the first value is
    defined.

    As a volume-weighted RSI it is bounded in :math:`[0, 100]`: a window with no negative money flow saturates the
    oscillator at ``100`` and one with no positive money flow at ``0``.

    Args:
        high: High-price series (e.g. ``pl.col("high")``).
        low: Low-price series (e.g. ``pl.col("low")``).
        close: Close-price series (e.g. ``pl.col("close")``).
        volume: Traded-volume series (e.g. ``pl.col("volume")``).
        window: Number of observations in the moving window. Must be ``>= 1``.

    Returns:
        The MFI for each row, bounded in ``[0, 100]`` and the same length as the inputs. The first ``window`` values
        are ``null`` (warm-up): the value is defined only once ``window`` price *changes* have accumulated, so the
        first defined row is at index ``window`` rather than ``window - 1``.

    Raises:
        TypeError: If any input is not a ``pl.Expr``.
        ValueError: If ``window < 1``.

    Note:
        **Precision** -- agrees with its independent reference oracle to ten significant figures (a ``1e-10`` band)
        on any finite input within a sane dynamic range; ``CORRECTNESS.md`` gives the method and the
        float-conditioning limit beyond it.

        **Classification:** The up / down classification compares each typical price with the previous one, so a
        missing or undefined typical price taints two consecutive change positions (its own and the following one).

        **Edge-case behavior:**

        - **Null** -- a ``null`` in ``high``, ``low``, or ``close`` voids the typical price at that row and at the
          next change, so any window reaching either yields ``null``; a ``null`` in ``volume`` voids only that row's
          money flow. ``null`` takes precedence over ``NaN``.
        - **NaN** -- a ``NaN`` in any input contaminates the affected money flow and yields ``NaN`` for every window
          that contains it. A ``NaN`` typical price makes both its own change and the next one undefined in sign, so
          each is poisoned into the positive *and* the negative money flow as ``NaN``, voiding every window that
          reaches either change (the same two-position taint as the ``null`` analogue, but surfaced as ``NaN`` rather
          than ``null``).
        - **Division by zero** -- a window with no negative money flow but non-zero positive flow has money ratio
          ``+inf`` and the MFI saturates at ``100``; symmetrically an all-down window gives ``0``. A window in which
          both flows are zero (the typical price never moves) leaves the money ratio at ``0 / 0`` and yields ``NaN``
          -- the oscillator is genuinely undefined there.
        - **Partitioning** -- wrap the call in ``.over(...)`` for a multi-series panel so neither the difference nor
          the rolling sums span series boundaries, e.g.
          ``money_flow_index(pl.col("high"), pl.col("low"), pl.col("close"), pl.col("volume"), 14).over("ticker")``.

    See Also:
        - :func:`rsi`: The price-only analogue (no volume weighting).
        - :func:`chaikin_money_flow`: Another volume-weighted money-flow oscillator.
        - :func:`price_typical`: The per-bar typical price this weights by volume.

    References:
        - Quong, Gene & Soudack, Avrum (1989). "Volume-Weighted RSI: Money Flow". *Technical Analysis of Stocks &
          Commodities*, 7(3).
        - https://en.wikipedia.org/wiki/Money_flow_index
        - https://www.investopedia.com/terms/m/mfi.asp

    Examples:
        >>> import polars as pl
        >>> from pomata.indicators import money_flow_index
        >>>
        >>> frame = pl.DataFrame(
        ...     {
        ...         "high": [10.0, 11.0, 12.0, 11.0, 13.0, 14.0, 13.0, 15.0],
        ...         "low": [8.0, 9.0, 10.0, 9.0, 11.0, 12.0, 11.0, 13.0],
        ...         "close": [9.0, 10.0, 11.0, 10.0, 12.0, 13.0, 12.0, 14.0],
        ...         "volume": [100.0, 150.0, 120.0, 130.0, 110.0, 160.0, 140.0, 170.0],
        ...     }
        ... )
        >>> frame.select(
        ...     money_flow_index(pl.col("high"), pl.col("low"), pl.col("close"), pl.col("volume"), window=3)
        ...     .round(4)
        ...     .alias("mfi_3")
        ... )["mfi_3"].to_list()
        [None, None, None, 68.4466, 67.0051, 72.3404, 66.9291, 72.6384]

        On a multi-ticker panel, wrap the call in ``.over`` so each ticker warms up independently:

        >>> frame = pl.DataFrame(
        ...     {
        ...         "ticker": ["A"] * 4 + ["B"] * 4,
        ...         "high": [12.0, 13.0, 12.5, 14.0, 22.0, 24.0, 23.0, 25.0],
        ...         "low": [10.0, 11.0, 11.0, 12.0, 20.0, 21.0, 21.0, 23.0],
        ...         "close": [11.0, 12.5, 11.5, 13.5, 21.5, 21.5, 22.5, 24.0],
        ...         "volume": [100.0, 120.0, 90.0, 110.0, 100.0, 120.0, 90.0, 110.0],
        ...     }
        ... )
        >>> expr = (
        ...     money_flow_index(pl.col("high"), pl.col("low"), pl.col("close"), pl.col("volume"), 2)
        ...     .over("ticker")
        ...     .round(4)
        ... )
        >>> frame.with_columns(expr.alias("money_flow_index"))["money_flow_index"].to_list()
        [None, None, 58.1673, 57.972, None, None, 100.0, 100.0]

        A ``null`` (which voids every window it touches) and a ``NaN`` (which propagates) make the exact handling
        visible at a glance:

        >>> frame = pl.DataFrame(
        ...     {
        ...         "high": [12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0],
        ...         "low": [10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0],
        ...         "close": [11.5, 12.5, 13.0, 14.5, None, 16.0, float("nan"), 18.0],
        ...         "volume": [100.0, 120.0, 90.0, 110.0, 130.0, 100.0, 95.0, 140.0],
        ...     }
        ... )
        >>> expr = money_flow_index(pl.col("high"), pl.col("low"), pl.col("close"), pl.col("volume"), 2).round(4)
        >>> frame.select(expr.alias("money_flow_index"))["money_flow_index"].to_list()
        [None, None, 100.0, 100.0, None, None, None, nan]
    """
    high = float64_expr(high)
    low = float64_expr(low)
    close = float64_expr(close)
    volume = float64_expr(volume)
    validate_window(window)

    typical_price = price_typical(high, low, close)
    raw_money_flow = typical_price * volume
    typical_change = typical_price.diff()

    # A NaN typical change is undefined in sign: route it to NaN in *both* flows so it poisons every window reaching the
    # successor change, matching the null analogue (which voids its own and the following change). Without this guard
    # Polars' total order (``NaN > 0`` is ``True``) would route the NaN change into the positive flow at its finite raw
    # money flow, faking a fully-positive bar after a NaN typical price.
    change_is_nan = typical_change.is_nan()
    positive_flow = (
        pl.when(change_is_nan)
        .then(pl.lit(float("nan")))
        .when(typical_change > 0.0)
        .then(raw_money_flow)
        .when(typical_change <= 0.0)
        .then(pl.lit(0.0))
        .otherwise(None)
    )
    negative_flow = (
        pl.when(change_is_nan)
        .then(pl.lit(float("nan")))
        .when(typical_change < 0.0)
        .then(raw_money_flow)
        .when(typical_change >= 0.0)
        .then(pl.lit(0.0))
        .otherwise(None)
    )
    money_ratio = positive_flow.rolling_sum(window_size=window) / negative_flow.rolling_sum(window_size=window)
    index = 100.0 - 100.0 / (1.0 + money_ratio)

    # Flat window: when every typical-price change is exactly zero both flows are zero, the documented 0/0 -> NaN.
    # Detect it exactly via the rolling maximum of the absolute change (zero only when every change is exactly 0), so a
    # sub-ULP residual left in the rolling sums after large flows slide out cannot fake a saturated reading. A null/NaN
    # change makes the rolling maximum null/NaN (never == 0), so the guard does not fire and the null-precedence and
    # NaN-poisoning above are unchanged. The clip pins the [0, 100] bound: beyond a sane dynamic range a residual-
    # dominated near-flat window degrades but stays in range rather than escaping (see CORRECTNESS.md).
    is_flat = typical_change.abs().rolling_max(window_size=window) == 0
    return pl.when(is_flat).then(float("nan")).otherwise(index.clip(0.0, 100.0))
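The warm-up of `window` rows and the saturation behaviour are easier to see in a plain-Python sketch. `money_flow_index_reference` is a hypothetical helper, not pomata API. It assumes finite, positive prices and non-negative volume, and uses the algebraic identity `100 - 100 / (1 + PF / NF) = 100 * PF / (PF + NF)`, which sidesteps the division by a zero negative flow.

```python
import math


def money_flow_index_reference(high, low, close, volume, window):
    """Pure-Python MFI for finite inputs (illustrative helper, not pomata API)."""
    typical = [(h + l + c) / 3.0 for h, l, c in zip(high, low, close)]
    # Row 0 has no predecessor; its placeholder flows are never read by a defined window.
    positive, negative = [0.0], [0.0]
    for t in range(1, len(typical)):
        raw = typical[t] * volume[t]
        change = typical[t] - typical[t - 1]
        positive.append(raw if change > 0 else 0.0)
        negative.append(raw if change < 0 else 0.0)

    out = []
    for t in range(len(typical)):
        if t < window:
            out.append(None)  # warm-up: `window` changes need `window + 1` bars
            continue
        pf = sum(positive[t - window + 1 : t + 1])
        nf = sum(negative[t - window + 1 : t + 1])
        if pf == 0 and nf == 0:
            out.append(math.nan)  # both flows zero: the 0 / 0 case
        else:
            out.append(100.0 * pf / (pf + nf))  # nf == 0 gives 100, pf == 0 gives 0
    return out


mfi = money_flow_index_reference(
    high=[10.0, 11.0, 12.0, 11.0, 13.0, 14.0, 13.0, 15.0],
    low=[8.0, 9.0, 10.0, 9.0, 11.0, 12.0, 11.0, 13.0],
    close=[9.0, 10.0, 11.0, 10.0, 12.0, 13.0, 12.0, 14.0],
    volume=[100.0, 150.0, 120.0, 130.0, 110.0, 160.0, 140.0, 170.0],
    window=3,
)
rounded = [None if x is None else round(x, 4) for x in mfi]
print(rounded)  # [None, None, None, 68.4466, 67.0051, 72.3404, 66.9291, 72.6384]
```

At index 3 the window holds the changes at rows 1 to 3: positive flow `1500 + 1320 = 2820`, negative flow `1300`, so the index is `100 * 2820 / 4120`, about `68.4466`.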

def obv(
    expr: pl.Expr,
    volume: pl.Expr,
) -> pl.Expr:
    r"""
    On-Balance Volume (OBV), also known as Granville's cumulative volume.

    A momentum indicator that ties volume to price direction (Joseph Granville, 1963). Each bar adds the whole bar
    volume to a running total when the close rises, subtracts it when the close falls, and leaves the total unchanged
    when the close is flat; the cumulative line is read for divergences against price rather than for its absolute
    level:

    .. math::

        \mathrm{OBV}_t = \sum_{i=1}^{t} \operatorname{sign}(x_i - x_{i-1})\, V_i,
        \qquad
        \operatorname{sign}(d) =
        \begin{cases}
            +1, & d > 0, \\
            \phantom{+}0, & d = 0, \\
            -1, & d < 0,
        \end{cases}

    where :math:`x` is ``expr`` and :math:`V` is ``volume``. The first bar has no predecessor, so its direction is
    undefined; it contributes ``0`` and the series therefore starts at :math:`\mathrm{OBV}_0 = 0`.

    OBV is an unbounded, level-arbitrary cumulative series; only its slope and divergences against price are
    meaningful, not its absolute magnitude. Because the direction is the sign of the bar-to-bar close change, it is
    invariant to any additive shift of the price level and homogeneous of degree one in ``volume`` (scaling all
    volumes by a constant scales OBV by that constant).

    Args:
        expr: Input series, conventionally the close (any series is accepted; e.g. ``pl.col("close")``).
        volume: Traded-volume series (e.g. ``pl.col("volume")``).

    Returns:
        The OBV for each row, the same length as the inputs. There is no window and no warm-up: every row is defined,
        starting at ``0`` on the first row.

    Raises:
        TypeError: If any input is not a ``pl.Expr``.

    Note:
        **Precision** -- agrees with its independent reference oracle to ten significant figures (a ``1e-10`` band)
        on any finite input within a sane dynamic range; ``CORRECTNESS.md`` gives the method and the
        float-conditioning limit beyond it.

        **Edge-case behavior:**

        - **Null** -- a ``null`` close zeroes the direction both at its own row and at the following row (each
          ``diff`` touching the ``null`` is itself ``null`` and is filled to ``0``), so those bars contribute nothing
          while the running total carries on. A ``null`` volume makes that bar's contribution ``null``
          (``0 * null`` is ``null``, so this holds even on the first or a flat bar where the direction is ``0``): the
          output is ``null`` at exactly that row while the cumulative sum skips it and continues from the prior
          total.
        - **NaN** -- a ``NaN`` close (via ``diff``) or a ``NaN`` volume poisons the contribution at its row and, once
          summed, latches the running total to ``NaN`` for every subsequent row; because ``0 * NaN`` is ``NaN`` under
          IEEE-754, a ``NaN`` volume contaminates the total even on a flat or first bar where the direction is ``0``.
          A ``null``-contribution row still emits ``null`` at its own position even after the latch.
        - **Partitioning** -- wrap the call in ``.over(...)`` for a multi-series panel so neither the ``diff`` nor
          the cumulative sum spans series boundaries, e.g. ``obv(pl.col("close"), pl.col("volume")).over("ticker")``.

    See Also:
        - :func:`accumulation_distribution`: Another cumulative volume line.
        - :func:`money_flow_index`: A bounded volume-weighted oscillator.
        - :func:`chaikin_money_flow`: A windowed volume-weighted money-flow ratio.

    References:
        - Granville, Joseph E. (1963). *Granville's New Key to Stock Market Profits*. Prentice-Hall.
        - https://en.wikipedia.org/wiki/On-balance_volume
        - https://www.investopedia.com/terms/o/onbalancevolume.asp

    Examples:
        >>> import polars as pl
        >>> from pomata.indicators import obv
        >>>
        >>> frame = pl.DataFrame(
        ...     {
        ...         "close": [10.0, 12.0, 11.0, 11.0, 13.0, 9.0],
        ...         "volume": [100.0, 200.0, 150.0, 80.0, 300.0, 250.0],
        ...     }
        ... )
        >>> frame.select(obv(pl.col("close"), pl.col("volume")).round(4).alias("obv"))["obv"].to_list()
        [0.0, 200.0, 50.0, 50.0, 350.0, 100.0]

        On a multi-ticker panel, wrap the call in ``.over`` so the running total restarts at ``0`` for each ticker:

        >>> frame = pl.DataFrame(
        ...     {
        ...         "ticker": ["A"] * 4 + ["B"] * 4,
        ...         "close": [11.0, 12.5, 11.5, 13.5, 21.5, 21.5, 22.5, 24.0],
        ...         "volume": [100.0, 120.0, 90.0, 110.0, 100.0, 120.0, 90.0, 110.0],
        ...     }
        ... )
        >>> expr = obv(pl.col("close"), pl.col("volume")).over("ticker").round(4)
        >>> frame.with_columns(expr.alias("obv"))["obv"].to_list()
        [0.0, 120.0, 30.0, 140.0, 0.0, 0.0, 90.0, 200.0]

        A ``null`` (skipped, the running total carrying across it) and a ``NaN`` (which propagates) make the exact
        handling visible at a glance:

        >>> frame = pl.DataFrame(
        ...     {
        ...         "close": [10.0, 11.0, 12.0, 13.0, None, 15.0, float("nan"), 17.0, 18.0, 19.0],
        ...         "volume": [100.0, 120.0, 90.0, 110.0, 130.0, 100.0, 95.0, 140.0, 105.0, 115.0],
        ...     }
        ... )
        >>> expr = obv(pl.col("close"), pl.col("volume")).round(4)
        >>> frame.select(expr.alias("obv"))["obv"].to_list()
        [0.0, 120.0, 210.0, 320.0, 320.0, 320.0, nan, nan, nan, nan]
    """
    expr = float64_expr(expr)
    volume = float64_expr(volume)

    # Bar-local direction + one cumulative sum -- no path-dependent recursion, so no Rust kernel is needed.
    direction = expr.diff().sign().fill_null(0)
    return (direction * volume).cum_sum()
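The "sign of the change times volume, then cumulative sum" recipe is short enough to restate in pure Python. `obv_reference` is a hypothetical helper for finite inputs only, not pomata API. It mirrors the two steps in the function body above and should reproduce the first docstring example.

```python
from itertools import accumulate


def obv_reference(close, volume):
    """Pure-Python OBV for finite inputs (illustrative helper, not pomata API)."""
    if not close:
        return []
    contributions = [0.0]  # the first bar has no predecessor, so it contributes 0
    for t in range(1, len(close)):
        change = close[t] - close[t - 1]
        direction = (change > 0) - (change < 0)  # sign of the change: +1, 0 or -1
        contributions.append(direction * volume[t])
    return list(accumulate(contributions))


line = obv_reference(
    close=[10.0, 12.0, 11.0, 11.0, 13.0, 9.0],
    volume=[100.0, 200.0, 150.0, 80.0, 300.0, 250.0],
)
print(line)  # [0.0, 200.0, 50.0, 50.0, 350.0, 100.0]
```

The flat bar at index 3 (close stays at `11.0`) leaves the total at `50.0`, and shifting every close by a constant would not change any entry because only the sign of each difference is used.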


def vwap(
    high: pl.Expr,
    low: pl.Expr,
    close: pl.Expr,
    volume: pl.Expr,
) -> pl.Expr:
    r"""
    Volume-Weighted Average Price (VWAP) -- the running volume-weighted mean of the typical price.

    The benchmark intraday price: each bar's typical price (:func:`price_typical`) weighted by its volume,
    accumulated from the start of the partition. It anchors to that start, so the canonical use is one session per
    ``.over(...)`` group -- an un-anchored VWAP over years of data is rarely meaningful:

    .. math::

        \mathrm{VWAP}_t = \frac{\sum_{i \le t} \mathrm{typical}_i \, V_i}{\sum_{i \le t} V_i},
        \quad V_i = \mathrm{volume}_i,
        \quad \mathrm{typical}_i = \frac{\mathrm{high}_i + \mathrm{low}_i + \mathrm{close}_i}{3}.

    Args:
        high: High-price series (e.g. ``pl.col("high")``).
        low: Low-price series (e.g. ``pl.col("low")``).
        close: Close-price series (e.g. ``pl.col("close")``).
        volume: Traded-volume series (e.g. ``pl.col("volume")``).

    Returns:
        The running VWAP for each row, the same length as the inputs. There is no warm-up: the output is defined
        from the first row whose cumulative volume is positive (a leading zero-volume run reads ``NaN`` until volume
        accrues).

    Raises:
        TypeError: If any input is not a ``pl.Expr``.

    Note:
        **Precision** -- agrees with its independent reference oracle to ten significant figures (a ``1e-10`` band)
        on any finite input within a sane dynamic range; ``CORRECTNESS.md`` gives the method and the conditioning
        limit of the long cumulative sums beyond it.

        **Anchoring:** VWAP accumulates from the start of the partition, so wrap the call in ``.over(session_key)``
        to reset it per session (e.g. one trading day): ``vwap(...).over("session")``. Without an anchor it
        accumulates across the whole series, the classic VWAP misuse.

        **Inputs:** ``high`` / ``low`` / ``close`` / ``volume`` must share a length and alignment; ``volume`` is
        expected non-negative (a negative volume is summed as-is, with no guard -- garbage in, garbage out).

        **Edge-case behavior:**

        - **Zero volume** -- at the head a zero cumulative volume gives ``0 / 0 == NaN`` until volume accrues; an
          interior zero-volume bar adds nothing (the prefix sums carry forward, with no subtract-on-exit residual).
        - **Null** -- a ``null`` in any input nulls that bar's contribution at its own row; both cumulative sums skip
          the bar together (a ``null`` price input drops its volume from the denominator too), so the bar is a clean
          missing observation, not a denominator-only contribution.
        - **NaN** -- a ``NaN`` in any input poisons the cumulative sum from its row onward (it cannot be subtracted
          out).
        - **Partitioning** -- see Anchoring above; ``.over(...)`` is the intended use, not an afterthought.

    See Also:
        - :func:`vwma`: The windowed volume-weighted moving average, for a rolling rather than anchored weight.
        - :func:`price_typical`: The per-bar price this weights.
        - :func:`sma`: The equal-weighted moving average, the volume-blind analogue.

    References:
        - https://en.wikipedia.org/wiki/Volume-weighted_average_price
        - https://www.investopedia.com/terms/v/vwap.asp

    Examples:
        >>> import polars as pl
        >>> from pomata.indicators import vwap
        >>>
        >>> frame = pl.DataFrame(
        ...     {
        ...         "high": [2.0, 4.0, 6.0],
        ...         "low": [0.0, 2.0, 4.0],
        ...         "close": [1.0, 3.0, 5.0],
        ...         "volume": [10.0, 20.0, 30.0],
        ...     }
        ... )
        >>> expr = vwap(pl.col("high"), pl.col("low"), pl.col("close"), pl.col("volume"))
        >>> frame.select(expr.round(4).alias("vwap"))["vwap"].to_list()
        [1.0, 2.3333, 3.6667]

        Anchor per session with ``.over`` so each day's VWAP restarts:

        >>> frame = pl.DataFrame(
        ...     {
        ...         "session": ["a", "a", "b", "b"],
        ...         "high": [2.0, 4.0, 12.0, 14.0],
        ...         "low": [0.0, 2.0, 10.0, 12.0],
        ...         "close": [1.0, 3.0, 11.0, 13.0],
        ...         "volume": [10.0, 20.0, 10.0, 20.0],
        ...     }
        ... )
        >>> expr = vwap(pl.col("high"), pl.col("low"), pl.col("close"), pl.col("volume")).over("session")
        >>> frame.select(expr.round(4).alias("vwap"))["vwap"].to_list()
        [1.0, 2.3333, 11.0, 12.3333]

        A ``null`` (yields ``null`` at that row, with both running sums skipping the bar) and a ``NaN`` (which
        latches in the running totals) make the edge-case handling visible:

        >>> frame = pl.DataFrame(
        ...     {
        ...         "high": [10.0, 11.0, 12.0, 13.0, 14.0, 15.0],
        ...         "low": [8.0, 9.0, 10.0, 11.0, 12.0, 13.0],
        ...         "close": [9.0, 10.0, None, 12.0, float("nan"), 14.0],
        ...         "volume": [100.0, 200.0, 300.0, 400.0, 500.0, 600.0],
        ...     }
        ... )
        >>> expr = vwap(pl.col("high"), pl.col("low"), pl.col("close"), pl.col("volume"))
        >>> frame.select(expr.round(4).alias("vwap"))["vwap"].to_list()
        [9.0, 9.6667, None, 11.0, nan, nan]
    """
    high = float64_expr(high)
    low = float64_expr(low)
    close = float64_expr(close)
    volume = float64_expr(volume)

    # Cumulative typical-price-times-volume over cumulative volume; anchors to the partition start (use .over). A null
    # in a price input nulls the bar's weighted term, so the denominator must drop that bar's volume too -- both sums
    # skip the bar together, so a null is a clean missing observation rather than a denominator-only contribution.
    typical = price_typical(high, low, close)
    weighted = typical * volume
    volume_masked = pl.when(weighted.is_null()).then(None).otherwise(volume)
    return weighted.cum_sum() / volume_masked.cum_sum()
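# Illustrative cross-check (not part of pomata): a minimal pure-Python sketch of the masking rule described in the
# comment above, where a null in any input drops the bar from both the numerator and the denominator.
# ``vwap_reference`` is a hypothetical helper over plain lists (``None`` standing in for a Polars ``null``), and it
# assumes non-negative volume and finite-or-NaN prices, so a zero denominator only ever arises as ``0 / 0``.

```python
import math


def vwap_reference(high, low, close, volume):
    """Row-by-row anchored VWAP over plain lists; None stands in for a Polars null (hypothetical helper)."""
    out = []
    num = 0.0  # running sum of typical price * volume
    den = 0.0  # running sum of volume over the same bars as num
    for hi, lo, cl, vol in zip(high, low, close, volume):
        if hi is None or lo is None or cl is None or vol is None:
            # Null bar: null at this row, and neither running sum moves.
            out.append(None)
            continue
        num += (hi + lo + cl) / 3.0 * vol
        den += vol
        # Python raises on float division by zero, so the 0 / 0 == NaN case is spelled out
        # (assumption: non-negative volume, so den == 0 implies num == 0).
        out.append(math.nan if den == 0.0 else num / den)
    return out


result = vwap_reference([2.0, 4.0, 6.0], [0.0, 2.0, 4.0], [1.0, 3.0, 5.0], [10.0, 20.0, 30.0])
print([round(x, 4) for x in result])
# prints [1.0, 2.3333, 3.6667]
```

# Skipping the whole bar on a null keeps the two sums aligned; adding the volume while skipping the weighted term
# would bias every later VWAP value downward.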