Fix uploading files not being accounted for when checking for missing events

This commit is contained in:
Sebastian Goscik
2022-12-09 11:11:04 +00:00
parent 8e3ea2b13f
commit 0978798078
4 changed files with 47 additions and 22 deletions

View File

@@ -39,11 +39,12 @@ async def get_video_length(video: bytes) -> float:
class VideoDownloader: class VideoDownloader:
"""Downloads event video clips from Unifi Protect""" """Downloads event video clips from Unifi Protect"""
def __init__(self, protect: ProtectApiClient, download_queue: asyncio.Queue, buffer_size: int = 256 * 1024 * 1024): def __init__(self, protect: ProtectApiClient, download_queue: asyncio.Queue, upload_queue: VideoQueue):
self._protect: ProtectApiClient = protect self._protect: ProtectApiClient = protect
self._download_queue: asyncio.Queue = download_queue self.download_queue: asyncio.Queue = download_queue
self.video_queue = VideoQueue(buffer_size) self.upload_queue: VideoQueue = upload_queue
self.logger = logging.LoggerAdapter(logger, {'event': ''}) self.logger = logging.LoggerAdapter(logger, {'event': ''})
self.current_event = None
# Check if `ffprobe` is available # Check if `ffprobe` is available
ffprobe = shutil.which('ffprobe') ffprobe = shutil.which('ffprobe')
@@ -58,7 +59,8 @@ class VideoDownloader:
self.logger.info("Starting Downloader") self.logger.info("Starting Downloader")
while True: while True:
try: try:
event = await self._download_queue.get() event = await self.download_queue.get()
self.current_event = event
self.logger = logging.LoggerAdapter(logger, {'event': f' [{event.id}]'}) self.logger = logging.LoggerAdapter(logger, {'event': f' [{event.id}]'})
# Fix timezones since pyunifiprotect sets all timestamps to UTC. Instead localize them to # Fix timezones since pyunifiprotect sets all timestamps to UTC. Instead localize them to
@@ -67,9 +69,9 @@ class VideoDownloader:
event.end = event.end.replace(tzinfo=pytz.utc).astimezone(self._protect.bootstrap.nvr.timezone) event.end = event.end.replace(tzinfo=pytz.utc).astimezone(self._protect.bootstrap.nvr.timezone)
self.logger.info(f"Downloading event: {event.id}") self.logger.info(f"Downloading event: {event.id}")
self.logger.debug(f"Remaining Download Queue: {self._download_queue.qsize()}") self.logger.debug(f"Remaining Download Queue: {self.download_queue.qsize()}")
output_queue_current_size = human_readable_size(self.video_queue.qsize()) output_queue_current_size = human_readable_size(self.upload_queue.qsize())
output_queue_max_size = human_readable_size(self.video_queue.maxsize) output_queue_max_size = human_readable_size(self.upload_queue.maxsize)
self.logger.debug(f"Video Download Buffer: {output_queue_current_size}/{output_queue_max_size}") self.logger.debug(f"Video Download Buffer: {output_queue_current_size}/{output_queue_max_size}")
self.logger.debug(f" Camera: {await get_camera_name(self._protect, event.camera_id)}") self.logger.debug(f" Camera: {await get_camera_name(self._protect, event.camera_id)}")
if event.type == EventType.SMART_DETECT: if event.type == EventType.SMART_DETECT:
@@ -102,7 +104,7 @@ class VideoDownloader:
if self._has_ffprobe: if self._has_ffprobe:
await self._check_video_length(video, duration) await self._check_video_length(video, duration)
await self.video_queue.put((event, video)) await self.upload_queue.put((event, video))
self.logger.debug("Added to upload queue") self.logger.debug("Added to upload queue")
except Exception as e: except Exception as e:

View File

