Major Restructure

- Each task is now its own class
- Added a database to track backed up events and their destinations
- Added task to check for and back up missed events
This commit is contained in:
Sebastian Goscik
2022-12-03 21:48:44 +00:00
parent 031d4e4862
commit 471ecb0662
14 changed files with 1054 additions and 527 deletions

View File

@@ -1,44 +1,23 @@
"""Main module."""
import asyncio
import json
import logging
import pathlib
import re
import os
import shutil
from asyncio.exceptions import TimeoutError
from datetime import datetime, timedelta, timezone
from cmath import log
from pprint import pprint
from time import sleep
from typing import Callable, List, Optional
import aiocron
import pytz
from aiohttp.client_exceptions import ClientPayloadError
from pyunifiprotect import NvrError, ProtectApiClient
from pyunifiprotect.data.nvr import Event
from pyunifiprotect.data.types import EventType, ModelType
from pyunifiprotect.data.websocket import WSAction, WSSubscriptionMessage
import aiosqlite
from pyunifiprotect import ProtectApiClient
from pyunifiprotect.data.types import ModelType
from unifi_protect_backup import EventListener, MissingEventChecker, Purge, VideoDownloader, VideoUploader
from unifi_protect_backup.utils import SubprocessException, parse_rclone_retention, run_command
# Module-level logger, named after this module via `__name__`.
logger = logging.getLogger(__name__)
class SubprocessException(Exception):
    """Exception class for when a subprocess (e.g. rclone) does not exit with `0`.

    Attributes:
        stdout (str): What the process output to stdout
        stderr (str): What the process output to stderr
        returncode (int): The return code of the process
    """

    def __init__(self, stdout, stderr, returncode):
        """Store the captured output and return code of the failed process.

        Args:
            stdout (str): What rclone output to stdout
            stderr (str): What rclone output to stderr
            returncode (int): The return code of the rclone process
        """
        # Forward the arguments to `Exception` so `self.args` is populated. With an empty
        # `args`, `copy`/`pickle` rebuild the exception by calling the class with no
        # arguments, which fails because all three parameters are required.
        super().__init__(stdout, stderr, returncode)
        self.stdout: str = stdout
        self.stderr: str = stderr
        self.returncode: int = returncode

    def __str__(self):
        """Turns exception into a human readable form."""
        return f"Return Code: {self.returncode}\nStdout:\n{self.stdout}\nStderr:\n{self.stderr}"
# TODO: https://github.com/cjrh/aiorun#id6 (smart shield)
def add_logging_level(levelName: str, levelNum: int, methodName: Optional[str] = None) -> None:
@@ -124,10 +103,46 @@ def setup_logging(verbosity: int) -> None:
logging.DEBUG - 2,
)
format = "{asctime} [{levelname}]:{name: <20}:\t{message}"
format = "{asctime} [{levelname:^11s}] {name:<42} :\t{message}"
date_format = "%Y-%m-%d %H:%M:%S"
style = '{'
logger = logging.getLogger("unifi_protect_backup")
sh = logging.StreamHandler()
formatter = logging.Formatter(format, date_format, style)
sh.setFormatter(formatter)
def decorate_emit(fn):
    """Wrap an ``emit``-style callable so the record's level name is coloured first.

    The returned wrapper treats its first positional argument as the log record. It
    replaces ``levelname`` on that record with the original name centred in 11
    characters and surrounded by ANSI colour codes, then calls ``fn`` with the same
    arguments and returns its result. The change to ``levelname`` stays on the record.

    Args:
        fn: The callable to wrap (called with the unchanged positional arguments)

    Returns:
        The wrapping callable
    """
    # Checked from most to least severe; the first threshold the record reaches wins.
    # Level attributes are looked up lazily, in this order, at call time.
    palette = (
        ("CRITICAL", '\x1b[31;1m'),  # RED
        ("ERROR", '\x1b[31;1m'),  # RED
        ("WARNING", '\x1b[33;1m'),  # YELLOW
        ("INFO", '\x1b[32;1m'),  # GREEN
        ("DEBUG", '\x1b[36;1m'),  # CYAN
        ("EXTRA_DEBUG", '\x1b[35;1m'),  # MAGENTA
    )

    def colorized_emit(*args):
        record = args[0]
        severity = record.levelno

        # Anything below every threshold gets the plain "reset" sequence
        color = '\x1b[0m'
        for level_attr, ansi in palette:
            if severity >= getattr(logging, level_attr):
                color = ansi
                break

        # Colour the (centred) level name in place before handing the record on
        record.levelname = f"{color}{record.levelname:^11s}\x1b[0m"
        return fn(*args)

    return colorized_emit
