Compare commits

...

2 Commits

Author            SHA1        Message                               Date
Sebastian Goscik  5fa54615c8  Make purge db SELECT more specific    2025-07-29 12:07:05 +01:00
Sebastian Goscik  8e267017cd  Rework missing event checker          2025-07-29 12:07:05 +01:00
2 changed files with 118 additions and 66 deletions

View File

@@ -2,11 +2,12 @@
import asyncio
import logging
from datetime import datetime
from typing import AsyncIterator, List, Set
from dataclasses import dataclass
from datetime import datetime, timezone
from dateutil.relativedelta import relativedelta
from typing import AsyncIterator, List, Set, Dict
import aiosqlite
from dateutil.relativedelta import relativedelta
from uiprotect import ProtectApiClient
from uiprotect.data.nvr import Event
from uiprotect.data.types import EventType
@@ -17,6 +18,14 @@ from unifi_protect_backup.utils import EVENT_TYPES_MAP, wanted_event_type
logger = logging.getLogger(__name__)
@dataclass
class MissingEvent:
"""Track missing events and how many attempts they have had."""
event: Event
attempts: int
class MissingEventChecker:
"""Periodically checks if any unifi protect events exist within the retention period that are not backed up."""
@@ -58,85 +67,107 @@ class MissingEventChecker:
self.ignore_cameras: Set[str] = ignore_cameras
self.cameras: Set[str] = cameras
self.interval: int = interval
self.missing_events: Dict[str, MissingEvent] = {}
self.last_check_time: datetime | None = None
async def _get_missing_events(self) -> AsyncIterator[Event]:
start_time = datetime.now() - self.retention
end_time = datetime.now()
async def _get_backedup_event_ids(self) -> Set[str]:
# Get ids of events successfully backed up, or ignored
async with self._db.execute("SELECT id FROM events") as cursor:
rows = await cursor.fetchall()
return {row[0] for row in rows}
async def _get_ongoing_event_ids(self) -> Set[str]:
# Get ids of events currently being downloaded
downloading_event_ids = {event.id for event in self._downloader.download_queue._queue} # type: ignore
current_download = self._downloader.current_event
if current_download is not None:
downloading_event_ids.add(current_download.id)
# Get ids of events currently being uploaded
uploading_event_ids = {event.id for event, video in self._downloader.upload_queue._queue} # type: ignore
for uploader in self._uploaders:
current_upload = uploader.current_event
if current_upload is not None:
uploading_event_ids.add(current_upload.id)
return downloading_event_ids | uploading_event_ids
async def _get_new_missing_events(self) -> AsyncIterator[MissingEvent]:
# If it's the first check we've done, check the entire retention period
if self.last_check_time is None:
start_time = datetime.now(timezone.utc) - self.retention
# Otherwise only check the time since the last check + a buffer period
# however, if the retention is smaller than the buffer, only check the
# retention period
else:
now = datetime.now(timezone.utc)
retention_start = now - self.retention
buffer_start = self.last_check_time - relativedelta(hours=3)
start_time = max(retention_start, buffer_start)
end_time = datetime.now(timezone.utc)
new_last_check_time = end_time
chunk_size = 500
# Check UniFi Protect for new missing events
while True:
# Get list of events that need to be backed up from unifi protect
logger.extra_debug(f"Fetching events for interval: {start_time} - {end_time}") # type: ignore
# Get list of events that need to be backed up from UniFi Protect
logger.info(f"Fetching events for interval: {start_time} - {end_time}") # type: ignore
events_chunk = await self._protect.get_events(
start=start_time,
end=end_time,
types=list(EVENT_TYPES_MAP.keys()),
types=list(EVENT_TYPES_MAP.keys()), # TODO: Only request the types we want
limit=chunk_size,
)
if not events_chunk:
break # There were no events to backup
existing_ids = await self._get_backedup_event_ids() | await self._get_ongoing_event_ids()
# Filter out on-going events
unifi_events = {event.id: event for event in events_chunk if event.end is not None}
for event in events_chunk:
# Filter out on-going events
if event.end is None:
# Push back new_last_check_time to before on-going events
if event.start < new_last_check_time:
new_last_check_time = event.start
continue
if not unifi_events:
break # No completed events to process
# Next chunk's start time should be the start of the
# oldest complete event in the current chunk
if event.start > start_time:
start_time = event.start
# Next chunk's start time should be the start of the oldest complete event in the current chunk
start_time = max([event.start for event in unifi_events.values() if event.end is not None])
# Skip backed up/in-progress events
if event.id in existing_ids:
continue
# Get list of events that have been backed up from the database
# Filter out unwanted event types
if not wanted_event_type(event, self.detection_types, self.cameras, self.ignore_cameras):
continue
# events(id, type, camera_id, start, end)
async with self._db.execute("SELECT * FROM events") as cursor:
rows = await cursor.fetchall()
db_event_ids = {row[0] for row in rows}
# Prevent re-adding events currently in the download/upload queue
downloading_event_ids = {event.id for event in self._downloader.download_queue._queue} # type: ignore
current_download = self._downloader.current_event
if current_download is not None:
downloading_event_ids.add(current_download.id)
uploading_event_ids = {event.id for event, video in self._downloader.upload_queue._queue} # type: ignore
for uploader in self._uploaders:
current_upload = uploader.current_event
if current_upload is not None:
uploading_event_ids.add(current_upload.id)
existing_ids = db_event_ids | downloading_event_ids | uploading_event_ids
missing_events = {
event_id: event for event_id, event in unifi_events.items() if event_id not in existing_ids
}
# Exclude events of unwanted types
wanted_events = {
event_id: event
for event_id, event in missing_events.items()
if wanted_event_type(event, self.detection_types, self.cameras, self.ignore_cameras)
}
# Yield events one by one to allow the async loop to start other tasks while
# waiting on the full list of events
for event in wanted_events.values():
yield event
logger.extra_debug(f"Yielding new missing event '{event.id}'") # type: ignore[attr-defined]
yield MissingEvent(event, 0)
# Last chunk was incomplete, we can stop now
if len(events_chunk) < chunk_size:
break
async def ignore_missing(self):
"""Ignore missing events by adding them to the event table."""
logger.info(" Ignoring missing events")
self.last_check_time = new_last_check_time
async for event in self._get_missing_events():
logger.extra_debug(f"Ignoring event '{event.id}'")
await self._db.execute(
"INSERT INTO events VALUES "
f"('{event.id}', '{event.type.value}', '{event.camera_id}',"
f"'{event.start.timestamp()}', '{event.end.timestamp()}')"
)
async def _ignore_event(self, event, commit=True):
"""Ignore an event by adding them to the event table."""
logger.extra_debug(f"Ignoring event '{event.id}'") # type: ignore[attr-defined]
await self._db.execute(
"INSERT INTO events VALUES "
f"('{event.id}', '{event.type.value}', '{event.camera_id}',"
f"'{event.start.timestamp()}', '{event.end.timestamp()}')"
)
if commit:
await self._db.commit()
async def ignore_missing(self):
"""Ignore all missing events by adding them to the event table."""
logger.info(" Ignoring missing events")
async for missing_event in self._get_new_missing_events():
await self._ignore_event(missing_event.event, commit=False)
await self._db.commit()
async def start(self):
@@ -151,7 +182,28 @@ class MissingEventChecker:
logger.debug("Running check for missing events...")
async for event in self._get_missing_events():
logger.extra_debug("Checking for new missing events") # type: ignore[attr-defined]
async for missing_event in self._get_new_missing_events():
logger.debug(f"Found new missing event: '{missing_event.event.id}")
self.missing_events[missing_event.event.id] = missing_event
db_event_ids = await self._get_backedup_event_ids()
in_progress_ids = await self._get_ongoing_event_ids()
logger.extra_debug("Processing missing events") # type: ignore[attr-defined]
for missing_event in self.missing_events.copy().values():
event = missing_event.event
# it has been backed up, stop tracking it
if event.id in db_event_ids:
del self.missing_events[event.id]
logger.debug(f"Missing event '{event.id}' backed up")
continue
# it is in progress, we need to wait
elif event.id in in_progress_ids:
continue
if not shown_warning:
logger.warning(" Found missing events, adding to backup queue")
shown_warning = True
@@ -162,10 +214,10 @@ class MissingEventChecker:
event_name = f"{event.id} ({', '.join(event.smart_detect_types)})"
logger.extra_debug(
f" Adding missing event to backup queue: {event_name}"
f" Adding missing event to download queue: {event_name}"
f" ({event.start.strftime('%Y-%m-%dT%H-%M-%S')} -"
f" {event.end.strftime('%Y-%m-%dT%H-%M-%S')})"
)
) # type: ignore[attr-defined]
await self._download_queue.put(event)
except Exception as e:
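The reworked checker no longer rescans the whole retention period on every pass: the first check covers the full retention window, and later checks only look back to the previous check minus a three-hour buffer, clamped so the window never starts before the retention cutoff. A minimal sketch of that window calculation, assuming the datetime/relativedelta usage shown above (the standalone function is illustrative, not part of the commit):

from datetime import datetime, timezone
from dateutil.relativedelta import relativedelta


def check_window(retention: relativedelta, last_check_time: datetime | None) -> tuple[datetime, datetime]:
    """Return the (start, end) interval the next missing-event check should cover."""
    now = datetime.now(timezone.utc)
    if last_check_time is None:
        # First check: cover the entire retention period.
        start = now - retention
    else:
        # Later checks: only the time since the last check plus a buffer,
        # but never further back than the retention cutoff itself.
        retention_start = now - retention
        buffer_start = last_check_time - relativedelta(hours=3)
        start = max(retention_start, buffer_start)
    return start, now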

View File

@@ -62,9 +62,9 @@ class Purge:
# For every event older than the retention time
retention_oldest_time = time.mktime((datetime.now() - self.retention).timetuple())
async with self._db.execute(
f"SELECT * FROM events WHERE end < {retention_oldest_time}"
f"SELECT id FROM events WHERE end < {retention_oldest_time}"
) as event_cursor:
async for event_id, event_type, camera_id, event_start, event_end in event_cursor: # noqa: B007
async for (event_id,) in event_cursor: # noqa: B007
logger.info(f"Purging event: {event_id}.")
# For every backup for this event
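The purge query now selects only the event id it actually uses, so each row unpacks as a one-element tuple. A minimal sketch of that access pattern with aiosqlite, shown here with a bound parameter rather than the f-string interpolation used in the code (the parameter binding is an illustrative variation, not part of the commit):

import aiosqlite


async def purge_expired_event_ids(db: aiosqlite.Connection, retention_oldest_time: float) -> list[str]:
    """Collect the ids of events whose end time falls before the retention cutoff."""
    expired = []
    async with db.execute(
        "SELECT id FROM events WHERE end < ?", (retention_oldest_time,)
    ) as cursor:
        async for (event_id,) in cursor:
            # Only the id column was selected, so each row is a 1-tuple.
            expired.append(event_id)
    return expired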