Compare commits


2 Commits

Author            SHA1        Message                                          Date
Sebastian Goscik  df2dec57a8  Capture existing_ids after freezing timestamps   2025-07-29 20:24:35 +01:00
Sebastian Goscik  9b113f3609  Review feedback                                  2025-07-29 20:13:47 +01:00


@@ -69,6 +69,7 @@ class MissingEventChecker:
         self.interval: int = interval
         self.missing_events: Dict[str, MissingEvent] = {}
         self.last_check_time: datetime | None = None
+        self.shown_warning: bool = False

     async def _get_backedup_event_ids(self) -> Set[str]:
         # Get ids of events successfully backed up, or ignored
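The new `shown_warning` attribute moves the warn-once bookkeeping from a local variable in `start()` onto the instance, so the extracted `_add_to_download_queue()` helper (added further down) can consult and set it. A minimal sketch of the pattern, with illustrative names rather than the project's real ones:

class Checker:
    def __init__(self) -> None:
        # Instance-level flag shared between the main loop and its helpers.
        self.shown_warning: bool = False

    def _enqueue(self, item: str) -> None:
        if not self.shown_warning:
            print("Found missing events, adding to backup queue")
            self.shown_warning = True  # warn at most once per cycle
        # ... hand the item to the download queue ...

    def run_cycle(self, items: list[str]) -> None:
        self.shown_warning = False  # new cycle: allow one warning again
        for item in items:
            self._enqueue(item)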
@@ -92,23 +93,28 @@ class MissingEventChecker:
         return downloading_event_ids | uploading_event_ids

-    async def _get_new_missing_events(self) -> AsyncIterator[MissingEvent]:
+    async def _get_new_missing_events(self) -> AsyncIterator[Event]:
+        now = datetime.now(timezone.utc)
+        retention_start = now - self.retention
         # If it's the first check we've done, check the entire retention period
         if self.last_check_time is None:
-            start_time = datetime.now(timezone.utc) - self.retention
+            start_time = retention_start
         # Otherwise only check the time since the last check + a buffer period
-        # however, if the retention is smaller than the buffer, only check the
+        # however, if the retention is smaller than the buffer, check the whole
         # retention period
         else:
-            now = datetime.now(timezone.utc)
-            retention_start = now - self.retention
             buffer_start = self.last_check_time - relativedelta(hours=3)
             start_time = max(retention_start, buffer_start)
-        end_time = datetime.now(timezone.utc)
+        end_time = now
         new_last_check_time = end_time

         chunk_size = 500
+        existing_ids = (
+            await self._get_ongoing_event_ids() | await self._get_backedup_event_ids() | set(self.missing_events.keys())
+        )

         # Check UniFi Protect for new missing events
         while True:
             # Get list of events that need to be backed up from UniFi protect
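This hunk freezes `now` once at the top of the method, so `start_time`, `end_time`, and the pre-loop `existing_ids` snapshot all describe the same instant; that is what the commit title "Capture existing_ids after freezing timestamps" refers to. A standalone sketch of the window computation, using `timedelta(hours=3)` in place of the code's `relativedelta(hours=3)` (equivalent for a fixed hour count):

from datetime import datetime, timedelta, timezone

def check_window(
    retention: timedelta,
    last_check: datetime | None,
    buffer: timedelta = timedelta(hours=3),
) -> tuple[datetime, datetime]:
    """Return the (start_time, end_time) span one scan should cover."""
    now = datetime.now(timezone.utc)  # frozen once per scan
    retention_start = now - retention
    if last_check is None:
        # First scan: cover the entire retention period.
        return retention_start, now
    # Later scans: everything since the last check minus a buffer,
    # clamped so we never reach past the retention window itself.
    return max(retention_start, last_check - buffer), now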
@@ -120,8 +126,6 @@ class MissingEventChecker:
                 limit=chunk_size,
             )

-            existing_ids = await self._get_backedup_event_ids() | await self._get_ongoing_event_ids()
-
             for event in events_chunk:
                 # Filter out on-going events
                 if event.end is None:
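With the per-chunk refresh removed, `existing_ids` is computed exactly once before pagination starts (the `existing_ids = (...)` addition in the earlier hunk), so every page is filtered against one consistent snapshot instead of a view that can shift while pagination is in flight, and the two awaited queries run once per scan instead of once per chunk. A rough, runnable illustration with hypothetical stand-ins for the real queries:

import asyncio

async def get_backedup_ids() -> set[str]:  # stand-in for _get_backedup_event_ids
    return {"a", "b"}

async def get_ongoing_ids() -> set[str]:   # stand-in for _get_ongoing_event_ids
    return {"c"}

async def scan(known_missing: set[str]) -> None:
    # One snapshot before the loop; every chunk sees the same view.
    existing_ids = await get_ongoing_ids() | await get_backedup_ids() | known_missing
    for chunk in (["a", "d"], ["c", "e"]):  # stand-in for paged fetches
        for event_id in chunk:
            if event_id in existing_ids:
                continue
            print("new missing event:", event_id)

asyncio.run(scan({"b"}))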
@@ -144,7 +148,7 @@ class MissingEventChecker:
                     continue

                 logger.extra_debug(f"Yielding new missing event '{event.id}'")  # type: ignore[attr-defined]
-                yield MissingEvent(event, 0)
+                yield event

             # Last chunk was in-complete, we can stop now
             if len(events_chunk) < chunk_size:
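`_get_new_missing_events()` now yields plain `Event` objects (the caller wraps them in `MissingEvent`, as the last hunk shows), and pagination ends when a chunk comes back shorter than `chunk_size`, meaning the source has been exhausted. A generic sketch of that stop condition, with a hypothetical paged fetch:

import asyncio
from typing import AsyncIterator

CHUNK_SIZE = 3  # the real code uses 500

async def fetch_page(offset: int, limit: int) -> list[int]:
    # Hypothetical paged source holding 7 items.
    return list(range(7))[offset:offset + limit]

async def iter_all() -> AsyncIterator[int]:
    offset = 0
    while True:
        chunk = await fetch_page(offset, CHUNK_SIZE)
        for item in chunk:
            yield item
        # A short (or empty) chunk means there is nothing left to fetch.
        if len(chunk) < CHUNK_SIZE:
            return
        offset += CHUNK_SIZE

async def main() -> None:
    async for item in iter_all():
        print(item)

asyncio.run(main())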
@@ -170,27 +174,39 @@ class MissingEventChecker:
                 await self._ignore_event(missing_event.event, commit=False)
         await self._db.commit()

+    async def _add_to_download_queue(self, event: Event):
+        if not self.shown_warning:
+            logger.warning(" Found missing events, adding to backup queue")
+            self.shown_warning = True
+
+        if event.type != EventType.SMART_DETECT:
+            event_name = f"{event.id} ({event.type.value})"
+        else:
+            event_name = f"{event.id} ({', '.join(event.smart_detect_types)})"
+        logger.extra_debug(  # type: ignore[attr-defined]
+            f" Adding missing event to download queue: {event_name}"
+            f" ({event.start.strftime('%Y-%m-%dT%H-%M-%S')} -"
+            f" {event.end.strftime('%Y-%m-%dT%H-%M-%S')})"
+        )
+        await self._download_queue.put(event)
+
     async def start(self):
         """Run main loop."""
         logger.info("Starting Missing Event Checker")
         while True:
             try:
-                shown_warning = False
+                self.shown_warning = False

                 # Wait for unifi protect to be connected
                 await self._protect.connect_event.wait()

-                logger.debug("Running check for missing events...")
-                logger.extra_debug("Checking for new missing events")  # type: ignore[attr-defined]
-                async for missing_event in self._get_new_missing_events():
-                    logger.debug(f"Found new missing event: '{missing_event.event.id}'")
-                    self.missing_events[missing_event.event.id] = missing_event
-
-                db_event_ids = await self._get_backedup_event_ids()
                 in_progress_ids = await self._get_ongoing_event_ids()
+                db_event_ids = await self._get_backedup_event_ids()

-                logger.extra_debug("Processing missing events")  # type: ignore[attr-defined]
+                logger.extra_debug("Checking for previously missing events")  # type: ignore[attr-defined]
                 for missing_event in self.missing_events.copy().values():
                     event = missing_event.event
@@ -201,24 +217,16 @@ class MissingEventChecker:
                         continue

                     # it is in progress, we need to wait
-                    elif event.id in in_progress_ids:
+                    if event.id in in_progress_ids:
                         continue

-                    if not shown_warning:
-                        logger.warning(" Found missing events, adding to backup queue")
-                        shown_warning = True
+                    await self._add_to_download_queue(event)

-                    if event.type != EventType.SMART_DETECT:
-                        event_name = f"{event.id} ({event.type.value})"
-                    else:
-                        event_name = f"{event.id} ({', '.join(event.smart_detect_types)})"
-                    logger.extra_debug(
-                        f" Adding missing event to download queue: {event_name}"
-                        f" ({event.start.strftime('%Y-%m-%dT%H-%M-%S')} -"
-                        f" {event.end.strftime('%Y-%m-%dT%H-%M-%S')})"
-                    )  # type: ignore[attr-defined]
-                    await self._download_queue.put(event)
+                logger.extra_debug("Checking for new missing events")  # type: ignore[attr-defined]
+                async for event in self._get_new_missing_events():
+                    logger.debug(f"Found new missing event: '{event.id}'")
+                    self.missing_events[event.id] = MissingEvent(event, 0)
+                    await self._add_to_download_queue(event)

             except Exception as e:
                 logger.error(