@@ -9,6 +9,8 @@ from dateutil.relativedelta import relativedelta
from pyunifiprotect import ProtectApiClient from pyunifiprotect import ProtectApiClient
from pyunifiprotect.data.types import EventType from pyunifiprotect.data.types import EventType
from unifi_protect_backup import VideoDownloader, VideoUploader
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -19,7 +21,9 @@ class MissingEventChecker:
self, self,
protect: ProtectApiClient, protect: ProtectApiClient,
db: aiosqlite.Connection, db: aiosqlite.Connection,
event_queue: asyncio.Queue, download_queue: asyncio.Queue,
downloader: VideoDownloader,
uploader: VideoUploader,
retention: relativedelta, retention: relativedelta,
detection_types: List[str], detection_types: List[str],
ignore_cameras: List[str], ignore_cameras: List[str],
@@ -27,7 +31,9 @@ class MissingEventChecker:
) -> None: ) -> None:
self._protect: ProtectApiClient = protect self._protect: ProtectApiClient = protect
self._db: aiosqlite.Connection = db self._db: aiosqlite.Connection = db
self._event_queue: asyncio.Queue = event_queue self._download_queue: asyncio.Queue = download_queue
self._downloader: VideoDownloader = downloader
self._uploader: VideoUploader = uploader
self.retention: relativedelta = retention self.retention: relativedelta = retention
self.detection_types: List[str] = detection_types self.detection_types: List[str] = detection_types
self.ignore_cameras: List[str] = ignore_cameras self.ignore_cameras: List[str] = ignore_cameras
@@ -54,10 +60,15 @@ class MissingEventChecker:
rows = await cursor.fetchall() rows = await cursor.fetchall()
db_event_ids = {row[0] for row in rows} db_event_ids = {row[0] for row in rows}
# Prevent re-adding events currently in the download queue # Prevent re-adding events currently in the download/upload queue
downloading_event_ids = {event.id for event in self._event_queue._queue} downloading_event_ids = {event.id for event in self._downloader.download_queue._queue}
downloading_event_ids.add(self._downloader.current_event)
uploading_event_ids = {event.id for event in self._uploader.upload_queue._queue}
uploading_event_ids.add(self._uploader.current_event)
missing_event_ids = set(unifi_events.keys()) - (db_event_ids | downloading_event_ids) missing_event_ids = set(unifi_events.keys()) - (
db_event_ids | downloading_event_ids | uploading_event_ids
)
logger.debug(f" Total undownloaded events: {len(missing_event_ids)}") logger.debug(f" Total undownloaded events: {len(missing_event_ids)}")
def wanted_event_type(event_id): def wanted_event_type(event_id):
@@ -93,7 +104,7 @@ class MissingEventChecker:
missing_logger( missing_logger(
f" Adding missing event to backup queue: {event.id} ({', '.join(event.smart_detect_types)}) ({event.start.strftime('%Y-%m-%dT%H-%M-%S')} - {event.end.strftime('%Y-%m-%dT%H-%M-%S')})" f" Adding missing event to backup queue: {event.id} ({', '.join(event.smart_detect_types)}) ({event.start.strftime('%Y-%m-%dT%H-%M-%S')} - {event.end.strftime('%Y-%m-%dT%H-%M-%S')})"
) )
await self._event_queue.put(event) await self._download_queue.put(event)
except Exception as e: except Exception as e:
logger.warn(f"Unexpected exception occurred during missing event check:") logger.warn(f"Unexpected exception occurred during missing event check:")

View File

