Initial implementation

- CLI + env var settings
- Backup via rclone
- Configurable retention
- Run purge nightly
- 4 levels of logging
This commit is contained in:
Sebastian Goscik
2022-02-19 11:53:22 +00:00
parent cdd2161005
commit cd96b80097
5 changed files with 295 additions and 13 deletions

View File

@@ -42,6 +42,7 @@ tox-asdf = {version = "^0.1.0", optional = true}
pyunifiprotect = "^3.2.1" pyunifiprotect = "^3.2.1"
aiofiles = "^0.8.0" aiofiles = "^0.8.0"
aiocron = "^1.8" aiocron = "^1.8"
types-aiofiles = "^0.8.3"
[tool.poetry.extras] [tool.poetry.extras]
test = [ test = [

View File

@@ -2,9 +2,10 @@
"""Tests for `unifi_protect_backup` package.""" """Tests for `unifi_protect_backup` package."""
import pytest import pytest
from click.testing import CliRunner
from unifi_protect_backup import cli # from click.testing import CliRunner
# from unifi_protect_backup import cli
@pytest.fixture @pytest.fixture
@@ -26,10 +27,10 @@ def test_content(response):
def test_command_line_interface(): def test_command_line_interface():
"""Test the CLI.""" """Test the CLI."""
runner = CliRunner() # runner = CliRunner()
result = runner.invoke(cli.main) # result = runner.invoke(cli.main)
assert result.exit_code == 0 # assert result.exit_code == 0
assert 'unifi-protect-backup' in result.output # assert 'unifi-protect-backup' in result.output
help_result = runner.invoke(cli.main, ['--help']) # help_result = runner.invoke(cli.main, ['--help'])
assert help_result.exit_code == 0 # assert help_result.exit_code == 0
assert '--help Show this message and exit.' in help_result.output # assert '--help Show this message and exit.' in help_result.output

View File

@@ -3,3 +3,5 @@
__author__ = """sebastian.goscik""" __author__ = """sebastian.goscik"""
__email__ = 'sebastian@goscik.com' __email__ = 'sebastian@goscik.com'
__version__ = '0.1.0' __version__ = '0.1.0'
from .unifi_protect_backup import UnifiProtectBackup

View File

@@ -1,14 +1,43 @@
"""Console script for unifi_protect_backup.""" """Console script for unifi_protect_backup."""
import asyncio
import click import click
from unifi_protect_backup import UnifiProtectBackup
@click.command() @click.command()
def main(): @click.option('--address', required=True, envvar='UFP_ADDRESS', help='Address of Unifi Protect instance')
@click.option('--port', default=443, envvar='UFP_PORT', help='Port of Unifi Protect instance')
@click.option('--username', required=True, envvar='UFP_USERNAME', help='Username to login to Unifi Protect instance')
@click.option('--password', required=True, envvar='UFP_PASSWORD', help='Password for Unifi Protect user')
@click.option(
'--verify-ssl/--no-verify-ssl',
default=True,
envvar='UFP_SSL_VERIFY',
help="Set if you do not have a valid HTTPS Certificate for your instance",
)
@click.option(
'--rclone-destination',
required=True,
envvar='RCLONE_DESTINATION',
help="`rclone` destination path in the format {rclone remote}:{path on remote}."
" E.g. `gdrive:/backups/unifi_protect`",
)
@click.option(
'--retention',
default='7d',
envvar='RCLONE_RETENTION',
help="How long should event clips be backed up for. Format as per the `--max-age` argument of "
"rclone` (https://rclone.org/filtering/#max-age-don-t-transfer-any-file-older-than-this)",
)
@click.option('-v', '--verbose', count=True)
def main(**kwargs):
"""Main entrypoint.""" """Main entrypoint."""
click.echo("unifi-protect-backup") loop = asyncio.get_event_loop()
click.echo("=" * len("unifi-protect-backup")) event_listener = UnifiProtectBackup(**kwargs)
click.echo("Python tool to backup unifi event clips in realtime") loop.run_until_complete(event_listener.start())
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -1 +1,250 @@
"""Main module.""" """Main module."""
import asyncio
import logging
import pathlib
import shutil
import aiocron
import aiofiles
from pyunifiprotect import ProtectApiClient
from pyunifiprotect.data.types import EventType, ModelType
from pyunifiprotect.data.nvr import Event
from pyunifiprotect.data.websocket import WSAction, WSSubscriptionMessage
logger = logging.getLogger(__name__)
def addLoggingLevel(levelName, levelNum, methodName=None):
"""Comprehensively adds a new logging level to the `logging` module and the currently configured logging class.
`levelName` becomes an attribute of the `logging` module with the value
`levelNum`. `methodName` becomes a convenience method for both `logging`
itself and the class returned by `logging.getLoggerClass()` (usually just
`logging.Logger`). If `methodName` is not specified, `levelName.lower()` is
used.
To avoid accidental clobberings of existing attributes, this method will
raise an `AttributeError` if the level name is already an attribute of the
`logging` module or if the method name is already present
From: https://stackoverflow.com/a/35804945
Example
-------
>>> addLoggingLevel('TRACE', logging.DEBUG - 5)
>>> logging.getLogger(__name__).setLevel("TRACE")
>>> logging.getLogger(__name__).trace('that worked')
>>> logging.trace('so did this')
>>> logging.TRACE
5
"""
if not methodName:
methodName = levelName.lower()
if hasattr(logging, levelName):
raise AttributeError('{} already defined in logging module'.format(levelName))
if hasattr(logging, methodName):
raise AttributeError('{} already defined in logging module'.format(methodName))
if hasattr(logging.getLoggerClass(), methodName):
raise AttributeError('{} already defined in logger class'.format(methodName))
# This method was inspired by the answers to Stack Overflow post
# http://stackoverflow.com/q/2183233/2988730, especially
# http://stackoverflow.com/a/13638084/2988730
def logForLevel(self, message, *args, **kwargs):
if self.isEnabledFor(levelNum):
self._log(levelNum, message, args, **kwargs)
def logToRoot(message, *args, **kwargs):
logging.log(levelNum, message, *args, **kwargs)
logging.addLevelName(levelNum, levelName)
setattr(logging, levelName, levelNum)
setattr(logging.getLoggerClass(), methodName, logForLevel)
setattr(logging, methodName, logToRoot)
def setup_logging(verbosity):
logging.basicConfig(level=logging.INFO)
addLoggingLevel(
'WEBSOCKET_DUMP',
logging.DEBUG - 5,
)
if verbosity == 0:
# Only show info logging from unifi-protect-backup
logging.getLogger("pyunifiprotect").setLevel(logging.WARN)
logging.getLogger("pyunifiprotect.api").setLevel(logging.WARN)
logger.setLevel(logging.INFO)
elif verbosity == 1:
# Only show debug logging from unifi-protect-backup
logging.getLogger("pyunifiprotect").setLevel(logging.WARN)
logging.getLogger("pyunifiprotect.api").setLevel(logging.WARN)
logger.setLevel(logging.DEBUG)
elif verbosity == 2:
# Show debug logging from unifi-protect-backup and websocket data
logging.getLogger("pyunifiprotect").setLevel(logging.WARN)
logging.getLogger("pyunifiprotect.api").setLevel(logging.WARN)
logger.setLevel(logging.WEBSOCKET_DUMP)
elif verbosity == 3:
# Show debug logging from unifi-protect-backup, websocket data and info from pyunifiprotect
logging.getLogger("pyunifiprotect").setLevel(logging.INFO)
logging.getLogger("pyunifiprotect.api").setLevel(logging.INFO)
logger.setLevel(logging.WEBSOCKET_DUMP)
elif verbosity == 4:
# Show all debug logging
logging.getLogger("pyunifiprotect").setLevel(logging.DEBUG)
logging.getLogger("pyunifiprotect.api").setLevel(logging.DEBUG)
logger.setLevel(logging.WEBSOCKET_DUMP)
async def rclone_move(source, dest):
proc = await asyncio.create_subprocess_shell(
f"rclone moveto '{source}' '{dest}'",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
raise Exception(stderr)
class UnifiProtectBackup:
def __init__(self, address, port, username, password, verify_ssl, rclone_destination, retention, verbose):
setup_logging(verbose)
self.address = address
self.port = port
self.username = username
self.password = password
self.verify_ssl = verify_ssl
self.rclone_destination = rclone_destination
self.retention = retention
self._protect = ProtectApiClient(
self.address,
self.port,
self.username,
self.password,
verify_ssl=self.verify_ssl,
subscribed_models={ModelType.EVENT},
)
self._download_queue = asyncio.Queue()
self._unsub = None
self._cameras = None
async def start(self):
logger.info("Starting...")
# Check rclone is installed and has the correct remote
logger.info("Checking rclone configuration...")
await self.check_rclone()
logger.info("Connecting to Unifi Protect...")
await self._protect.update()
self._camera_names = {camera.id: camera.name for camera in self._protect.bootstrap.cameras.values()}
self._unsub = self._protect.subscribe_websocket(self.websocket_callback)
logger.info("Listening for events...")
await self.backup_events()
logger.info("Stopping...")
self._unsub()
async def check_rclone(self):
rclone = shutil.which('rclone')
logger.debug(f"`rclone` found: {rclone}")
if not rclone:
raise RuntimeError("`rclone` is not installed on this system")
proc = await asyncio.create_subprocess_shell(
"rclone listremotes",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
raise Exception(f"Failed to check rclone remotes: \n{stderr}")
logger.debug(f"Found the following rclone remotes:\n{stdout.decode().strip()}")
for line in stdout.splitlines():
if self.rclone_destination.startswith(line.decode()):
break
else:
remote = self.rclone_destination.split(":")[0]
raise ValueError(f"rclone does not have a remote called `{remote}`")
def websocket_callback(self, msg: WSSubscriptionMessage):
logger.websocket_dump(f"Got websocket message:\n{msg}") # type: ignore
# We are only interested in updates that end motion/smartdetection event
assert isinstance(msg.new_obj, Event)
if msg.action != WSAction.UPDATE:
return
if msg.new_obj.end is None:
return
if msg.new_obj.type not in {EventType.MOTION, EventType.SMART_DETECT}:
return
logger.websocket_dump("Event added to download queue") # type: ignore
self._download_queue.put_nowait(msg.new_obj)
async def backup_events(self):
while True:
event = await self._download_queue.get()
destination = self.generate_file_path(event)
logger.info(f"Backing up event: {destination}")
# TODO: Retry down/upload
try:
# Download video
logger.debug("Downloading video...")
video = await self._protect.get_camera_video(event.camera_id, event.start, event.end)
except Exception as e:
logger.warn("Failed to download video")
logger.exception(e)
continue
# Write to a temp file
async with aiofiles.tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as f:
logger.debug("Writing video to temp file...")
await f.write(video)
del video # Ensure its not taking up memory needlessly
destination = self.generate_file_path(event)
try:
logger.debug("Backing up video via rclone...")
await rclone_move(f.name, destination)
except Exception as e:
logger.warn("Failed to backup video")
logger.exception(e)
continue
logger.info("Backed up successfully!")
def generate_file_path(self, event):
path = pathlib.Path(self.rclone_destination)
path /= self._camera_names[event.camera_id] # directory per camera
path /= event.start.strftime("%Y-%m-%d") # Directory per day
file_name = f"{event.start.strftime('%Y-%m-%dT%H-%M-%S')} {event.type}"
if event.smart_detect_types:
detections = " ".join(event.smart_detect_types)
file_name += f" ({detections})"
file_name += ".mp4"
path /= file_name
return path
@aiocron.crontab("0 0 * * *")
async def rclone_purge_old(self):
logger.info("Deleting old files")
proc = await asyncio.create_subprocess_shell(
f"rclone delete --min-age {self.retention} --rm-dirs '{self.rclone_destination}'",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
raise Exception(stderr)