sh.emit = decorate_emit(sh.emit)
logger.addHandler(sh)
logger.propagate = False
if verbosity == 0:
logging.basicConfig(level=logging.WARN, format=format, style=style, datefmt=date_format)
logger.setLevel(logging.INFO)
@@ -143,24 +158,20 @@ def setup_logging(verbosity: int) -> None:
elif verbosity == 4:
logging.basicConfig(level=logging.INFO, format=format, style=style, datefmt=date_format)
logger.setLevel(logging.WEBSOCKET_DATA) # type: ignore
elif verbosity == 5:
elif verbosity >= 5:
logging.basicConfig(level=logging.DEBUG, format=format, style=style, datefmt=date_format)
logger.setLevel(logging.WEBSOCKET_DATA) # type: ignore
def human_readable_size(num):
    """Format a number of bytes using ISO/IEC 80000 binary prefixes (KiB, MiB, ...).

    Based on: https://stackoverflow.com/a/1094933

    Args:
        num (int): The number to be converted into human readable format

    Returns:
        str: The value scaled to the first prefix where its magnitude is below 1024,
        formatted with one decimal place and followed by the prefix (e.g. `1.5KiB`)

    Raises:
        ValueError: If the value is still 1024 or more after the largest prefix (YiB)
    """
    prefixes = ("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
    scaled = num
    index = 0
    while index < len(prefixes):
        if abs(scaled) < 1024.0:
            return f"{scaled:3.1f}{prefixes[index]}"
        # Too big for this prefix: move on to the next one
        scaled = scaled / 1024.0
        index += 1
    raise ValueError("`num` too large, ran out of prefixes")
async def create_database(path: str):
    """Creates sqlite database and creates the events and backups tables.

    Args:
        path (str): Path passed to `aiosqlite.connect` for the database to create

    Returns:
        The open `aiosqlite` connection, with both tables created and committed

    Raises:
        Any exception raised while creating the tables or committing; the connection
        is closed before the exception is re-raised.
    """
    db = await aiosqlite.connect(path)
    try:
        await db.execute("CREATE TABLE events(id PRIMARY KEY, type, camera_id, start REAL, end REAL)")
        await db.execute(
            "CREATE TABLE backups(id REFERENCES events(id) ON DELETE CASCADE, remote, path, PRIMARY KEY (id, remote))"
        )
        await db.commit()
    except BaseException:
        # Do not leak the open connection if the schema could not be created.
        # `BaseException` so that task cancellation also releases it.
        await db.close()
        raise
    return db
class UnifiProtectBackup:
@@ -168,19 +179,6 @@ class UnifiProtectBackup:
Listens to the Unifi Protect websocket for events. When a completed motion or smart detection
event is detected, it will download the clip and back it up using rclone
Attributes:
retention (str): How long should event clips be backed up for. Format as per the
`--max-age` argument of `rclone`
(https://rclone.org/filtering/#max-age-don-t-transfer-any-file-older-than-this)
rclone_args (str): Extra args passed directly to `rclone rcat`.
ignore_cameras (List[str]): List of camera IDs for which to not backup events
verbose (int): How verbose to setup logging, see :func:`setup_logging` for details.
detection_types (List[str]): List of which detection types to backup.
file_structure_format (str): A Python format string for output file path
_download_queue (asyncio.Queue): Queue of events that need to be backed up
_unsub (Callable): Unsubscribe from the websocket callback
_has_ffprobe (bool): If ffprobe was found on the host
"""
def __init__(
@@ -196,6 +194,7 @@ class UnifiProtectBackup:
ignore_cameras: List[str],
file_structure_format: str,
verbose: int,
sqlite_path: str = "events.sqlite",
port: int = 443,
):
"""Will configure logging settings and the Unifi Protect API (but not actually connect).
@@ -218,6 +217,7 @@ class UnifiProtectBackup:
ignore_cameras (List[str]): List of camera IDs for which to not backup events.
file_structure_format (str): A Python format string for output file path.
verbose (int): How verbose to setup logging, see :func:`setup_logging` for details.
sqlite_path (str): Path where to find/create sqlite database
"""
setup_logging(verbose)
@@ -238,9 +238,10 @@ class UnifiProtectBackup:
logger.debug(f" {verbose=}")
logger.debug(f" {detection_types=}")
logger.debug(f" {file_structure_format=}")
logger.debug(f" {sqlite_path=}")
self.rclone_destination = rclone_destination
self.retention = retention
self.retention = parse_rclone_retention(retention)
self.rclone_args = rclone_args
self.file_structure_format = file_structure_format
@@ -262,8 +263,9 @@ class UnifiProtectBackup:
self._download_queue: asyncio.Queue = asyncio.Queue()
self._unsub: Callable[[], None]
self.detection_types = detection_types
self._has_ffprobe = False
self._sqlite_path = sqlite_path
self._db = None
async def start(self):
"""Bootstrap the backup process and kick off the main loop.
@@ -271,114 +273,79 @@ class UnifiProtectBackup:
You should run this to start the realtime backup of Unifi Protect clips as they are created
"""
logger.info("Starting...")
try:
logger.info("Starting...")
# Ensure `rclone` is installed and properly configured
logger.info("Checking rclone configuration...")
await self._check_rclone()
# Ensure `rclone` is installed and properly configured
logger.info("Checking rclone configuration...")
await self._check_rclone()
# Check if `ffprobe` is available
ffprobe = shutil.which('ffprobe')
if ffprobe is not None:
logger.debug(f"ffprobe found: {ffprobe}")
self._has_ffprobe = True
# Start the pyunifiprotect connection by calling `update`
logger.info("Connecting to Unifi Protect...")
await self._protect.update()
# Start the pyunifiprotect connection by calling `update`
logger.info("Connecting to Unifi Protect...")
await self._protect.update()
# Get a mapping of camera ids -> names
logger.info("Found cameras:")
for camera in self._protect.bootstrap.cameras.values():
logger.info(f" - {camera.id}: {camera.name}")
# Get a mapping of camera ids -> names
logger.info("Found cameras:")
for camera in self._protect.bootstrap.cameras.values():
logger.info(f" - {camera.id}: {camera.name}")
tasks = []
# Subscribe to the websocket
self._unsub = self._protect.subscribe_websocket(self._websocket_callback)
if not os.path.exists(self._sqlite_path):
logger.info("Database doesn't exist, creating a new one")
self._db = await create_database(self._sqlite_path)
else:
self._db = await aiosqlite.connect(self._sqlite_path)
# Set up a "purge" task to run at midnight each day to delete old recordings and empty directories
logger.info("Setting up purge task...")
event_queue = asyncio.Queue()
@aiocron.crontab("0 0 * * *")
async def rclone_purge_old():
logger.info("Deleting old files...")
cmd = f'rclone delete -vv --min-age {self.retention} "{self.rclone_destination}"'
cmd += f' && rclone rmdirs -vv --leave-root "{self.rclone_destination}"'
proc = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
# Enable foreign keys in the database
await self._db.execute("PRAGMA foreign_keys = ON;")
# Create downloader task
# This will download video files to its buffer
downloader = VideoDownloader(self._protect, event_queue) # TODO: Make buffer size configurable
tasks.append(asyncio.create_task(downloader.start()))
# Create upload task
# This will upload the videos in the downloader's buffer to the rclone remotes and log it in the database
uploader = VideoUploader(
self._protect,
downloader.video_queue,
self.rclone_destination,
self.rclone_args,
self.file_structure_format,
self._db,
)
stdout, stderr = await proc.communicate()
if proc.returncode == 0:
logger.extra_debug(f"stdout:\n{stdout.decode()}")
logger.extra_debug(f"stderr:\n{stderr.decode()}")
logger.info("Successfully deleted old files")
else:
logger.warn("Failed to purge old files")
logger.warn(f"stdout:\n{stdout.decode()}")
logger.warn(f"stderr:\n{stderr.decode()}")
tasks.append(asyncio.create_task(uploader.start()))
# We need to catch websocket disconnect and trigger a reconnect.
@aiocron.crontab("* * * * *")
async def check_websocket_and_reconnect():
logger.extra_debug("Checking the status of the websocket...")
if self._protect.check_ws():
logger.extra_debug("Websocket is connected.")
else:
logger.warn("Lost connection to Unifi Protect.")
# Create event listener task
# This will connect to the unifi protect websocket and listen for events. When one is detected it will
# be added to the queue of events to download
event_listener = EventListener(event_queue, self._protect, self.detection_types, self.ignore_cameras)
tasks.append(asyncio.create_task(event_listener.start()))
# Unsubscribe, close the session.
self._unsub()
# Create purge task
# This will, every midnight, purge old backups from the rclone remotes and database
purge = Purge(self._db, self.retention, self.rclone_destination)
tasks.append(asyncio.create_task(purge.start()))
# Create missing event task
# This will check all the events within the retention period, if any have been missed and not backed up
# they will be added to the event queue
missing = MissingEventChecker(
self._protect, self._db, event_queue, self.retention, self.detection_types, self.ignore_cameras
)
tasks.append(asyncio.create_task(missing.start()))
logger.info("Starting...")
await asyncio.gather(*tasks)
except asyncio.CancelledError:
if self._protect is not None:
await self._protect.close_session()
while True:
logger.warn("Attempting reconnect...")
try:
# Start again from scratch. In principle if Unifi
# Protect has not been restarted we should just be able
# to call self._protect.update() to reconnect to the
# websocket. However, if the server has been restarted a
# call to self._protect.check_ws() returns true and some
# seconds later pyunifiprotect detects the websocket as
# disconnected again. Therefore, kill it all and try
# again!
replacement_protect = ProtectApiClient(
self.address,
self.port,
self.username,
self.password,
verify_ssl=self.verify_ssl,
subscribed_models={ModelType.EVENT},
)
# Start the pyunifiprotect connection by calling `update`
await replacement_protect.update()
if replacement_protect.check_ws():
self._protect = replacement_protect
self._unsub = self._protect.subscribe_websocket(self._websocket_callback)
break
else:
logger.warn("Unable to establish connection to Unifi Protect")
except Exception as e:
logger.warn("Unexpected exception occurred while trying to reconnect:")
logger.exception(e)
finally:
# If we get here we need to close the replacement session again
await replacement_protect.close_session()
# Back off for a little while
await asyncio.sleep(10)
logger.info("Re-established connection to Unifi Protect and to the websocket.")
# Launches the main loop
logger.info("Listening for events...")
await self._backup_events()
logger.info("Stopping...")
# Unsubscribes from the websocket
self._unsub()
if self._db is not None:
await self._db.close()
async def _check_rclone(self) -> None:
"""Check if rclone is installed and the specified remote is configured.
@@ -393,258 +360,17 @@ class UnifiProtectBackup:
raise RuntimeError("`rclone` is not installed on this system")
logger.debug(f"rclone found: {rclone}")
cmd = "rclone listremotes -vv"
proc = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
logger.extra_debug(f"stdout:\n{stdout.decode()}") # type: ignore
logger.extra_debug(f"stderr:\n{stderr.decode()}") # type: ignore
if proc.returncode != 0:
raise SubprocessException(stdout.decode(), stderr.decode(), proc.returncode)
returncode, stdout, stderr = await run_command("rclone listremotes -vv")
if returncode != 0:
raise SubprocessException(stdout, stderr, returncode)
# Check if the destination is for a configured remote
for line in stdout.splitlines():
if self.rclone_destination.startswith(line.decode()):
if self.rclone_destination.startswith(line):
break
else:
remote = self.rclone_destination.split(":")[0]
raise ValueError(f"rclone does not have a remote called `{remote}`")
def _websocket_callback(self, msg: WSSubscriptionMessage) -> None:
    """Handle an "EVENT" websocket message.

    Drops every message that is not an update for a finished motion, smart detection
    or ring event of a wanted detection type from a non-ignored camera. Everything that
    survives the filters is put onto the download queue.

    Args:
        msg (Event): Incoming event data
    """
    logger.websocket_data(msg)  # type: ignore

    event = msg.new_obj
    assert isinstance(event, Event)

    # Only updates for finished events from cameras we are not ignoring are of interest
    if msg.action != WSAction.UPDATE or event.camera_id in self.ignore_cameras or event.end is None:
        return
    if event.type not in (EventType.MOTION, EventType.SMART_DETECT, EventType.RING):
        return

    wanted = self.detection_types
    if event.type is EventType.MOTION:
        if "motion" not in wanted:
            logger.extra_debug(f"Skipping unwanted motion detection event: {event.id}")  # type: ignore
            return
    elif event.type is EventType.RING:
        if "ring" not in wanted:
            logger.extra_debug(f"Skipping unwanted ring event: {event.id}")  # type: ignore
            return
    elif event.type is EventType.SMART_DETECT:
        # A single unwanted smart detection type is enough to skip the whole event
        for smart_type in event.smart_detect_types:
            if smart_type not in wanted:
                logger.extra_debug(  # type: ignore
                    f"Skipping unwanted {smart_type} detection event: {event.id}"
                )
                return

    self._download_queue.put_nowait(event)
    logger.debug(f"Adding event {event.id} to queue (Current queue={self._download_queue.qsize()})")
async def _backup_events(self) -> None:
"""Main loop for backing up events.
Waits for an event in the queue, then downloads the corresponding clip and uploads it using rclone.
The download and the upload are each attempted up to 5 times, with a 1s pause after every failed
attempt. If all attempts fail, or any other exception occurs, the error is logged, the event is
abandoned and the loop waits for the next event.
"""
# NOTE(review): indentation is not visible in this view; comments below describe the nesting as it
# reads from the statements themselves - confirm against the real file.
while True:
try:
event = await self._download_queue.get()
# Fix timezones since pyunifiprotect sets all timestamps to UTC. Instead localize them to
# the timezone of the unifi protect NVR.
event.start = event.start.replace(tzinfo=pytz.utc).astimezone(self._protect.bootstrap.nvr.timezone)
event.end = event.end.replace(tzinfo=pytz.utc).astimezone(self._protect.bootstrap.nvr.timezone)
logger.info(f"Backing up event: {event.id}")
logger.debug(f"Remaining Queue: {self._download_queue.qsize()}")
logger.debug(f" Camera: {await self._get_camera_name(event.camera_id)}")
if event.type == EventType.SMART_DETECT:
logger.debug(f" Type: {event.type} ({', '.join(event.smart_detect_types)})")
else:
logger.debug(f" Type: {event.type}")
logger.debug(f" Start: {event.start.strftime('%Y-%m-%dT%H-%M-%S')} ({event.start.timestamp()})")
logger.debug(f" End: {event.end.strftime('%Y-%m-%dT%H-%M-%S')} ({event.end.timestamp()})")
# Requested clip length in seconds; compared against the downloaded length further down
duration = (event.end - event.start).total_seconds()
logger.debug(f" Duration: {duration}")
# Unifi protect does not return full video clips if the clip is requested too soon.
# There are two issues at play here:
# - Protect will only cut a clip on an keyframe which happen every 5s
# - Protect's pipeline needs a finite amount of time to make a clip available
# So we will wait 1.5x the keyframe interval to ensure that there is always ample video
# stored and Protect can return a full clip (which should be at least the length requested,
# but often longer)
time_since_event_ended = datetime.utcnow().replace(tzinfo=timezone.utc) - event.end
# Remaining part of the 7.5s (5s * 1.5) window that has not yet elapsed since the event ended
sleep_time = (timedelta(seconds=5 * 1.5) - time_since_event_ended).total_seconds()
if sleep_time > 0:
logger.debug(f" Sleeping ({sleep_time}s) to ensure clip is ready to download...")
await asyncio.sleep(sleep_time)
# Download video
logger.debug(" Downloading video...")
for x in range(5):
try:
video = await self._protect.get_camera_video(event.camera_id, event.start, event.end)
# A result that is not `bytes` is treated like a failed attempt (see the `except` below)
assert isinstance(video, bytes)
break
except (AssertionError, ClientPayloadError, TimeoutError) as e:
logger.warn(f" Failed download attempt {x+1}, retying in 1s")
logger.exception(e)
await asyncio.sleep(1)
# NOTE(review): reads as the `for` loop's `else` (runs only when no attempt reached `break`)
else:
logger.warn(f"Download failed after 5 attempts, abandoning event {event.id}:")
continue
destination = await self.generate_file_path(event)
# Get the actual length of the downloaded video using ffprobe
if self._has_ffprobe:
try:
downloaded_duration = await self._get_video_length(video)
msg = (
f" Downloaded video length: {downloaded_duration:.3f}s"
f"({downloaded_duration - duration:+.3f}s)"
)
# A clip shorter than requested is logged as a warning, otherwise at debug level
if downloaded_duration < duration:
logger.warning(msg)
else:
logger.debug(msg)
except SubprocessException as e:
# A failing `ffprobe` is only logged; the upload below still goes ahead
logger.warn(" `ffprobe` failed")
logger.exception(e)
# Upload video
logger.debug(" Uploading video via rclone...")
logger.debug(f" To: {destination}")
logger.debug(f" Size: {human_readable_size(len(video))}")
for x in range(5):
try:
await self._upload_video(video, destination, self.rclone_args)
break
except SubprocessException as e:
logger.warn(f" Failed upload attempt {x+1}, retying in 1s")
logger.exception(e)
await asyncio.sleep(1)
# NOTE(review): reads as the `for` loop's `else` (runs only when no attempt reached `break`)
else:
logger.warn(f"Upload failed after 5 attempts, abandoning event {event.id}:")
continue
logger.info("Backed up successfully!")
except Exception as e:
# NOTE(review): `event` is only bound once `get()` above has returned at least once; if an
# `Exception` were raised before that, this f-string would itself fail with a NameError.
logger.warn(f"Unexpected exception occurred, abandoning event {event.id}:")
logger.exception(e)
async def _upload_video(self, video: bytes, destination: pathlib.Path, rclone_args: str):
    """Upload video using rclone.

    The video data is never written to disk: it is piped to the stdin of an
    `rclone rcat` process, which writes it to the destination.

    Args:
        video (bytes): The data to be written to the file
        destination (pathlib.Path): Where rclone should write the file
        rclone_args (str): Optional extra arguments to pass to `rclone`

    Raises:
        SubprocessException: If rclone returns a non-zero exit code
    """
    command = f'rclone rcat -vv {rclone_args} "{destination}"'
    pipe = asyncio.subprocess.PIPE
    process = await asyncio.create_subprocess_shell(command, stdin=pipe, stdout=pipe, stderr=pipe)
    out, err = await process.communicate(video)

    if process.returncode != 0:
        raise SubprocessException(out.decode(), err.decode(), process.returncode)

    logger.extra_debug(f"stdout:\n{out.decode()}")  # type: ignore
    logger.extra_debug(f"stderr:\n{err.decode()}")  # type: ignore
async def _get_video_length(self, video: bytes) -> float:
    """Get the duration of a video by piping it through `ffprobe`.

    Args:
        video (bytes): The video data, passed to `ffprobe` on stdin

    Returns:
        float: The `duration` of the first stream in ffprobe's JSON output

    Raises:
        SubprocessException: If `ffprobe` returns a non-zero exit code
    """
    command = 'ffprobe -v quiet -show_streams -select_streams v:0 -of json -'
    pipe = asyncio.subprocess.PIPE
    process = await asyncio.create_subprocess_shell(command, stdin=pipe, stdout=pipe, stderr=pipe)
    out, err = await process.communicate(video)

    if process.returncode != 0:
        raise SubprocessException(out.decode(), err.decode(), process.returncode)

    logger.extra_debug(f"stdout:\n{out.decode()}")  # type: ignore
    logger.extra_debug(f"stderr:\n{err.decode()}")  # type: ignore

    probe = json.loads(out.decode())
    return float(probe['streams'][0]['duration'])
async def generate_file_path(self, event: Event) -> pathlib.Path:
    """Build the rclone destination path for an event.

    The path is produced by filling in `self.file_structure_format`, removing every
    character that is not a word character, `-`, `_`, `.`, `(`, `)`, `/` or a space,
    and appending the result to `self.rclone_destination`.

    The format string can use the following fields:

      event: The `Event` object as per
             https://github.com/briis/pyunifiprotect/blob/master/pyunifiprotect/data/nvr.py
      duration_seconds: The duration of the event in seconds
      detection_type: The event type, followed by the smart detection types in brackets (if any)
      camera_name: The name of the camera that generated this event

    Args:
        event: The event for which to create an output path

    Returns:
        pathlib.Path: The rclone path the event should be backed up to
    """
    assert isinstance(event.camera_id, str)
    assert isinstance(event.start, datetime)
    assert isinstance(event.end, datetime)

    elapsed_seconds = (event.end - event.start).total_seconds()
    if event.smart_detect_types:
        detection_label = f"{event.type} ({' '.join(event.smart_detect_types)})"
    else:
        detection_label = f"{event.type}"
    camera_label = await self._get_camera_name(event.camera_id)

    rendered = self.file_structure_format.format(
        event=event,
        duration_seconds=elapsed_seconds,
        detection_type=detection_label,
        camera_name=camera_label,
    )
    # Sanitize any invalid chars
    sanitized = re.sub(r'[^\w\-_\.\(\)/ ]', '', rendered)
    return pathlib.Path(f"{self.rclone_destination}/{sanitized}")
async def _get_camera_name(self, id: str):
    """Look up the name of a camera by its id.

    If the id is not among the currently known cameras, the data is refreshed via
    `self._protect.update(force=True)` and the lookup is tried once more.

    Args:
        id (str): The id of the camera

    Returns:
        The `name` of the matching camera

    Raises:
        NvrError: Re-raised if the refresh fails
        KeyError: If the camera is still unknown after the refresh
    """
    try:
        known_name = self._protect.bootstrap.cameras[id].name
    except KeyError:
        # Refresh cameras
        logger.debug(f"Unknown camera id: '{id}', checking API")

        try:
            await self._protect.update(force=True)
        except NvrError:
            logger.debug(f"Unknown camera id: '{id}'")
            raise

        refreshed_name = self._protect.bootstrap.cameras[id].name
        logger.debug(f"Found camera - {id}: {refreshed_name}")
        return refreshed_name
    else:
        return known_name
# Ensure the base directory exists
await run_command(f"rclone mkdir -vv {self.rclone_destination}")