mirror of
https://github.com/ep1cman/unifi-protect-backup.git
synced 2025-12-05 23:53:30 +00:00
Today after adding a new camera for testing, it became clear that the previous assumption that pyunifiprotect would update its bootstrap when new cameras were added was incorrect.
491 lines
20 KiB
Python
491 lines
20 KiB
Python
"""Main module."""
|
|
import asyncio
|
|
import logging
|
|
import pathlib
|
|
import shutil
|
|
from typing import Callable, List, Optional
|
|
|
|
import aiocron
|
|
import aiohttp
|
|
from pyunifiprotect import NvrError, ProtectApiClient
|
|
from pyunifiprotect.data.nvr import Event
|
|
from pyunifiprotect.data.types import EventType, ModelType
|
|
from pyunifiprotect.data.websocket import WSAction, WSSubscriptionMessage
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class RcloneException(Exception):
|
|
"""Exception class for when rclone does not exit with `0`."""
|
|
|
|
def __init__(self, stdout, stderr, returncode):
|
|
"""Exception class for when rclone does not exit with `0`.
|
|
|
|
Args:
|
|
stdout (str): What rclone output to stdout
|
|
stderr (str): What rclone output to stderr
|
|
returncode (str): The return code of the rclone process
|
|
"""
|
|
super().__init__()
|
|
self.stdout: str = stdout
|
|
self.stderr: str = stderr
|
|
self.returncode: int = returncode
|
|
|
|
def __str__(self):
|
|
"""Turns excpetion into a human readable form."""
|
|
return f"Return Code: {self.returncode}\nStdout:\n{self.stdout}\nStderr:\n{self.stderr}"
|
|
|
|
|
|
def add_logging_level(levelName: str, levelNum: int, methodName: Optional[str] = None) -> None:
|
|
"""Comprehensively adds a new logging level to the `logging` module and the currently configured logging class.
|
|
|
|
`levelName` becomes an attribute of the `logging` module with the value
|
|
`levelNum`. `methodName` becomes a convenience method for both `logging`
|
|
itself and the class returned by `logging.getLoggerClass()` (usually just
|
|
`logging.Logger`).
|
|
|
|
To avoid accidental clobbering of existing attributes, this method will
|
|
raise an `AttributeError` if the level name is already an attribute of the
|
|
`logging` module or if the method name is already present
|
|
|
|
Credit: https://stackoverflow.com/a/35804945
|
|
|
|
Args:
|
|
levelName (str): The name of the new logging level (in all caps).
|
|
levelNum (int): The priority value of the logging level, lower=more verbose.
|
|
methodName (str): The name of the method used to log using this.
|
|
If `methodName` is not specified, `levelName.lower()` is used.
|
|
|
|
Example:
|
|
::
|
|
>>> add_logging_level('TRACE', logging.DEBUG - 5)
|
|
>>> logging.getLogger(__name__).setLevel("TRACE")
|
|
>>> logging.getLogger(__name__).trace('that worked')
|
|
>>> logging.trace('so did this')
|
|
>>> logging.TRACE
|
|
5
|
|
|
|
"""
|
|
if not methodName:
|
|
methodName = levelName.lower()
|
|
|
|
if hasattr(logging, levelName):
|
|
raise AttributeError('{} already defined in logging module'.format(levelName))
|
|
if hasattr(logging, methodName):
|
|
raise AttributeError('{} already defined in logging module'.format(methodName))
|
|
if hasattr(logging.getLoggerClass(), methodName):
|
|
raise AttributeError('{} already defined in logger class'.format(methodName))
|
|
|
|
# This method was inspired by the answers to Stack Overflow post
|
|
# http://stackoverflow.com/q/2183233/2988730, especially
|
|
# http://stackoverflow.com/a/13638084/2988730
|
|
def logForLevel(self, message, *args, **kwargs):
|
|
if self.isEnabledFor(levelNum):
|
|
self._log(levelNum, message, args, **kwargs)
|
|
|
|
def logToRoot(message, *args, **kwargs):
|
|
logging.log(levelNum, message, *args, **kwargs)
|
|
|
|
logging.addLevelName(levelNum, levelName)
|
|
setattr(logging, levelName, levelNum)
|
|
setattr(logging.getLoggerClass(), methodName, logForLevel)
|
|
setattr(logging, methodName, logToRoot)
|
|
|
|
|
|
def setup_logging(verbosity: int) -> None:
|
|
"""Configures loggers to provided the desired level of verbosity.
|
|
|
|
Verbosity 0: Only log info messages created by `unifi-protect-backup`, and all warnings
|
|
verbosity 1: Only log info & debug messages created by `unifi-protect-backup`, and all warnings
|
|
verbosity 2: Log info & debug messages created by `unifi-protect-backup`, command output, and
|
|
all warnings
|
|
Verbosity 3: Log debug messages created by `unifi-protect-backup`, command output, all info
|
|
messages, and all warnings
|
|
Verbosity 4: Log debug messages created by `unifi-protect-backup` command output, all info
|
|
messages, all warnings, and websocket data
|
|
Verbosity 5: Log websocket data, command output, all debug messages, all info messages and all
|
|
warnings
|
|
|
|
Args:
|
|
verbosity (int): The desired level of verbosity
|
|
|
|
"""
|
|
add_logging_level(
|
|
'EXTRA_DEBUG',
|
|
logging.DEBUG - 1,
|
|
)
|
|
add_logging_level(
|
|
'WEBSOCKET_DATA',
|
|
logging.DEBUG - 2,
|
|
)
|
|
|
|
format = "{asctime} [{levelname}]:{name: <20}:\t{message}"
|
|
date_format = "%Y-%m-%d %H:%M:%S"
|
|
style = '{'
|
|
|
|
if verbosity == 0:
|
|
logging.basicConfig(level=logging.WARN, format=format, style=style, datefmt=date_format)
|
|
logger.setLevel(logging.INFO)
|
|
elif verbosity == 1:
|
|
logging.basicConfig(level=logging.WARN, format=format, style=style, datefmt=date_format)
|
|
logger.setLevel(logging.DEBUG)
|
|
elif verbosity == 2:
|
|
logging.basicConfig(level=logging.WARN, format=format, style=style, datefmt=date_format)
|
|
logger.setLevel(logging.EXTRA_DEBUG) # type: ignore
|
|
elif verbosity == 3:
|
|
logging.basicConfig(level=logging.INFO, format=format, style=style, datefmt=date_format)
|
|
logger.setLevel(logging.EXTRA_DEBUG) # type: ignore
|
|
elif verbosity == 4:
|
|
logging.basicConfig(level=logging.INFO, format=format, style=style, datefmt=date_format)
|
|
logger.setLevel(logging.WEBSOCKET_DATA) # type: ignore
|
|
elif verbosity == 5:
|
|
logging.basicConfig(level=logging.DEBUG, format=format, style=style, datefmt=date_format)
|
|
logger.setLevel(logging.WEBSOCKET_DATA) # type: ignore
|
|
|
|
|
|
def human_readable_size(num):
|
|
"""Turns a number into a human readable number with ISO/IEC 80000 binary prefixes.
|
|
|
|
Based on: https://stackoverflow.com/a/1094933
|
|
|
|
Args:
|
|
num (int): The number to be converted into human readable format
|
|
"""
|
|
for unit in ["B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"]:
|
|
if abs(num) < 1024.0:
|
|
return f"{num:3.1f}{unit}"
|
|
num /= 1024.0
|
|
raise ValueError("`num` too large, ran out of prefixes")
|
|
|
|
|
|
class UnifiProtectBackup:
|
|
"""Backup Unifi protect event clips using rclone.
|
|
|
|
Listens to the Unifi Protect websocket for events. When a completed motion or smart detection
|
|
event is detected, it will download the clip and back it up using rclone
|
|
|
|
Attributes:
|
|
retention (str): How long should event clips be backed up for. Format as per the
|
|
`--max-age` argument of `rclone`
|
|
(https://rclone.org/filtering/#max-age-don-t-transfer-any-file-older-than-this)
|
|
rclone_args (str): Extra args passed directly to `rclone rcat`.
|
|
ignore_cameras (List[str]): List of camera IDs for which to not backup events
|
|
verbose (int): How verbose to setup logging, see :func:`setup_logging` for details.
|
|
_download_queue (asyncio.Queue): Queue of events that need to be backed up
|
|
_unsub (Callable): Unsubscribe from the websocket callback
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
address: str,
|
|
username: str,
|
|
password: str,
|
|
verify_ssl: bool,
|
|
rclone_destination: str,
|
|
retention: str,
|
|
rclone_args: str,
|
|
ignore_cameras: List[str],
|
|
verbose: int,
|
|
port: int = 443,
|
|
):
|
|
"""Will configure logging settings and the Unifi Protect API (but not actually connect).
|
|
|
|
Args:
|
|
address (str): Base address of the Unifi Protect instance
|
|
port (int): Post of the Unifi Protect instance, usually 443
|
|
username (str): Username to log into Unifi Protect instance
|
|
password (str): Password for Unifi Protect user
|
|
verify_ssl (bool): Flag for if SSL certificates should be validated
|
|
rclone_destination (str): `rclone` destination path in the format
|
|
{rclone remote}:{path on remote}. E.g.
|
|
`gdrive:/backups/unifi_protect`
|
|
retention (str): How long should event clips be backed up for. Format as per the
|
|
`--max-age` argument of `rclone`
|
|
(https://rclone.org/filtering/#max-age-don-t-transfer-any-file-older-than-this)
|
|
rclone_args (str): A bandwidth limit which is passed to the `--bwlimit` argument of
|
|
`rclone` (https://rclone.org/docs/#bwlimit-bandwidth-spec)
|
|
ignore_cameras (List[str]): List of camera IDs for which to not backup events
|
|
verbose (int): How verbose to setup logging, see :func:`setup_logging` for details.
|
|
"""
|
|
setup_logging(verbose)
|
|
|
|
logger.debug("Config:")
|
|
logger.debug(f" {address=}")
|
|
logger.debug(f" {port=}")
|
|
logger.debug(f" {username=}")
|
|
if verbose < 5:
|
|
logger.debug(" password=REDACTED")
|
|
else:
|
|
logger.debug(f" {password=}")
|
|
|
|
logger.debug(f" {verify_ssl=}")
|
|
logger.debug(f" {rclone_destination=}")
|
|
logger.debug(f" {retention=}")
|
|
logger.debug(f" {rclone_args=}")
|
|
logger.debug(f" {ignore_cameras=}")
|
|
logger.debug(f" {verbose=}")
|
|
|
|
self.rclone_destination = rclone_destination
|
|
self.retention = retention
|
|
self.rclone_args = rclone_args
|
|
|
|
self._protect = ProtectApiClient(
|
|
address,
|
|
port,
|
|
username,
|
|
password,
|
|
verify_ssl=verify_ssl,
|
|
subscribed_models={ModelType.EVENT},
|
|
)
|
|
self.ignore_cameras = ignore_cameras
|
|
self._download_queue: asyncio.Queue = asyncio.Queue()
|
|
self._unsub: Callable[[], None]
|
|
|
|
async def start(self):
|
|
"""Bootstrap the backup process and kick off the main loop.
|
|
|
|
You should run this to start the realtime backup of Unifi Protect clips as they are created
|
|
|
|
"""
|
|
logger.info("Starting...")
|
|
|
|
# Ensure rclone is installed and properly configured
|
|
logger.info("Checking rclone configuration...")
|
|
await self._check_rclone()
|
|
|
|
# Start the pyunifiprotect connection by calling `update`
|
|
logger.info("Connecting to Unifi Protect...")
|
|
await self._protect.update()
|
|
|
|
# Get a mapping of camera ids -> names
|
|
logger.info("Found cameras:")
|
|
for camera in self._protect.bootstrap.cameras.values():
|
|
logger.info(f" - {camera.id}: {camera.name}")
|
|
|
|
# Subscribe to the websocket
|
|
self._unsub = self._protect.subscribe_websocket(self._websocket_callback)
|
|
|
|
# Set up a "purge" task to run at midnight each day to delete old recordings and empty directories
|
|
logger.info("Setting up purge task...")
|
|
|
|
@aiocron.crontab("0 0 * * *")
|
|
async def rclone_purge_old():
|
|
logger.info("Deleting old files...")
|
|
cmd = f"rclone delete -vv --min-age {self.retention} '{self.rclone_destination}'"
|
|
cmd += f" && rclone rmdirs -vv --leave-root '{self.rclone_destination}'"
|
|
proc = await asyncio.create_subprocess_shell(
|
|
cmd,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
stdout, stderr = await proc.communicate()
|
|
if proc.returncode == 0:
|
|
logger.extra_debug(f"stdout:\n{stdout.decode()}") # type: ignore
|
|
logger.extra_debug(f"stderr:\n{stderr.decode()}") # type: ignore
|
|
logger.info("Successfully deleted old files")
|
|
else:
|
|
logger.warn("Failed to purge old files")
|
|
logger.warn(f"stdout:\n{stdout.decode()}")
|
|
logger.warn(f"stderr:\n{stderr.decode()}")
|
|
|
|
# Launches the main loop
|
|
logger.info("Listening for events...")
|
|
await self._backup_events()
|
|
|
|
logger.info("Stopping...")
|
|
|
|
# Unsubscribes from the websocket
|
|
self._unsub()
|
|
|
|
async def _check_rclone(self) -> None:
|
|
"""Check if rclone is installed and the specified remote is configured.
|
|
|
|
Raises:
|
|
RcloneException: If rclone is not installed or it failed to list remotes
|
|
ValueError: The given rclone destination is for a remote that is not configured
|
|
|
|
"""
|
|
rclone = shutil.which('rclone')
|
|
logger.debug(f"rclone found: {rclone}")
|
|
if not rclone:
|
|
raise RuntimeError("`rclone` is not installed on this system")
|
|
|
|
cmd = "rclone listremotes -vv"
|
|
proc = await asyncio.create_subprocess_shell(
|
|
cmd,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
stdout, stderr = await proc.communicate()
|
|
logger.extra_debug(f"stdout:\n{stdout.decode()}") # type: ignore
|
|
logger.extra_debug(f"stderr:\n{stderr.decode()}") # type: ignore
|
|
if proc.returncode != 0:
|
|
raise RcloneException(stdout.decode(), stderr.decode(), proc.returncode)
|
|
|
|
# Check if the destination is for a configured remote
|
|
for line in stdout.splitlines():
|
|
if self.rclone_destination.startswith(line.decode()):
|
|
break
|
|
else:
|
|
remote = self.rclone_destination.split(":")[0]
|
|
raise ValueError(f"rclone does not have a remote called `{remote}`")
|
|
|
|
def _websocket_callback(self, msg: WSSubscriptionMessage) -> None:
|
|
"""Callback for "EVENT" websocket messages.
|
|
|
|
Filters the incoming events, and puts completed events onto the download queue
|
|
|
|
Args:
|
|
msg (Event): Incoming event data
|
|
"""
|
|
logger.websocket_data(msg) # type: ignore
|
|
|
|
# We are only interested in updates that end motion/smartdetection event
|
|
assert isinstance(msg.new_obj, Event)
|
|
if msg.action != WSAction.UPDATE:
|
|
return
|
|
if msg.new_obj.camera_id in self.ignore_cameras:
|
|
return
|
|
if msg.new_obj.end is None:
|
|
return
|
|
if msg.new_obj.type not in {EventType.MOTION, EventType.SMART_DETECT}:
|
|
return
|
|
self._download_queue.put_nowait(msg.new_obj)
|
|
logger.debug(f"Adding event {msg.new_obj.id} to queue (Current queue={self._download_queue.qsize()})")
|
|
|
|
async def _backup_events(self) -> None:
|
|
"""Main loop for backing up events.
|
|
|
|
Waits for an event in the queue, then downloads the corresponding clip and uploads it using rclone.
|
|
If errors occur it will simply log the errors and wait for the next event. In a future release,
|
|
retries will be added.
|
|
|
|
"""
|
|
while True:
|
|
try:
|
|
event = await self._download_queue.get()
|
|
|
|
logger.info(f"Backing up event: {event.id}")
|
|
logger.debug(f"Remaining Queue: {self._download_queue.qsize()}")
|
|
logger.debug(f" Camera: {await self._get_camera_name(event.camera_id)}")
|
|
logger.debug(f" Type: {event.type}")
|
|
logger.debug(f" Start: {event.start.strftime('%Y-%m-%dT%H-%M-%S')}")
|
|
logger.debug(f" End: {event.end.strftime('%Y-%m-%dT%H-%M-%S')}")
|
|
logger.debug(f" Duration: {event.end-event.start}")
|
|
|
|
# Download video
|
|
logger.debug(" Downloading video...")
|
|
for x in range(5):
|
|
try:
|
|
video = await self._protect.get_camera_video(event.camera_id, event.start, event.end)
|
|
assert isinstance(video, bytes)
|
|
break
|
|
except (AssertionError, aiohttp.client_exceptions.ClientPayloadError) as e:
|
|
logger.warn(f" Failed download attempt {x+1}, retying in 1s")
|
|
logger.exception(e)
|
|
await asyncio.sleep(1)
|
|
else:
|
|
logger.warn(f"Download failed after 5 attempts, abandoning event {event.id}:")
|
|
continue
|
|
|
|
destination = await self.generate_file_path(event)
|
|
|
|
logger.debug(" Uploading video via rclone...")
|
|
logger.debug(f" To: {destination}")
|
|
logger.debug(f" Size: {human_readable_size(len(video))}")
|
|
for x in range(5):
|
|
try:
|
|
await self._upload_video(video, destination, self.rclone_args)
|
|
break
|
|
except RcloneException as e:
|
|
logger.warn(f" Failed upload attempt {x+1}, retying in 1s")
|
|
logger.exception(e)
|
|
await asyncio.sleep(1)
|
|
else:
|
|
logger.warn(f"Upload failed after 5 attempts, abandoning event {event.id}:")
|
|
continue
|
|
|
|
logger.info("Backed up successfully!")
|
|
|
|
except Exception as e:
|
|
logger.warn(f"Unexpected exception occurred, abandoning event {event.id}:")
|
|
logger.exception(e)
|
|
|
|
async def _upload_video(self, video: bytes, destination: pathlib.Path, rclone_args: str):
|
|
"""Upload video using rclone.
|
|
|
|
In order to avoid writing to disk, the video file data is piped directly
|
|
to the rclone process and uploaded using the `rcat` function of rclone.
|
|
|
|
Args:
|
|
video (bytes): The data to be written to the file
|
|
destination (pathlib.Path): Where rclone should write the file
|
|
rclone_args (str): Optional extra arguments to pass to `rclone`
|
|
|
|
Raises:
|
|
RuntimeError: If rclone returns a non-zero exit code
|
|
"""
|
|
cmd = f"rclone rcat -vv {rclone_args} '{destination}'"
|
|
proc = await asyncio.create_subprocess_shell(
|
|
cmd,
|
|
stdin=asyncio.subprocess.PIPE,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
stdout, stderr = await proc.communicate(video)
|
|
if proc.returncode == 0:
|
|
logger.extra_debug(f"stdout:\n{stdout.decode()}") # type: ignore
|
|
logger.extra_debug(f"stderr:\n{stderr.decode()}") # type: ignore
|
|
else:
|
|
raise RcloneException(stdout.decode(), stderr.decode(), proc.returncode)
|
|
|
|
async def generate_file_path(self, event: Event) -> pathlib.Path:
|
|
"""Generates the rclone destination path for the provided event.
|
|
|
|
Generates paths in the following structure:
|
|
::
|
|
rclone_destination
|
|
|- Camera Name
|
|
|- {Date}
|
|
|- {start timestamp} {event type} ({detections}).mp4
|
|
|
|
Args:
|
|
event: The event for which to create an output path
|
|
|
|
Returns:
|
|
pathlib.Path: The rclone path the event should be backed up to
|
|
|
|
"""
|
|
path = pathlib.Path(self.rclone_destination)
|
|
assert isinstance(event.camera_id, str)
|
|
path /= await self._get_camera_name(event.camera_id) # directory per camera
|
|
path /= event.start.strftime("%Y-%m-%d") # Directory per day
|
|
|
|
file_name = f"{event.start.strftime('%Y-%m-%dT%H-%M-%S')} {event.type}"
|
|
|
|
if event.smart_detect_types:
|
|
detections = " ".join(event.smart_detect_types)
|
|
file_name += f" ({detections})"
|
|
file_name += ".mp4"
|
|
|
|
path /= file_name
|
|
|
|
return path
|
|
|
|
async def _get_camera_name(self, id: str):
|
|
try:
|
|
return self._protect.bootstrap.cameras[id].name
|
|
except KeyError:
|
|
# Refresh cameras
|
|
logger.debug(f"Unknown camera id: '{id}', checking API")
|
|
|
|
try:
|
|
await self._protect.update(force=True)
|
|
except NvrError:
|
|
logger.debug(f"Unknown camera id: '{id}'")
|
|
raise
|
|
|
|
name = self._protect.bootstrap.cameras[id].name
|
|
logger.debug(f"Found camera - {id}: {name}")
|
|
return name
|