mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-28 15:08:49 +03:00
Quick & dirty metric for background update status (#15740)
* Quick & dirty metric for background update status
* Changelog
* Remove debug
  Co-authored-by: Mathieu Velten <mathieuv@matrix.org>
* Actually write to _aborted
---------
Co-authored-by: Mathieu Velten <mathieuv@matrix.org>
This commit is contained in:
parent
e536f02f68
commit
d162aecaac
4 changed files with 40 additions and 1 deletions
1
changelog.d/15740.feature
Normal file
1
changelog.d/15740.feature
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Expose a metric reporting the database background update status.
|
|
@ -77,6 +77,8 @@ RegistryProxy = cast(CollectorRegistry, _RegistryProxy)
|
||||||
|
|
||||||
@attr.s(slots=True, hash=True, auto_attribs=True)
|
@attr.s(slots=True, hash=True, auto_attribs=True)
|
||||||
class LaterGauge(Collector):
|
class LaterGauge(Collector):
|
||||||
|
"""A Gauge which periodically calls a user-provided callback to produce metrics."""
|
||||||
|
|
||||||
name: str
|
name: str
|
||||||
desc: str
|
desc: str
|
||||||
labels: Optional[Sequence[str]] = attr.ib(hash=False)
|
labels: Optional[Sequence[str]] = attr.ib(hash=False)
|
||||||
|
|
|
@ -12,6 +12,7 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
import logging
|
import logging
|
||||||
|
from enum import IntEnum
|
||||||
from types import TracebackType
|
from types import TracebackType
|
||||||
from typing import (
|
from typing import (
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
|
@ -136,6 +137,15 @@ class BackgroundUpdatePerformance:
|
||||||
return float(self.total_item_count) / float(self.total_duration_ms)
|
return float(self.total_item_count) / float(self.total_duration_ms)
|
||||||
|
|
||||||
|
|
||||||
|
class UpdaterStatus(IntEnum):
    """Integer summary of the background updater's state.

    The numeric values are what gets reported when the status is exposed
    as a metric, so they should be treated as stable.
    """

    # Use negative values for error conditions.
    # The updater gave up after repeated back-to-back failures.
    ABORTED = -1
    # Background updates have been switched off.
    DISABLED = 0
    # Enabled, but neither running nor finished.
    NOT_STARTED = 1
    # An update is currently being processed.
    RUNNING_UPDATE = 2
    # Every background update has completed.
    COMPLETE = 3
|
||||||
|
|
||||||
|
|
||||||
class BackgroundUpdater:
|
class BackgroundUpdater:
|
||||||
"""Background updates are updates to the database that run in the
|
"""Background updates are updates to the database that run in the
|
||||||
background. Each update processes a batch of data at once. We attempt to
|
background. Each update processes a batch of data at once. We attempt to
|
||||||
|
@ -158,11 +168,16 @@ class BackgroundUpdater:
|
||||||
|
|
||||||
self._background_update_performance: Dict[str, BackgroundUpdatePerformance] = {}
|
self._background_update_performance: Dict[str, BackgroundUpdatePerformance] = {}
|
||||||
self._background_update_handlers: Dict[str, _BackgroundUpdateHandler] = {}
|
self._background_update_handlers: Dict[str, _BackgroundUpdateHandler] = {}
|
||||||
|
# TODO: all these bool flags make me feel icky---can we combine into a status
|
||||||
|
# enum?
|
||||||
self._all_done = False
|
self._all_done = False
|
||||||
|
|
||||||
# Whether we're currently running updates
|
# Whether we're currently running updates
|
||||||
self._running = False
|
self._running = False
|
||||||
|
|
||||||
|
# Marker to be set if we abort and halt all background updates.
|
||||||
|
self._aborted = False
|
||||||
|
|
||||||
# Whether background updates are enabled. This allows us to
|
# Whether background updates are enabled. This allows us to
|
||||||
# enable/disable background updates via the admin API.
|
# enable/disable background updates via the admin API.
|
||||||
self.enabled = True
|
self.enabled = True
|
||||||
|
@ -175,6 +190,20 @@ class BackgroundUpdater:
|
||||||
self.sleep_duration_ms = hs.config.background_updates.sleep_duration_ms
|
self.sleep_duration_ms = hs.config.background_updates.sleep_duration_ms
|
||||||
self.sleep_enabled = hs.config.background_updates.sleep_enabled
|
self.sleep_enabled = hs.config.background_updates.sleep_enabled
|
||||||
|
|
||||||
|
def get_status(self) -> UpdaterStatus:
    """An integer summarising the updater status. Used as a metric.

    The flags are checked in priority order and the first match wins:
    aborted, then disabled, then complete, then running. If none of
    them apply the updater is reported as not started.

    Returns:
        The ``UpdaterStatus`` member describing the current state.
    """
    # An abort is an error condition, so it takes precedence over everything.
    if self._aborted:
        return UpdaterStatus.ABORTED
    # TODO: a status for "have seen at least one failure, but haven't aborted yet".
    if not self.enabled:
        return UpdaterStatus.DISABLED

    if self._all_done:
        return UpdaterStatus.COMPLETE
    if self._running:
        return UpdaterStatus.RUNNING_UPDATE
    return UpdaterStatus.NOT_STARTED
|
||||||
|
|
||||||
def register_update_controller_callbacks(
|
def register_update_controller_callbacks(
|
||||||
self,
|
self,
|
||||||
on_update: ON_UPDATE_CALLBACK,
|
on_update: ON_UPDATE_CALLBACK,
|
||||||
|
@ -296,6 +325,7 @@ class BackgroundUpdater:
|
||||||
except Exception:
|
except Exception:
|
||||||
back_to_back_failures += 1
|
back_to_back_failures += 1
|
||||||
if back_to_back_failures >= 5:
|
if back_to_back_failures >= 5:
|
||||||
|
self._aborted = True
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
"5 back-to-back background update failures; aborting."
|
"5 back-to-back background update failures; aborting."
|
||||||
)
|
)
|
||||||
|
|
|
@ -54,7 +54,7 @@ from synapse.logging.context import (
|
||||||
current_context,
|
current_context,
|
||||||
make_deferred_yieldable,
|
make_deferred_yieldable,
|
||||||
)
|
)
|
||||||
from synapse.metrics import register_threadpool
|
from synapse.metrics import LaterGauge, register_threadpool
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.storage.background_updates import BackgroundUpdater
|
from synapse.storage.background_updates import BackgroundUpdater
|
||||||
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
|
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
|
||||||
|
@ -547,6 +547,12 @@ class DatabasePool:
|
||||||
self._db_pool = make_pool(hs.get_reactor(), database_config, engine)
|
self._db_pool = make_pool(hs.get_reactor(), database_config, engine)
|
||||||
|
|
||||||
self.updates = BackgroundUpdater(hs, self)
|
self.updates = BackgroundUpdater(hs, self)
|
||||||
|
LaterGauge(
|
||||||
|
"synapse_background_update_status",
|
||||||
|
"Background update status",
|
||||||
|
[],
|
||||||
|
self.updates.get_status,
|
||||||
|
)
|
||||||
|
|
||||||
self._previous_txn_total_time = 0.0
|
self._previous_txn_total_time = 0.0
|
||||||
self._current_txn_total_time = 0.0
|
self._current_txn_total_time = 0.0
|
||||||
|
|
Loading…
Reference in a new issue