@@ -20,6 +20,7 @@ from unifi_protect_backup.utils import (
run_command, run_command,
setup_logging, setup_logging,
human_readable_size, human_readable_size,
VideoQueue,
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -169,21 +170,22 @@ class UnifiProtectBackup:
else: else:
self._db = await aiosqlite.connect(self._sqlite_path) self._db = await aiosqlite.connect(self._sqlite_path)
event_queue = asyncio.Queue() download_queue = asyncio.Queue()
upload_queue = VideoQueue(self._download_buffer_size)
# Enable foreign keys in the database # Enable foreign keys in the database
await self._db.execute("PRAGMA foreign_keys = ON;") await self._db.execute("PRAGMA foreign_keys = ON;")
# Create downloader task # Create downloader task
# This will download video files to its buffer # This will download video files to its buffer
downloader = VideoDownloader(self._protect, event_queue, buffer_size=self._download_buffer_size) downloader = VideoDownloader(self._protect, download_queue, upload_queue)
tasks.append(asyncio.create_task(downloader.start())) tasks.append(asyncio.create_task(downloader.start()))
# Create upload task # Create upload task
# This will upload the videos in the downloader's buffer to the rclone remotes and log it in the database # This will upload the videos in the downloader's buffer to the rclone remotes and log it in the database
uploader = VideoUploader( uploader = VideoUploader(
self._protect, self._protect,
downloader.video_queue, upload_queue,
self.rclone_destination, self.rclone_destination,
self.rclone_args, self.rclone_args,
self.file_structure_format, self.file_structure_format,
@@ -194,7 +196,7 @@ class UnifiProtectBackup:
# Create event listener task # Create event listener task
# This will connect to the unifi protect websocket and listen for events. When one is detected it will # This will connect to the unifi protect websocket and listen for events. When one is detected it will
# be added to the queue of events to download # be added to the queue of events to download
event_listener = EventListener(event_queue, self._protect, self.detection_types, self.ignore_cameras) event_listener = EventListener(download_queue, self._protect, self.detection_types, self.ignore_cameras)
tasks.append(asyncio.create_task(event_listener.start())) tasks.append(asyncio.create_task(event_listener.start()))
# Create purge task # Create purge task
@@ -206,7 +208,14 @@ class UnifiProtectBackup:
# This will check all the events within the retention period, if any have been missed and not backed up # This will check all the events within the retention period, if any have been missed and not backed up
# they will be added to the event queue # they will be added to the event queue
missing = MissingEventChecker( missing = MissingEventChecker(
self._protect, self._db, event_queue, self.retention, self.detection_types, self.ignore_cameras self._protect,
self._db,
download_queue,
downloader,
uploader,
self.retention,
self.detection_types,
self.ignore_cameras,
) )
tasks.append(asyncio.create_task(missing.start())) tasks.append(asyncio.create_task(missing.start()))

View File

@@ -23,19 +23,20 @@ class VideoUploader:
def __init__( def __init__(
self, self,
protect: ProtectApiClient, protect: ProtectApiClient,
video_queue: VideoQueue, upload_queue: VideoQueue,
rclone_destination: str, rclone_destination: str,
rclone_args: str, rclone_args: str,
file_structure_format: str, file_structure_format: str,
db: aiosqlite.Connection, db: aiosqlite.Connection,
): ):
self._protect: ProtectApiClient = protect self._protect: ProtectApiClient = protect
self._video_queue: VideoQueue = video_queue self.upload_queue: VideoQueue = upload_queue
self._rclone_destination: str = rclone_destination self._rclone_destination: str = rclone_destination
self._rclone_args: str = rclone_args self._rclone_args: str = rclone_args
self._file_structure_format: str = file_structure_format self._file_structure_format: str = file_structure_format
self._db: aiosqlite.Connection = db self._db: aiosqlite.Connection = db
self.logger = logging.LoggerAdapter(logger, {'event': ''}) self.logger = logging.LoggerAdapter(logger, {'event': ''})
self.current_event = None
async def start(self): async def start(self):
"""Main loop """Main loop
@@ -46,7 +47,9 @@ class VideoUploader:
self.logger.info("Starting Uploader") self.logger.info("Starting Uploader")
while True: while True:
try: try:
event, video = await self._video_queue.get() event, video = await self.upload_queue.get()
self.current_event = event
self.logger = logging.LoggerAdapter(logger, {'event': f' [{event.id}]'}) self.logger = logging.LoggerAdapter(logger, {'event': f' [{event.id}]'})
self.logger.info(f"Uploading event: {event.id}") self.logger.info(f"Uploading event: {event.id}")