Update incremental processor to use new interfaces and track total_events

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
This commit is contained in:
Olivier Wilkinson (reivilibre) 2019-08-08 11:43:50 +01:00
parent a59da511ce
commit 1c9732d64b

View file

@ -85,27 +85,36 @@ class StatsHandler(StateDeltasHandler):
@defer.inlineCallbacks
def _unsafe_process(self):
# If self.pos is None then means we haven't fetched it from DB
if self.pos is None:
self.pos = yield self.store.get_stats_stream_pos()
if self.pos is None or None in self.pos.values():
self.pos = yield self.store.get_stats_positions()
# If still None then the initial background update hasn't happened yet
if self.pos is None:
# If still None then the initial background update hasn't started yet
if self.pos is None or None in self.pos.values():
defer.returnValue(None)
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "stats_delta"):
deltas = yield self.store.get_current_state_deltas(self.pos)
with Measure(self.clock, "stats_delta"):
while True:
deltas = yield self.store.get_current_state_deltas(
self.pos["state_delta_stream_id"]
)
if not deltas:
return
break
logger.info("Handling %d state deltas", len(deltas))
yield self._handle_deltas(deltas)
self.pos = deltas[-1]["stream_id"]
yield self.store.update_stats_stream_pos(self.pos)
self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
event_processing_positions.labels("stats").set(self.pos)
event_processing_positions.labels("stats").set(
self.pos["state_delta_stream_id"]
)
if self.pos is not None:
yield self.store.update_stats_positions(self.pos)
with Measure(self.clock, "stats_total_events"):
self.pos = yield self.store.incremental_update_total_events(self.pos)
@defer.inlineCallbacks
def _handle_deltas(self, deltas):