From b5f6381faccef4549925ede766bf02649a887ba1 Mon Sep 17 00:00:00 2001 From: Max Halford Date: Fri, 4 Feb 2022 00:55:35 +0100 Subject: [PATCH] Batch update for Mean/Var/Cov (#838) * simplify __add__ and __sub__ * implement Mean.update_many * implement Var.update_many * don't inherit from SortedWindow * implement test_add_bivariate * remove tests, they're already there * typo * implement Cov.update_many * flake * Update test_var.py * unweighted * types * Update cov.py * Update iqr.py * Update unreleased.md * rename special to misc Former-commit-id: be2a9047b22d35d51408dcb862c7574972bab2be --- docs/releases/unreleased.md | 8 ++- river/__init__.py | 2 + river/{special => misc}/__init__.py | 1 + river/{special => misc}/histogram.py | 0 river/{special => misc}/sdft.py | 0 river/{special => misc}/skyline.py | 0 river/stats/cov.py | 66 +++++++++++++++---------- river/stats/iqr.py | 4 +- river/stats/maximum.py | 20 ++++---- river/stats/mean.py | 13 +++-- river/stats/minimum.py | 10 ++-- river/stats/quantile.py | 26 +++++----- river/stats/{test_.py => test_stats.py} | 55 ++++++++++++++------- river/stats/test_var.py | 31 ++++++++++++ river/stats/var.py | 11 +++++ 15 files changed, 165 insertions(+), 82 deletions(-) rename river/{special => misc}/__init__.py (80%) rename river/{special => misc}/histogram.py (100%) rename river/{special => misc}/sdft.py (100%) rename river/{special => misc}/skyline.py (100%) rename river/stats/{test_.py => test_stats.py} (83%) create mode 100644 river/stats/test_var.py diff --git a/docs/releases/unreleased.md b/docs/releases/unreleased.md index 24f53a8bee..b7b71da731 100644 --- a/docs/releases/unreleased.md +++ b/docs/releases/unreleased.md @@ -31,6 +31,10 @@ - `metrics.ROCAUC` works with `base.AnomalyDetectors`s. +## misc + +- Created this module to store some stuff that was in the `utils` module but wasn't necessarily shared between modules. + ## reco - Renamed the `Recommender` base class into `Ranker`. @@ -38,9 +42,9 @@ - Removed `reco.SurpriseWrapper` as it wasn't really useful. - Added an `is_contextual` property to each ranker to indicate if a model makes use of contextual features or not. -## special +## stats -- Created this module to store some stuff that was in the `utils` module. +- `stats.Mean`, `stats.Var`, and `stats.Cov` each now have an `update_many` method which accepts numpy arrays. ## utils diff --git a/river/__init__.py b/river/__init__.py index 5f661a5ef0..dfefaebbbd 100644 --- a/river/__init__.py +++ b/river/__init__.py @@ -23,6 +23,7 @@ imblearn, linear_model, metrics, + misc, model_selection, multiclass, multioutput, @@ -60,6 +61,7 @@ "imblearn", "linear_model", "metrics", + "misc", "model_selection", "multiclass", "multioutput", diff --git a/river/special/__init__.py b/river/misc/__init__.py similarity index 80% rename from river/special/__init__.py rename to river/misc/__init__.py index 7e50e351ed..8ac9544acc 100644 --- a/river/special/__init__.py +++ b/river/misc/__init__.py @@ -1,3 +1,4 @@ +"""Miscellaneous algorithms.""" from .histogram import Histogram from .sdft import SDFT from .skyline import Skyline diff --git a/river/special/histogram.py b/river/misc/histogram.py similarity index 100% rename from river/special/histogram.py rename to river/misc/histogram.py diff --git a/river/special/sdft.py b/river/misc/sdft.py similarity index 100% rename from river/special/sdft.py rename to river/misc/sdft.py diff --git a/river/special/skyline.py b/river/misc/skyline.py similarity index 100% rename from river/special/skyline.py rename to river/misc/skyline.py diff --git a/river/stats/cov.py b/river/stats/cov.py index 53c317ba17..e35221908d 100644 --- a/river/stats/cov.py +++ b/river/stats/cov.py @@ -1,6 +1,8 @@ import copy -from . import base, mean, summing +import numpy as np + +from . import base, summing class Cov(base.Bivariate): @@ -43,91 +45,101 @@ class Cov(base.Bivariate): def __init__(self, ddof=1): self.ddof = ddof - self.mean_x = mean.Mean() - self.mean_y = mean.Mean() + self.mean_x = 0 + self.mean_y = 0 + self.n = 0 + self._C = 0 self.cov = 0 def update(self, x, y, w=1.0): - dx = x - self.mean_x.get() - self.mean_x.update(x, w) - self.mean_y.update(y, w) - dy = y - self.mean_y.get() - self.cov += w * (dx * dy - self.cov) / max(1, self.mean_x.n - self.ddof) + self.n += w + dx = x - self.mean_x + self.mean_x += (w / self.n) * dx + self.mean_y += (w / self.n) * (y - self.mean_y) + self._C += w * dx * (y - self.mean_y) + self.cov = self._C / max(1, self.n - self.ddof) + return self + + def update_many(self, X: np.ndarray, Y: np.ndarray): + self.n += len(X) + dx = X - self.mean_x + self.mean_x += dx.sum() / self.n + self.mean_y += ((Y - self.mean_y)).sum() / self.n + self._C += (dx * (Y - self.mean_y)).sum() + self.cov = self._C / max(1, self.n - self.ddof) return self def get(self): return self.cov def __iadd__(self, other): - old_mean_x = self.mean_x.get() - old_mean_y = self.mean_y.get() - old_n = self.mean_x.n + old_mean_x = self.mean_x + old_mean_y = self.mean_y + old_n = self.n # Update mean estimates self.mean_x += other.mean_x self.mean_y += other.mean_y - if self.mean_x.n <= self.ddof: + if self.n <= self.ddof: return self # Scale factors scale_a = old_n - self.ddof - scale_b = other.mean_x.n - other.ddof + scale_b = other.n - other.ddof # Scale the covariances self.cov = scale_a * self.cov + scale_b * other.cov # Apply correction factor self.cov += ( - (old_mean_x - other.mean_x.get()) - * (old_mean_y - other.mean_y.get()) - * ((old_n * other.mean_x.n) / self.mean_x.n) + (old_mean_x - other.mean_x) + * (old_mean_y - other.mean_y) + * ((old_n * other.n) / self.n) ) # Reapply scale - self.cov /= self.mean_x.n - self.ddof + self.cov /= self.n - self.ddof return self def __add__(self, other): result = copy.deepcopy(self) result += other - return result def __isub__(self, other): - if self.mean_x.n <= self.ddof: + if self.n <= self.ddof: return self - old_n = self.mean_x.n + old_n = self.n # Update mean estimates self.mean_x -= other.mean_x self.mean_y -= other.mean_y - if self.mean_x.n <= self.ddof: + if self.n <= self.ddof: self.cov = 0 return self # Scale factors scale_x = old_n - self.ddof - scale_b = other.mean_x.n - other.ddof + scale_b = other.n - other.ddof # Scale the covariances self.cov = scale_x * self.cov - scale_b * other.cov # Apply correction self.cov -= ( - (self.mean_x.get() - other.mean_x.get()) - * (self.mean_y.get() - other.mean_y.get()) - * ((self.mean_x.n * other.mean_x.n) / old_n) + (self.mean_x - other.mean_x) + * (self.mean_y - other.mean_y) + * ((self.n * other.n) / old_n) ) # Re-apply scale factor - self.cov /= self.mean_x.n - self.ddof + self.cov /= self.n - self.ddof return self def __sub__(self, other): result = copy.deepcopy(self) result -= other - return result diff --git a/river/stats/iqr.py b/river/stats/iqr.py index 8c524993f2..eeeab9368c 100644 --- a/river/stats/iqr.py +++ b/river/stats/iqr.py @@ -1,4 +1,3 @@ -from .. import utils from . import base, quantile @@ -64,7 +63,7 @@ def get(self): return q_sup - q_inf -class RollingIQR(base.RollingUnivariate, utils.SortedWindow): +class RollingIQR(base.RollingUnivariate): """Computes the rolling interquartile range. Parameters @@ -105,7 +104,6 @@ class RollingIQR(base.RollingUnivariate, utils.SortedWindow): """ def __init__(self, window_size: int, q_inf=0.25, q_sup=0.75): - super().__init__(size=window_size) if q_inf >= q_sup: raise ValueError("q_inf must be strictly less than q_sup") self.q_inf = q_inf diff --git a/river/stats/maximum.py b/river/stats/maximum.py index 32dcf5584a..7c1902d959 100644 --- a/river/stats/maximum.py +++ b/river/stats/maximum.py @@ -43,7 +43,7 @@ def get(self): return self.max -class RollingMax(base.RollingUnivariate, utils.SortedWindow): +class RollingMax(base.RollingUnivariate): """Running max over a window. Parameters @@ -70,19 +70,19 @@ class RollingMax(base.RollingUnivariate, utils.SortedWindow): """ def __init__(self, window_size: int): - super().__init__(size=window_size) + self.window = utils.SortedWindow(size=window_size) @property def window_size(self): - return self.size + return self.window.size def update(self, x): - self.append(x) + self.window.append(x) return self def get(self): try: - return self[-1] + return self.window[-1] except IndexError: return None @@ -125,7 +125,7 @@ def get(self): return self.abs_max -class RollingAbsMax(base.RollingUnivariate, utils.SortedWindow): +class RollingAbsMax(base.RollingUnivariate): """Running absolute max over a window. Parameters @@ -152,18 +152,18 @@ class RollingAbsMax(base.RollingUnivariate, utils.SortedWindow): """ def __init__(self, window_size: int): - super().__init__(size=window_size) + self.window = utils.SortedWindow(size=window_size) @property def window_size(self): - return self.size + return self.window.size def update(self, x): - self.append(abs(x)) + self.window.append(abs(x)) return self def get(self): try: - return self[-1] + return self.window[-1] except IndexError: return None diff --git a/river/stats/mean.py b/river/stats/mean.py index 7685b32a56..dd7c6d8a42 100644 --- a/river/stats/mean.py +++ b/river/stats/mean.py @@ -1,5 +1,7 @@ import copy +import numpy as np + from . import base, summing @@ -45,6 +47,13 @@ def update(self, x, w=1.0): self._mean += (w / self.n) * (x - self._mean) return self + def update_many(self, X: np.ndarray): + a = self.n / (self.n + len(X)) + b = len(X) / (self.n + len(X)) + self._mean = a * self._mean + b * np.mean(X) + self.n += len(X) + return self + def revert(self, x, w=1.0): self.n -= w if self.n < 0: @@ -70,13 +79,11 @@ def __iadd__(self, other): old_n = self.n self.n += other.n self._mean = (old_n * self._mean + other.n * other._mean) / self.n - return self def __add__(self, other): result = copy.deepcopy(self) result += other - return result def __isub__(self, other): @@ -88,13 +95,11 @@ def __isub__(self, other): else: self.n = 0.0 self._mean = 0.0 - return self def __sub__(self, other): result = copy.deepcopy(self) result -= other - return result diff --git a/river/stats/minimum.py b/river/stats/minimum.py index 6b16ffd78b..dc1cf35181 100644 --- a/river/stats/minimum.py +++ b/river/stats/minimum.py @@ -27,7 +27,7 @@ def get(self): return self.min -class RollingMin(base.RollingUnivariate, utils.SortedWindow): +class RollingMin(base.RollingUnivariate): """Running min over a window. Parameters @@ -54,18 +54,18 @@ class RollingMin(base.RollingUnivariate, utils.SortedWindow): """ def __init__(self, window_size: int): - super().__init__(size=window_size) + self.window = utils.SortedWindow(size=window_size) @property def window_size(self): - return self.size + return self.window.size def update(self, x): - self.append(x) + self.window.append(x) return self def get(self): try: - return self[0] + return self.window[0] except IndexError: return None diff --git a/river/stats/quantile.py b/river/stats/quantile.py index f436c7daa0..394730c9f3 100644 --- a/river/stats/quantile.py +++ b/river/stats/quantile.py @@ -164,7 +164,7 @@ def get(self): return None -class RollingQuantile(base.RollingUnivariate, utils.SortedWindow): +class RollingQuantile(base.RollingUnivariate): """Running quantile over a window. Parameters @@ -207,23 +207,23 @@ class RollingQuantile(base.RollingUnivariate, utils.SortedWindow): """ def __init__(self, q, window_size): - super().__init__(size=window_size) + self.window = utils.SortedWindow(size=window_size) self.q = q - idx = self.q * (self.size - 1) + idx = self.q * (self.window.size - 1) self._lower = int(math.floor(idx)) self._higher = self._lower + 1 - if self._higher > self.size - 1: - self._higher = self.size - 1 + if self._higher > self.window.size - 1: + self._higher = self.window.size - 1 self._frac = idx - self._lower def _prepare(self): - if len(self) < self.size: - idx = self.q * (len(self) - 1) + if len(self.window) < self.window.size: + idx = self.q * (len(self.window) - 1) lower = int(math.floor(idx)) higher = lower + 1 - if higher > len(self) - 1: - higher = len(self) - 1 + if higher > len(self.window) - 1: + higher = len(self.window) - 1 frac = idx - lower return lower, higher, frac @@ -231,15 +231,17 @@ def _prepare(self): @property def window_size(self): - return self.size + return self.window.size def update(self, x): - self.append(x) + self.window.append(x) return self def get(self): lower, higher, frac = self._prepare() try: - return self[lower] + (self[higher] - self[lower]) * frac + return ( + self.window[lower] + (self.window[higher] - self.window[lower]) * frac + ) except IndexError: return None diff --git a/river/stats/test_.py b/river/stats/test_stats.py similarity index 83% rename from river/stats/test_.py rename to river/stats/test_stats.py index 0dc3230bf4..62656fb315 100644 --- a/river/stats/test_.py +++ b/river/stats/test_stats.py @@ -178,29 +178,46 @@ def tail(iterable, n): assert math.isclose(stat.get(), func(x_tail, y_tail), abs_tol=1e-10) -def test_weighted_variance_with_close_numbers(): - """ +@pytest.mark.parametrize( + "stat", + filter( + lambda stat: hasattr(stat, "update_many") + and issubclass(stat.__class__, stats.Univariate), + load_stats(), + ), + ids=lambda stat: stat.__class__.__name__, +) +def test_update_many_univariate(stat): + + batch_stat = stat.clone() - Origin of this test: https://github.com/online-ml/river/issues/732 + for _ in range(5): + X = np.random.random(10) + batch_stat.update_many(X) + for x in X: + stat.update(x) - This test would fail if Var were implemented with a numerically unstable algorithm. + assert math.isclose(batch_stat.get(), stat.get()) - """ - D = [ - (99.99999978143265, 6), - (99.99999989071631, 8), - (99.99999994535816, 6), - (99.99999997267908, 9), - (99.99999998633952, 10), - (99.99999999316977, 3), - (99.99999999829245, 5), - (99.99999999957309, 9), - ] +@pytest.mark.parametrize( + "stat", + filter( + lambda stat: hasattr(stat, "update_many") + and issubclass(stat.__class__, stats.Bivariate), + load_stats(), + ), + ids=lambda stat: stat.__class__.__name__, +) +def test_update_many_bivariate(stat): - var = stats.Var() + batch_stat = stat.clone() - for x, w in D: - var.update(x, w) + for _ in range(5): + X = np.random.random(10) + Y = np.random.random(10) + batch_stat.update_many(X, Y) + for x, y in zip(X, Y): + stat.update(x, y) - assert var.get() > 0 and math.isclose(var.get(), 4.648047194845607e-15) + assert math.isclose(batch_stat.get(), stat.get()) diff --git a/river/stats/test_var.py b/river/stats/test_var.py new file mode 100644 index 0000000000..b9784a90d2 --- /dev/null +++ b/river/stats/test_var.py @@ -0,0 +1,31 @@ +import math + +from river import stats + + +def test_weighted_variance_with_close_numbers(): + """ + + Origin of this test: https://github.com/online-ml/river/issues/732 + + This test would fail if Var were implemented with a numerically unstable algorithm. + + """ + + D = [ + (99.99999978143265, 6), + (99.99999989071631, 8), + (99.99999994535816, 6), + (99.99999997267908, 9), + (99.99999998633952, 10), + (99.99999999316977, 3), + (99.99999999829245, 5), + (99.99999999957309, 9), + ] + + var = stats.Var() + + for x, w in D: + var.update(x, w) + + assert var.get() > 0 and math.isclose(var.get(), 4.648047194845607e-15) diff --git a/river/stats/var.py b/river/stats/var.py index c13515fd91..2f432fcbac 100644 --- a/river/stats/var.py +++ b/river/stats/var.py @@ -1,5 +1,7 @@ import copy +import numpy as np + from . import base, mean @@ -58,6 +60,15 @@ def update(self, x, w=1.0): self._S += w * (x - mean_old) * (x - mean_new) return self + def update_many(self, X: np.ndarray): + mean_old = self.mean.get() + self.mean.update_many(X) + mean_new = self.mean.get() + self._S += np.sum( + np.multiply(np.subtract(X, mean_old), np.subtract(X, mean_new)) + ) + return self + def get(self): if self.mean.n > self.ddof: return self._S / (self.mean.n - self.ddof)