Mirror of https://github.com/ep1cman/unifi-protect-backup.git (synced 2025-12-05 23:53:30 +00:00)
Compare commits: 5b7f105102 ... 5fa54615c8 (2 commits)

| Author | SHA1 | Date |
|---|---|---|
|  | 5fa54615c8 |  |
|  | 8e267017cd |  |
```diff
@@ -2,11 +2,12 @@
 import asyncio
 import logging
-from datetime import datetime
-from typing import AsyncIterator, List, Set
+from dataclasses import dataclass
+from datetime import datetime, timezone
+from dateutil.relativedelta import relativedelta
+from typing import AsyncIterator, List, Set, Dict

 import aiosqlite
-from dateutil.relativedelta import relativedelta
 from uiprotect import ProtectApiClient
 from uiprotect.data.nvr import Event
 from uiprotect.data.types import EventType
```
```diff
@@ -17,6 +18,14 @@ from unifi_protect_backup.utils import EVENT_TYPES_MAP, wanted_event_type
 logger = logging.getLogger(__name__)


+@dataclass
+class MissingEvent:
+    """Track missing events and how many attempts they have had."""
+
+    event: Event
+    attempts: int
+
+
 class MissingEventChecker:
     """Periodically checks if any unifi protect events exist within the retention period that are not backed up."""
```
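The new `MissingEvent` dataclass pairs an event with an `attempts` counter, though this diff never shows the counter being consumed. A minimal sketch (not from the repository; `MAX_ATTEMPTS` and `should_retry` are hypothetical names) of how such a record typically caps re-queue attempts:

```python
from dataclasses import dataclass

MAX_ATTEMPTS = 5  # hypothetical cap; the diff does not define one


@dataclass
class TrackedEvent:
    """Stand-in for MissingEvent: an event id plus its retry count."""

    event_id: str
    attempts: int = 0


def should_retry(tracked: TrackedEvent) -> bool:
    """True while the event is still worth re-queuing."""
    return tracked.attempts < MAX_ATTEMPTS


tracked = TrackedEvent("evt-abc123")
while should_retry(tracked):
    tracked.attempts += 1  # one increment per checker pass
print(tracked.attempts)   # -> 5
```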
```diff
@@ -58,85 +67,107 @@
         self.ignore_cameras: Set[str] = ignore_cameras
         self.cameras: Set[str] = cameras
         self.interval: int = interval
+        self.missing_events: Dict[str, MissingEvent] = {}
+        self.last_check_time: datetime | None = None

-    async def _get_missing_events(self) -> AsyncIterator[Event]:
-        start_time = datetime.now() - self.retention
-        end_time = datetime.now()
+    async def _get_backedup_event_ids(self) -> Set[str]:
+        # Get ids of events successfully backed up, or ignored
+        async with self._db.execute("SELECT id FROM events") as cursor:
+            rows = await cursor.fetchall()
+        return {row[0] for row in rows}
+
+    async def _get_ongoing_event_ids(self) -> Set[str]:
+        # Get ids of events currently being downloaded
+        downloading_event_ids = {event.id for event in self._downloader.download_queue._queue}  # type: ignore
+        current_download = self._downloader.current_event
+        if current_download is not None:
+            downloading_event_ids.add(current_download.id)
+
+        # Get ids of events currently being uploaded
+        uploading_event_ids = {event.id for event, video in self._downloader.upload_queue._queue}  # type: ignore
+        for uploader in self._uploaders:
+            current_upload = uploader.current_event
+            if current_upload is not None:
+                uploading_event_ids.add(current_upload.id)
+
+        return downloading_event_ids | uploading_event_ids
```
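`_get_ongoing_event_ids` peeks at `asyncio.Queue` internals: the class has no public peek API, so pending items are read from its private `_queue` deque, which is why each of those lines carries a `# type: ignore`. A self-contained sketch of that pattern (illustrative only; `_queue` is a CPython implementation detail, not a stable interface):

```python
import asyncio


async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    for item in ("evt-1", "evt-2", "evt-3"):
        queue.put_nowait(item)

    # Snapshot the pending items without consuming them.
    pending = {item for item in queue._queue}  # type: ignore[attr-defined]
    print(pending)        # {'evt-1', 'evt-2', 'evt-3'}
    print(queue.qsize())  # 3 -- nothing was dequeued


asyncio.run(main())
```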
```diff
+    async def _get_new_missing_events(self) -> AsyncIterator[MissingEvent]:
+        # If it's the first check we've done, check the entire retention period
+        if self.last_check_time is None:
+            start_time = datetime.now(timezone.utc) - self.retention
+        # Otherwise only check the time since the last check + a buffer period
+        # however, if the retention is smaller than the buffer, only check the
+        # retention period
+        else:
+            now = datetime.now(timezone.utc)
+            retention_start = now - self.retention
+            buffer_start = self.last_check_time - relativedelta(hours=3)
+            start_time = max(retention_start, buffer_start)
+
+        end_time = datetime.now(timezone.utc)
+        new_last_check_time = end_time
         chunk_size = 500
```
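The window selection above collapses to one expression: scan the full retention period on the first pass, afterwards resume three hours before the previous check, but never earlier than retention allows. A standalone sketch of that logic (using `timedelta` in place of the project's `relativedelta`; the constants are illustrative):

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

RETENTION = timedelta(days=7)  # illustrative retention period
BUFFER = timedelta(hours=3)    # matches the 3-hour buffer above


def window_start(last_check: Optional[datetime]) -> datetime:
    """Earliest timestamp the next missing-event scan should cover."""
    retention_start = datetime.now(timezone.utc) - RETENTION
    if last_check is None:
        return retention_start                         # first pass: whole window
    return max(retention_start, last_check - BUFFER)   # clamped incremental pass


print(window_start(None))
print(window_start(datetime.now(timezone.utc) - timedelta(hours=1)))
```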
```diff
+        # Check UniFi Protect for new missing events
         while True:
-            # Get list of events that need to be backed up from unifi protect
-            logger.extra_debug(f"Fetching events for interval: {start_time} - {end_time}")  # type: ignore
+            # Get list of events that need to be backed up from UniFi protect
+            logger.info(f"Fetching events for interval: {start_time} - {end_time}")  # type: ignore
             events_chunk = await self._protect.get_events(
                 start=start_time,
                 end=end_time,
-                types=list(EVENT_TYPES_MAP.keys()),
+                types=list(EVENT_TYPES_MAP.keys()),  # TODO: Only request the types we want
                 limit=chunk_size,
             )

             if not events_chunk:
                 break  # There were no events to backup

+            existing_ids = await self._get_backedup_event_ids() | await self._get_ongoing_event_ids()

-            # Filter out on-going events
-            unifi_events = {event.id: event for event in events_chunk if event.end is not None}
-
-            if not unifi_events:
-                break  # No completed events to process
-
-            # Next chunks start time should be the start of the oldest complete event in the current chunk
-            start_time = max([event.start for event in unifi_events.values() if event.end is not None])
-
-            # Get list of events that have been backed up from the database
-            # events(id, type, camera_id, start, end)
-            async with self._db.execute("SELECT * FROM events") as cursor:
-                rows = await cursor.fetchall()
-                db_event_ids = {row[0] for row in rows}
-
-            # Prevent re-adding events currently in the download/upload queue
-            downloading_event_ids = {event.id for event in self._downloader.download_queue._queue}  # type: ignore
-            current_download = self._downloader.current_event
-            if current_download is not None:
-                downloading_event_ids.add(current_download.id)
-
-            uploading_event_ids = {event.id for event, video in self._downloader.upload_queue._queue}  # type: ignore
-            for uploader in self._uploaders:
-                current_upload = uploader.current_event
-                if current_upload is not None:
-                    uploading_event_ids.add(current_upload.id)
-
-            existing_ids = db_event_ids | downloading_event_ids | uploading_event_ids
-            missing_events = {
-                event_id: event for event_id, event in unifi_events.items() if event_id not in existing_ids
-            }
-
-            # Exclude events of unwanted types
-            wanted_events = {
-                event_id: event
-                for event_id, event in missing_events.items()
-                if wanted_event_type(event, self.detection_types, self.cameras, self.ignore_cameras)
-            }
-
-            # Yeild events one by one to allow the async loop to start other task while
-            # waiting on the full list of events
-            for event in wanted_events.values():
-                yield event
+            for event in events_chunk:
+                # Filter out on-going events
+                if event.end is None:
+                    # Push back new_last_checked_time to before on-going events
+                    if event.start < new_last_check_time:
+                        new_last_check_time = event.start
+                    continue
+
+                # Next chunks start time should be the start of the
+                # oldest complete event in the current chunk
+                if event.start > start_time:
+                    start_time = event.start
+
+                # Skip backed up/in-progress events
+                if event.id in existing_ids:
+                    continue
+
+                # Filter out unwanted event types
+                if not wanted_event_type(event, self.detection_types, self.cameras, self.ignore_cameras):
+                    continue
+
+                logger.extra_debug(f"Yielding new missing event '{event.id}'")  # type: ignore[attr-defined]
+                yield MissingEvent(event, 0)

             # Last chunk was in-complete, we can stop now
             if len(events_chunk) < chunk_size:
                 break
```
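The loop above implements resume-based pagination against a limit-capped API: each pass advances `start_time` to the newest completed event seen, and a short page ends the scan. A generic sketch of the same strategy (the `fetch_page` callable is a hypothetical stand-in for `ProtectApiClient.get_events`; like the diff, it dedupes boundary events that can reappear across pages):

```python
from typing import Callable, Iterator, List, Set, Tuple

Event = Tuple[float, str]  # (start_time, event_id) -- simplified stand-in


def paginate(
    fetch_page: Callable[[float, float, int], List[Event]],
    start: float,
    end: float,
    chunk_size: int = 500,
) -> Iterator[Event]:
    """Yield every event in [start, end] from an API capped at chunk_size."""
    seen: Set[str] = set()
    while True:
        page = fetch_page(start, end, chunk_size)
        if not page:
            return
        for started, event_id in page:
            start = max(start, started)  # next request resumes here
            if event_id in seen:
                continue  # boundary events can repeat between pages
            seen.add(event_id)
            yield (started, event_id)
        if len(page) < chunk_size:
            return  # short page means the range is exhausted


# Fake API over 1050 events to exercise the chunking:
data = [(float(i), f"evt-{i}") for i in range(1050)]


def fake_fetch(s: float, e: float, n: int) -> List[Event]:
    return [ev for ev in data if s <= ev[0] <= e][:n]


print(sum(1 for _ in paginate(fake_fetch, 0.0, 2000.0)))  # -> 1050
```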
```diff
-    async def ignore_missing(self):
-        """Ignore missing events by adding them to the event table."""
-        logger.info(" Ignoring missing events")
-
-        async for event in self._get_missing_events():
-            logger.extra_debug(f"Ignoring event '{event.id}'")
-            await self._db.execute(
-                "INSERT INTO events VALUES "
-                f"('{event.id}', '{event.type.value}', '{event.camera_id}',"
-                f"'{event.start.timestamp()}', '{event.end.timestamp()}')"
-            )
+        self.last_check_time = new_last_check_time
+
+    async def _ignore_event(self, event, commit=True):
+        """Ignore an event by adding it to the event table."""
+        logger.extra_debug(f"Ignoring event '{event.id}'")  # type: ignore[attr-defined]
+        await self._db.execute(
+            "INSERT INTO events VALUES "
+            f"('{event.id}', '{event.type.value}', '{event.camera_id}',"
+            f"'{event.start.timestamp()}', '{event.end.timestamp()}')"
+        )
+        if commit:
+            await self._db.commit()
+
+    async def ignore_missing(self):
+        """Ignore all missing events by adding them to the event table."""
+        logger.info(" Ignoring missing events")
+        async for missing_event in self._get_new_missing_events():
+            await self._ignore_event(missing_event.event, commit=False)
+        await self._db.commit()

     async def start(self):
```
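`_ignore_event(commit=False)` lets `ignore_missing` insert every row inside one transaction and commit once at the end, rather than paying a commit per event. A self-contained sketch of that batching with aiosqlite (the table here is illustrative, not the project's schema):

```python
import asyncio

import aiosqlite


async def main() -> None:
    async with aiosqlite.connect(":memory:") as db:
        await db.execute("CREATE TABLE events (id TEXT PRIMARY KEY)")
        # Each execute() joins the same implicit transaction...
        for event_id in ("evt-1", "evt-2", "evt-3"):
            await db.execute("INSERT INTO events VALUES (?)", (event_id,))
        # ...and a single commit makes the whole batch durable together.
        await db.commit()
        async with db.execute("SELECT COUNT(*) FROM events") as cursor:
            print(await cursor.fetchone())  # (3,)


asyncio.run(main())
```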
```diff
@@ -151,7 +182,28 @@

                 logger.debug("Running check for missing events...")

-                async for event in self._get_missing_events():
+                logger.extra_debug("Checking for new missing events")  # type: ignore[attr-defined]
+                async for missing_event in self._get_new_missing_events():
+                    logger.debug(f"Found new missing event: '{missing_event.event.id}'")
+                    self.missing_events[missing_event.event.id] = missing_event
+
+                db_event_ids = await self._get_backedup_event_ids()
+                in_progress_ids = await self._get_ongoing_event_ids()
+
+                logger.extra_debug("Processing missing events")  # type: ignore[attr-defined]
+                for missing_event in self.missing_events.copy().values():
+                    event = missing_event.event
+
+                    # it has been backed up, stop tracking it
+                    if event.id in db_event_ids:
+                        del self.missing_events[event.id]
+                        logger.debug(f"Missing event '{event.id}' backed up")
+                        continue
+
+                    # it is in progress, we need to wait
+                    elif event.id in in_progress_ids:
+                        continue
+
                     if not shown_warning:
                         logger.warning(" Found missing events, adding to backup queue")
                         shown_warning = True
```
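The reconciliation loop iterates over `self.missing_events.copy().values()` so it can delete entries from the live dict mid-loop; iterating the dict directly while deleting would raise `RuntimeError: dictionary changed size during iteration`. In miniature:

```python
tracked = {"evt-1": 1, "evt-2": 2, "evt-3": 3}

# for key in tracked: del tracked[key]   # RuntimeError: dict changed size

for key in tracked.copy():  # iterate a snapshot of the keys...
    if tracked[key] % 2 == 1:
        del tracked[key]    # ...so mutating the original is safe

print(tracked)  # {'evt-2': 2}
```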
```diff
@@ -162,10 +214,10 @@
                     event_name = f"{event.id} ({', '.join(event.smart_detect_types)})"

                     logger.extra_debug(
-                        f" Adding missing event to backup queue: {event_name}"
+                        f" Adding missing event to download queue: {event_name}"
                         f" ({event.start.strftime('%Y-%m-%dT%H-%M-%S')} -"
                         f" {event.end.strftime('%Y-%m-%dT%H-%M-%S')})"
-                    )
+                    )  # type: ignore[attr-defined]
                     await self._download_queue.put(event)

             except Exception as e:
```
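`logger.extra_debug` is not part of the standard `logging` API, which is why these calls carry `# type: ignore[attr-defined]`; the project evidently registers a custom level below `DEBUG`. A minimal sketch of how such a method is typically attached (the level number and names are assumptions, not the project's actual setup):

```python
import logging

EXTRA_DEBUG = 5  # assumed numeric level, one step below DEBUG (10)
logging.addLevelName(EXTRA_DEBUG, "EXTRA_DEBUG")


def extra_debug(self: logging.Logger, msg: str, *args, **kwargs) -> None:
    """Log msg at the custom EXTRA_DEBUG level if it is enabled."""
    if self.isEnabledFor(EXTRA_DEBUG):
        self._log(EXTRA_DEBUG, msg, args, **kwargs)


logging.Logger.extra_debug = extra_debug  # type: ignore[attr-defined]

logging.basicConfig(level=EXTRA_DEBUG)
logging.getLogger(__name__).extra_debug("very verbose detail")  # type: ignore[attr-defined]
```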
```diff
@@ -62,9 +62,9 @@ class Purge:
             # For every event older than the retention time
             retention_oldest_time = time.mktime((datetime.now() - self.retention).timetuple())
             async with self._db.execute(
-                f"SELECT * FROM events WHERE end < {retention_oldest_time}"
+                f"SELECT id FROM events WHERE end < {retention_oldest_time}"
             ) as event_cursor:
-                async for event_id, event_type, camera_id, event_start, event_end in event_cursor:  # noqa: B007
+                async for (event_id,) in event_cursor:  # noqa: B007
                     logger.info(f"Purging event: {event_id}.")

                     # For every backup for this event
```
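Both versions of the purge query interpolate `retention_oldest_time` with an f-string. The value is a locally computed float rather than user input, but the same query can be written with a bound parameter, which sidesteps quoting issues entirely and lets SQLite reuse the prepared statement. A sketch with aiosqlite (the database path and cutoff are hypothetical):

```python
import asyncio
from typing import List

import aiosqlite


async def expired_event_ids(db_path: str, cutoff: float) -> List[str]:
    """Return ids of events whose end time falls before the cutoff."""
    async with aiosqlite.connect(db_path) as db:
        async with db.execute(
            "SELECT id FROM events WHERE end < ?", (cutoff,)  # bound parameter
        ) as cursor:
            return [event_id async for (event_id,) in cursor]


# asyncio.run(expired_event_ids("events.db", 1700000000.0))  # hypothetical values
```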