From 471ecb06624e1eaf1d592c8fc5bf7ec29f72fc1e Mon Sep 17 00:00:00 2001 From: Sebastian Goscik Date: Sat, 3 Dec 2022 21:48:44 +0000 Subject: [PATCH] Major Restructure - Each task is now its own class - Added a database to track backed up events and their destinations - Added task to check for and backup missed events --- .gitignore | 2 + CHANGELOG.md | 13 + README.md | 20 +- poetry.lock | 225 ++++---- pyproject.toml | 7 +- unifi_protect_backup/__init__.py | 7 +- unifi_protect_backup/cli.py | 12 +- unifi_protect_backup/downloader.py | 142 +++++ unifi_protect_backup/event_listener.py | 120 ++++ unifi_protect_backup/missing_event_checker.py | 83 +++ unifi_protect_backup/purge.py | 75 +++ unifi_protect_backup/unifi_protect_backup.py | 538 +++++------------- unifi_protect_backup/uploader.py | 146 +++++ unifi_protect_backup/utils.py | 191 +++++++ 14 files changed, 1054 insertions(+), 527 deletions(-) create mode 100644 unifi_protect_backup/downloader.py create mode 100644 unifi_protect_backup/event_listener.py create mode 100644 unifi_protect_backup/missing_event_checker.py create mode 100644 unifi_protect_backup/purge.py create mode 100644 unifi_protect_backup/uploader.py create mode 100644 unifi_protect_backup/utils.py diff --git a/.gitignore b/.gitignore index fef05e8..5d9e5e1 100644 --- a/.gitignore +++ b/.gitignore @@ -118,3 +118,5 @@ config/ data/ .envrc +clips/ +*.sqlite \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ee2267..249608c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,19 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.8.0] - 2022-12-03 +Major internal refactoring. Each task is now its own class and asyncio task. + +### Added +- A database of backed up events and where they are stored +- A periodic check for missed events + - This will also ensure past events before the tool was used are backed up, up until the retention period + +### Fixed +- Pruning is no longer done based on file timestamps, the database is used instead. The tool will no longer delete files it didn't create. +- Pruning now runs much more frequently (every minute) so retention periods of less than a day are now possible. + + ## [0.7.4] - 2022-08-21 No functional changes in this version. This is just to trigger the release CI. ### Added diff --git a/README.md b/README.md index 2a3869a..e8b0307 100644 --- a/README.md +++ b/README.md @@ -23,8 +23,9 @@ retention period. ## Features - Listens to events in real-time via the Unifi Protect websocket API +- Ensures any previous and/or missed events within the retention period are also backed up - Supports uploading to a [wide range of storage systems using `rclone`](https://rclone.org/overview/) -- Performs nightly pruning of old clips +- Automatic pruning of old clips ## Requirements - Python 3.9+ @@ -54,9 +55,6 @@ In order to connect to your unifi protect instance, you will first need to setup ## Usage -:warning: **Potential Data Loss**: Be very careful when setting the `rclone-destination`, at midnight every day it will -delete any files older than `retention`. It is best to give `unifi-protect-backup` its own directory. - ``` Usage: unifi-protect-backup [OPTIONS] @@ -102,30 +100,31 @@ Options: a_name}/{event.start:%Y-%m-%d}/{event.end:%Y -%m-%dT%H-%M-%S} {detection_type}.mp4] -v, --verbose How verbose the logging output should be. - + None: Only log info messages created by `unifi-protect-backup`, and all warnings - + -v: Only log info & debug messages created by `unifi-protect-backup`, and all warnings - + -vv: Log info & debug messages created by `unifi-protect-backup`, command output, and all warnings - + -vvv Log debug messages created by `unifi-protect-backup`, command output, all info messages, and all warnings - + -vvvv: Log debug messages created by `unifi-protect-backup` command output, all info messages, all warnings, and websocket data - + -vvvvv: Log websocket data, command output, all debug messages, all info messages and all warnings [x>=0] + --sqlite_path TEXT Path to the SQLite database to use/create --help Show this message and exit. ``` @@ -142,6 +141,7 @@ always take priority over environment variables): - `IGNORE_CAMERAS` - `DETECTION_TYPES` - `FILE_STRUCTURE_FORMAT` +- `SQLITE_PATH` ## File path formatting diff --git a/poetry.lock b/poetry.lock index b8ea146..60514b3 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,18 +1,3 @@ -[[package]] -name = "aiocron" -version = "1.8" -description = "Crontabs for asyncio" -category = "main" -optional = false -python-versions = "*" - -[package.dependencies] -croniter = "*" -tzlocal = "*" - -[package.extras] -test = ["coverage"] - [[package]] name = "aiofiles" version = "0.8.0" @@ -41,6 +26,17 @@ yarl = ">=1.0,<2.0" [package.extras] speedups = ["aiodns", "brotli", "cchardet"] +[[package]] +name = "aiorun" +version = "2022.11.1" +description = "Boilerplate for asyncio applications" +category = "main" +optional = false +python-versions = ">=3.5" + +[package.extras] +dev = ["pytest", "pytest-cov"] + [[package]] name = "aioshutil" version = "1.1" @@ -60,6 +56,17 @@ python-versions = ">=3.6" [package.dependencies] frozenlist = ">=1.1.0" +[[package]] +name = "aiosqlite" +version = "0.17.0" +description = "asyncio bridge to the standard sqlite3 module" +category = "main" +optional = false +python-versions = ">=3.6" + +[package.dependencies] +typing_extensions = ">=3.7.2" + [[package]] name = "appnope" version = "0.1.3" @@ -68,6 +75,22 @@ category = "main" optional = true python-versions = "*" +[[package]] +name = "astroid" +version = "2.12.13" +description = "An abstract syntax tree for Python with inference support." +category = "main" +optional = false +python-versions = ">=3.7.2" + +[package.dependencies] +lazy-object-proxy = ">=1.4.0" +typing-extensions = {version = ">=3.10", markers = "python_version < \"3.10\""} +wrapt = [ + {version = ">=1.11,<2", markers = "python_version < \"3.11\""}, + {version = ">=1.14,<2", markers = "python_version >= \"3.11\""}, +] + [[package]] name = "asttokens" version = "2.0.5" @@ -122,28 +145,24 @@ python-versions = "*" [[package]] name = "black" -version = "21.12b0" +version = "22.10.0" description = "The uncompromising code formatter." category = "main" optional = true -python-versions = ">=3.6.2" +python-versions = ">=3.7" [package.dependencies] -click = ">=7.1.2" +click = ">=8.0.0" mypy-extensions = ">=0.4.3" -pathspec = ">=0.9.0,<1" +pathspec = ">=0.9.0" platformdirs = ">=2" -tomli = ">=0.2.6,<2.0.0" -typing-extensions = [ - {version = ">=3.10.0.0", markers = "python_version < \"3.10\""}, - {version = "!=3.10.0.1", markers = "python_version >= \"3.10\""}, -] +tomli = {version = ">=1.1.0", markers = "python_full_version < \"3.11.0a7\""} +typing-extensions = {version = ">=3.10.0.0", markers = "python_version < \"3.10\""} [package.extras] colorama = ["colorama (>=0.4.3)"] d = ["aiohttp (>=3.7.4)"] jupyter = ["ipython (>=7.8.0)", "tokenize-rt (>=3.2.0)"] -python2 = ["typed-ast (>=1.4.3)"] uvloop = ["uvloop (>=0.15.2)"] [[package]] @@ -221,11 +240,11 @@ colorama = {version = "*", markers = "platform_system == \"Windows\""} [[package]] name = "colorama" -version = "0.4.4" +version = "0.4.6" description = "Cross-platform colored terminal text." category = "main" optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" [[package]] name = "coverage" @@ -238,17 +257,6 @@ python-versions = ">=3.7" [package.extras] toml = ["tomli"] -[[package]] -name = "croniter" -version = "1.3.5" -description = "croniter provides iteration for datetime object with cron like format" -category = "main" -optional = false -python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" - -[package.dependencies] -python-dateutil = "*" - [[package]] name = "cryptography" version = "37.0.2" @@ -276,6 +284,17 @@ category = "main" optional = true python-versions = ">=3.5" +[[package]] +name = "dill" +version = "0.3.6" +description = "serialize all of python" +category = "main" +optional = false +python-versions = ">=3.7" + +[package.extras] +graph = ["objgraph (>=1.7.2)"] + [[package]] name = "distlib" version = "0.3.4" @@ -441,7 +460,7 @@ name = "isort" version = "5.10.1" description = "A Python utility / library to sort Python imports." category = "main" -optional = true +optional = false python-versions = ">=3.6.1,<4.0" [package.extras] @@ -495,6 +514,14 @@ SecretStorage = {version = ">=3.2", markers = "sys_platform == \"linux\""} docs = ["sphinx", "jaraco.packaging (>=9)", "rst.linker (>=1.9)", "jaraco.tidelift (>=1.4)"] testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-cov", "pytest-enabler (>=1.0.1)", "pytest-black (>=0.3.7)", "pytest-mypy (>=0.9.1)"] +[[package]] +name = "lazy-object-proxy" +version = "1.8.0" +description = "A fast and thorough lazy object proxy." +category = "main" +optional = false +python-versions = ">=3.7" + [[package]] name = "matplotlib-inline" version = "0.1.3" @@ -511,7 +538,7 @@ name = "mccabe" version = "0.6.1" description = "McCabe checker, plugin for flake8" category = "main" -optional = true +optional = false python-versions = "*" [[package]] @@ -641,7 +668,7 @@ name = "platformdirs" version = "2.5.2" description = "A small Python module for determining appropriate platform-specific dirs, e.g. a \"user data dir\"." category = "main" -optional = true +optional = false python-versions = ">=3.7" [package.extras] @@ -789,6 +816,29 @@ dev = ["sphinx", "sphinx-rtd-theme", "zope.interface", "cryptography (>=3.3.1)", docs = ["sphinx", "sphinx-rtd-theme", "zope.interface"] tests = ["pytest (>=6.0.0,<7.0.0)", "coverage[toml] (==5.0.4)"] +[[package]] +name = "pylint" +version = "2.15.7" +description = "python code static checker" +category = "main" +optional = false +python-versions = ">=3.7.2" + +[package.dependencies] +astroid = ">=2.12.13,<=2.14.0-dev0" +colorama = {version = ">=0.4.5", markers = "sys_platform == \"win32\""} +dill = ">=0.2" +isort = ">=4.2.5,<6" +mccabe = ">=0.6,<0.8" +platformdirs = ">=2.2.0" +tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""} +tomlkit = ">=0.10.1" +typing-extensions = {version = ">=3.10.0", markers = "python_version < \"3.10\""} + +[package.extras] +spelling = ["pyenchant (>=3.2,<4.0)"] +testutils = ["gitpython (>3)"] + [[package]] name = "pyparsing" version = "3.0.9" @@ -856,17 +906,6 @@ category = "main" optional = false python-versions = "*" -[[package]] -name = "pytz-deprecation-shim" -version = "0.1.0.post0" -description = "Shims to make deprecation of pytz easier" -category = "main" -optional = false -python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,>=2.7" - -[package.dependencies] -tzdata = {version = "*", markers = "python_version >= \"3.6\""} - [[package]] name = "pyunifiprotect" version = "4.0.11" @@ -1020,7 +1059,15 @@ name = "tomli" version = "1.2.3" description = "A lil' TOML parser" category = "main" -optional = true +optional = false +python-versions = ">=3.6" + +[[package]] +name = "tomlkit" +version = "0.11.6" +description = "Style preserving TOML library" +category = "main" +optional = false python-versions = ">=3.6" [[package]] @@ -1148,30 +1195,6 @@ category = "main" optional = false python-versions = ">=3.7" -[[package]] -name = "tzdata" -version = "2022.1" -description = "Provider of IANA time zone data" -category = "main" -optional = false -python-versions = ">=2" - -[[package]] -name = "tzlocal" -version = "4.2" -description = "tzinfo object for the local timezone" -category = "main" -optional = false -python-versions = ">=3.6" - -[package.dependencies] -pytz-deprecation-shim = "*" -tzdata = {version = "*", markers = "platform_system == \"Windows\""} - -[package.extras] -devenv = ["black", "pyroma", "pytest-cov", "zest.releaser"] -test = ["pytest-mock (>=3.3)", "pytest (>=4.3)"] - [[package]] name = "urllib3" version = "1.26.9" @@ -1219,6 +1242,14 @@ category = "main" optional = true python-versions = "*" +[[package]] +name = "wrapt" +version = "1.14.1" +description = "Module for decorators, wrappers and monkey patching." +category = "main" +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.7" + [[package]] name = "yarl" version = "1.7.2" @@ -1250,13 +1281,9 @@ test = ["pytest", "black", "isort", "mypy", "flake8", "flake8-docstrings", "pyte [metadata] lock-version = "1.1" python-versions = ">=3.9.0,<4.0" -content-hash = "8a8ba8afef1dae213be16fa56b914cd35f56c70d034203c73f068dc7d1ae2268" +content-hash = "c4291adb62da91a97e4d6d5a4aac0be648838b95f96ed93228fd8aacdfce48b0" [metadata.files] -aiocron = [ - {file = "aiocron-1.8-py3-none-any.whl", hash = "sha256:b6313214c311b62aa2220e872b94139b648631b3103d062ef29e5d3230ddce6d"}, - {file = "aiocron-1.8.tar.gz", hash = "sha256:48546513faf2eb7901e65a64eba7b653c80106ed00ed9ca3419c3d10b6555a01"}, -] aiofiles = [ {file = "aiofiles-0.8.0-py3-none-any.whl", hash = "sha256:7a973fc22b29e9962d0897805ace5856e6a566ab1f0c8e5c91ff6c866519c937"}, {file = "aiofiles-0.8.0.tar.gz", hash = "sha256:8334f23235248a3b2e83b2c3a78a22674f39969b96397126cc93664d9a901e59"}, @@ -1335,6 +1362,7 @@ aiohttp = [ {file = "aiohttp-3.8.1-cp39-cp39-win_amd64.whl", hash = "sha256:1c182cb873bc91b411e184dab7a2b664d4fea2743df0e4d57402f7f3fa644bac"}, {file = "aiohttp-3.8.1.tar.gz", hash = "sha256:fc5471e1a54de15ef71c1bc6ebe80d4dc681ea600e68bfd1cbce40427f0b7578"}, ] +aiorun = [] aioshutil = [ {file = "aioshutil-1.1-py3-none-any.whl", hash = "sha256:4c17e1da55cf928b4a85bd6ff5e4f1560cf21db7a16b5da5844f8f3edf3e2895"}, {file = "aioshutil-1.1.tar.gz", hash = "sha256:d2e8d6baddab13137410b27ce24f39ce9889684cb47503d5af182ea8d038b0f1"}, @@ -1343,10 +1371,12 @@ aiosignal = [ {file = "aiosignal-1.2.0-py3-none-any.whl", hash = "sha256:26e62109036cd181df6e6ad646f91f0dcfd05fe16d0cb924138ff2ab75d64e3a"}, {file = "aiosignal-1.2.0.tar.gz", hash = "sha256:78ed67db6c7b7ced4f98e495e572106d5c432a93e1ddd1bf475e1dc05f5b7df2"}, ] +aiosqlite = [] appnope = [ {file = "appnope-0.1.3-py2.py3-none-any.whl", hash = "sha256:265a455292d0bd8a72453494fa24df5a11eb18373a60c7c0430889f22548605e"}, {file = "appnope-0.1.3.tar.gz", hash = "sha256:02bd91c4de869fbb1e1c50aafc4098827a7a54ab2f39d9dcba6c9547ed920e24"}, ] +astroid = [] asttokens = [ {file = "asttokens-2.0.5-py2.py3-none-any.whl", hash = "sha256:0844691e88552595a6f4a4281a9f7f79b8dd45ca4ccea82e5e05b4bbdb76705c"}, {file = "asttokens-2.0.5.tar.gz", hash = "sha256:9a54c114f02c7a9480d56550932546a3f1fe71d8a02f1bc7ccd0ee3ee35cf4d5"}, @@ -1367,10 +1397,7 @@ backcall = [ {file = "backcall-0.2.0-py2.py3-none-any.whl", hash = "sha256:fbbce6a29f263178a1f7915c1940bde0ec2b2a967566fe1c65c1dfb7422bd255"}, {file = "backcall-0.2.0.tar.gz", hash = "sha256:5cbdbf27be5e7cfadb448baf0aa95508f91f2bbc6c6437cd9cd06e2a4c215e1e"}, ] -black = [ - {file = "black-21.12b0-py3-none-any.whl", hash = "sha256:a615e69ae185e08fdd73e4715e260e2479c861b5740057fde6e8b4e3b7dd589f"}, - {file = "black-21.12b0.tar.gz", hash = "sha256:77b80f693a569e2e527958459634f18df9b0ba2625ba4e0c2d5da5be42e6f2b3"}, -] +black = [] bleach = [ {file = "bleach-5.0.0-py3-none-any.whl", hash = "sha256:08a1fe86d253b5c88c92cc3d810fd8048a16d15762e1e5b74d502256e5926aa1"}, {file = "bleach-5.0.0.tar.gz", hash = "sha256:c6d6cc054bdc9c83b48b8083e236e5f00f238428666d2ce2e083eaa5fd568565"}, @@ -1447,10 +1474,7 @@ click = [ {file = "click-8.0.1-py3-none-any.whl", hash = "sha256:fba402a4a47334742d782209a7c79bc448911afe1149d07bdabdf480b3e2f4b6"}, {file = "click-8.0.1.tar.gz", hash = "sha256:8c04c11192119b1ef78ea049e0a6f0463e4c48ef00a30160c704337586f3ad7a"}, ] -colorama = [ - {file = "colorama-0.4.4-py2.py3-none-any.whl", hash = "sha256:9f47eda37229f68eee03b24b9748937c7dc3868f906e8ba69fbcbdd3bc5dc3e2"}, - {file = "colorama-0.4.4.tar.gz", hash = "sha256:5941b2b48a20143d2267e95b1c2a7603ce057ee39fd88e7329b0c292aa16869b"}, -] +colorama = [] coverage = [ {file = "coverage-6.4.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:f1d5aa2703e1dab4ae6cf416eb0095304f49d004c39e9db1d86f57924f43006b"}, {file = "coverage-6.4.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:4ce1b258493cbf8aec43e9b50d89982346b98e9ffdfaae8ae5793bc112fb0068"}, @@ -1494,10 +1518,6 @@ coverage = [ {file = "coverage-6.4.1-pp36.pp37.pp38-none-any.whl", hash = "sha256:4803e7ccf93230accb928f3a68f00ffa80a88213af98ed338a57ad021ef06815"}, {file = "coverage-6.4.1.tar.gz", hash = "sha256:4321f075095a096e70aff1d002030ee612b65a205a0a0f5b815280d5dc58100c"}, ] -croniter = [ - {file = "croniter-1.3.5-py2.py3-none-any.whl", hash = "sha256:4f72faca42c00beb6e30907f1315145f43dfbe5ec0ad4ada24b4c0d57b86a33a"}, - {file = "croniter-1.3.5.tar.gz", hash = "sha256:7592fc0e8a00d82af98dfa2768b75983b6fb4c2adc8f6d0d7c931a715b7cefee"}, -] cryptography = [ {file = "cryptography-37.0.2-cp36-abi3-macosx_10_10_universal2.whl", hash = "sha256:ef15c2df7656763b4ff20a9bc4381d8352e6640cfeb95c2972c38ef508e75181"}, {file = "cryptography-37.0.2-cp36-abi3-macosx_10_10_x86_64.whl", hash = "sha256:3c81599befb4d4f3d7648ed3217e00d21a9341a9a688ecdd615ff72ffbed7336"}, @@ -1526,6 +1546,7 @@ decorator = [ {file = "decorator-5.1.1-py3-none-any.whl", hash = "sha256:b8c3f85900b9dc423225913c5aace94729fe1fa9763b38939a95226f02d37186"}, {file = "decorator-5.1.1.tar.gz", hash = "sha256:637996211036b6385ef91435e4fae22989472f9d571faba8927ba8253acbc330"}, ] +dill = [] distlib = [ {file = "distlib-0.3.4-py2.py3-none-any.whl", hash = "sha256:6564fe0a8f51e734df6333d08b8b94d4ea8ee6b99b5ed50613f731fd4089f34b"}, {file = "distlib-0.3.4.zip", hash = "sha256:e4b58818180336dc9c529bfb9a0b58728ffc09ad92027a3f30b7cd91e3458579"}, @@ -1650,6 +1671,7 @@ keyring = [ {file = "keyring-23.5.1-py3-none-any.whl", hash = "sha256:9ef58314bcc823f426b49ec787539a2d73571b37de4cd498f839803b01acff1e"}, {file = "keyring-23.5.1.tar.gz", hash = "sha256:dee502cdf18a98211bef428eea11456a33c00718b2f08524fd5727c7f424bffd"}, ] +lazy-object-proxy = [] matplotlib-inline = [ {file = "matplotlib-inline-0.1.3.tar.gz", hash = "sha256:a04bfba22e0d1395479f866853ec1ee28eea1485c1d69a6faf00dc3e24ff34ee"}, {file = "matplotlib_inline-0.1.3-py3-none-any.whl", hash = "sha256:aed605ba3b72462d64d475a21a9296f400a19c4f74a31b59103d2a99ffd5aa5c"}, @@ -1906,6 +1928,7 @@ pyjwt = [ {file = "PyJWT-2.4.0-py3-none-any.whl", hash = "sha256:72d1d253f32dbd4f5c88eaf1fdc62f3a19f676ccbadb9dbc5d07e951b2b26daf"}, {file = "PyJWT-2.4.0.tar.gz", hash = "sha256:d42908208c699b3b973cbeb01a969ba6a96c821eefb1c5bfe4c390c01d67abba"}, ] +pylint = [] pyparsing = [ {file = "pyparsing-3.0.9-py3-none-any.whl", hash = "sha256:5026bae9a10eeaefb61dab2f09052b9f4307d44aee4eda64b309723d8d206bbc"}, {file = "pyparsing-3.0.9.tar.gz", hash = "sha256:2b020ecf7d21b687f219b71ecad3631f644a47f01403fa1d1036b0c6416d70fb"}, @@ -1926,10 +1949,6 @@ pytz = [ {file = "pytz-2022.1-py2.py3-none-any.whl", hash = "sha256:e68985985296d9a66a881eb3193b0906246245294a881e7c8afe623866ac6a5c"}, {file = "pytz-2022.1.tar.gz", hash = "sha256:1e760e2fe6a8163bc0b3d9a19c4f84342afa0a2affebfaa84b01b978a02ecaa7"}, ] -pytz-deprecation-shim = [ - {file = "pytz_deprecation_shim-0.1.0.post0-py2.py3-none-any.whl", hash = "sha256:8314c9692a636c8eb3bda879b9f119e350e93223ae83e70e80c31675a0fdc1a6"}, - {file = "pytz_deprecation_shim-0.1.0.post0.tar.gz", hash = "sha256:af097bae1b616dde5c5744441e2ddc69e74dfdcb0c263129610d85b87445a59d"}, -] pyunifiprotect = [] pywin32-ctypes = [ {file = "pywin32-ctypes-0.2.0.tar.gz", hash = "sha256:24ffc3b341d457d48e8922352130cf2644024a4ff09762a2261fd34c36ee5942"}, @@ -2010,6 +2029,7 @@ tomli = [ {file = "tomli-1.2.3-py3-none-any.whl", hash = "sha256:e3069e4be3ead9668e21cb9b074cd948f7b3113fd9c8bba083f48247aab8b11c"}, {file = "tomli-1.2.3.tar.gz", hash = "sha256:05b6166bff487dc068d322585c7ea4ef78deed501cc124060e0f238e89a9231f"}, ] +tomlkit = [] tox = [ {file = "tox-3.25.0-py2.py3-none-any.whl", hash = "sha256:0805727eb4d6b049de304977dfc9ce315a1938e6619c3ab9f38682bb04662a5a"}, {file = "tox-3.25.0.tar.gz", hash = "sha256:37888f3092aa4e9f835fc8cc6dadbaaa0782651c41ef359e3a5743fcb0308160"}, @@ -2046,14 +2066,6 @@ typing-extensions = [ {file = "typing_extensions-4.2.0-py3-none-any.whl", hash = "sha256:6657594ee297170d19f67d55c05852a874e7eb634f4f753dbd667855e07c1708"}, {file = "typing_extensions-4.2.0.tar.gz", hash = "sha256:f1c24655a0da0d1b67f07e17a5e6b2a105894e6824b92096378bb3668ef02376"}, ] -tzdata = [ - {file = "tzdata-2022.1-py2.py3-none-any.whl", hash = "sha256:238e70234214138ed7b4e8a0fab0e5e13872edab3be586ab8198c407620e2ab9"}, - {file = "tzdata-2022.1.tar.gz", hash = "sha256:8b536a8ec63dc0751342b3984193a3118f8fca2afe25752bb9b7fffd398552d3"}, -] -tzlocal = [ - {file = "tzlocal-4.2-py3-none-any.whl", hash = "sha256:89885494684c929d9191c57aa27502afc87a579be5cdd3225c77c463ea043745"}, - {file = "tzlocal-4.2.tar.gz", hash = "sha256:ee5842fa3a795f023514ac2d801c4a81d1743bbe642e3940143326b3a00addd7"}, -] urllib3 = [ {file = "urllib3-1.26.9-py2.py3-none-any.whl", hash = "sha256:44ece4d53fb1706f667c9bd1c648f5469a2ec925fcf3a776667042d645472c14"}, {file = "urllib3-1.26.9.tar.gz", hash = "sha256:aabaf16477806a5e1dd19aa41f8c2b7950dd3c746362d7e3223dbe6de6ac448e"}, @@ -2070,6 +2082,7 @@ webencodings = [ {file = "webencodings-0.5.1-py2.py3-none-any.whl", hash = "sha256:a0af1213f3c2226497a97e2b3aa01a7e4bee4f403f95be16fc9acd2947514a78"}, {file = "webencodings-0.5.1.tar.gz", hash = "sha256:b36a1c245f2d304965eb4e0a82848379241dc04b865afcc4aab16748587e1923"}, ] +wrapt = [] yarl = [ {file = "yarl-1.7.2-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:f2a8508f7350512434e41065684076f640ecce176d262a7d54f0da41d99c5a95"}, {file = "yarl-1.7.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:da6df107b9ccfe52d3a48165e48d72db0eca3e3029b5b8cb4fe6ee3cb870ba8b"}, diff --git a/pyproject.toml b/pyproject.toml index f9b0a1e..6066ecb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ packages = [ python = ">=3.9.0,<4.0" click = "8.0.1" -black = { version = "^21.5b2", optional = true} +black = { version = "^22.10.0", optional = true} isort = { version = "^5.8.0", optional = true} flake8 = { version = "^3.9.2", optional = true} flake8-docstrings = { version = "^1.6.0", optional = true } @@ -40,10 +40,13 @@ toml = {version = "^0.10.2", optional = true} bump2version = {version = "^1.0.1", optional = true} tox-asdf = {version = "^0.1.0", optional = true} pyunifiprotect = "^4.0.11" -aiocron = "^1.8" ipdb = {version = "^0.13.9", optional = true} types-pytz = {version = "^2021.3.5", optional = true} types-cryptography = {version = "^3.3.18", optional = true} +aiosqlite = "^0.17.0" +python-dateutil = "^2.8.2" +aiorun = "^2022.11.1" +pylint = {version = "^2.15.6", extras = ["dev"]} [tool.poetry.extras] test = [ diff --git a/unifi_protect_backup/__init__.py b/unifi_protect_backup/__init__.py index d6c2b3c..0db4653 100644 --- a/unifi_protect_backup/__init__.py +++ b/unifi_protect_backup/__init__.py @@ -4,4 +4,9 @@ __author__ = """sebastian.goscik""" __email__ = 'sebastian@goscik.com' __version__ = '0.7.4' -from .unifi_protect_backup import UnifiProtectBackup +# from .unifi_protect_backup import UnifiProtectBackup +from .downloader import VideoDownloader +from .uploader import VideoUploader +from .event_listener import EventListener +from .purge import Purge +from .missing_event_checker import MissingEventChecker diff --git a/unifi_protect_backup/cli.py b/unifi_protect_backup/cli.py index 105f709..efc9503 100644 --- a/unifi_protect_backup/cli.py +++ b/unifi_protect_backup/cli.py @@ -3,8 +3,10 @@ import asyncio import click +from aiorun import run -from unifi_protect_backup import UnifiProtectBackup, __version__ +from unifi_protect_backup import __version__ +from unifi_protect_backup.unifi_protect_backup import UnifiProtectBackup DETECTION_TYPES = ["motion", "person", "vehicle", "ring"] @@ -102,11 +104,17 @@ all warnings, and websocket data -vvvvv: Log websocket data, command output, all debug messages, all info messages and all warnings """, ) +@click.option( + '--sqlite_path', + default='events.sqlite', + envvar='SQLITE_PATH', + help="Path to the SQLite database to use/create", +) def main(**kwargs): """A Python based tool for backing up Unifi Protect event clips as they occur.""" loop = asyncio.get_event_loop() event_listener = UnifiProtectBackup(**kwargs) - loop.run_until_complete(event_listener.start()) + run(event_listener.start()) if __name__ == "__main__": diff --git a/unifi_protect_backup/downloader.py b/unifi_protect_backup/downloader.py new file mode 100644 index 0000000..3a43385 --- /dev/null +++ b/unifi_protect_backup/downloader.py @@ -0,0 +1,142 @@ +import asyncio +import json +import logging +import shutil +from datetime import datetime, timedelta, timezone + +import pytz +from aiohttp.client_exceptions import ClientPayloadError +from pyunifiprotect import ProtectApiClient +from pyunifiprotect.data.nvr import Event +from pyunifiprotect.data.types import EventType + +from unifi_protect_backup.utils import SubprocessException, VideoQueue, get_camera_name, human_readable_size + +logger = logging.getLogger(__name__) + + +async def get_video_length(video: bytes) -> float: + """Uses ffprobe to get the length of the video file passed in as a byte stream""" + cmd = 'ffprobe -v quiet -show_streams -select_streams v:0 -of json -' + proc = await asyncio.create_subprocess_shell( + cmd, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await proc.communicate(video) + if proc.returncode == 0: + logger.extra_debug(f"stdout:\n{stdout.decode()}") # type: ignore + logger.extra_debug(f"stderr:\n{stderr.decode()}") # type: ignore + + json_data = json.loads(stdout.decode()) + return float(json_data['streams'][0]['duration']) + + else: + raise SubprocessException(stdout.decode(), stderr.decode(), proc.returncode) + + +class VideoDownloader: + """Downloads event video clips from Unifi Protect""" + + def __init__(self, protect: ProtectApiClient, download_queue: asyncio.Queue, buffer_size: int = 256): + self._protect: ProtectApiClient = protect + self._download_queue: asyncio.Queue = download_queue + self.video_queue = VideoQueue(buffer_size * 1024 * 1024) + + # Check if `ffprobe` is available + ffprobe = shutil.which('ffprobe') + if ffprobe is not None: + logger.debug(f"ffprobe found: {ffprobe}") + self._has_ffprobe = True + else: + self._has_ffprobe = False + + async def start(self): + """Main loop""" + logger.info("Starting Downloader") + while True: + try: + event = await self._download_queue.get() + + # Fix timezones since pyunifiprotect sets all timestamps to UTC. Instead localize them to + # the timezone of the unifi protect NVR. + event.start = event.start.replace(tzinfo=pytz.utc).astimezone(self._protect.bootstrap.nvr.timezone) + event.end = event.end.replace(tzinfo=pytz.utc).astimezone(self._protect.bootstrap.nvr.timezone) + + logger.info(f"Downloading event: {event.id}") + logger.debug(f"Remaining Download Queue: {self._download_queue.qsize()}") + output_queue_current_size = human_readable_size(self.video_queue.qsize()) + output_queue_max_size = human_readable_size(self.video_queue.maxsize) + logger.debug(f"Video Download Buffer: {output_queue_current_size}/{output_queue_max_size}") + logger.debug(f" Camera: {await get_camera_name(self._protect, event.camera_id)}") + if event.type == EventType.SMART_DETECT: + logger.debug(f" Type: {event.type} ({', '.join(event.smart_detect_types)})") + else: + logger.debug(f" Type: {event.type}") + logger.debug(f" Start: {event.start.strftime('%Y-%m-%dT%H-%M-%S')} ({event.start.timestamp()})") + logger.debug(f" End: {event.end.strftime('%Y-%m-%dT%H-%M-%S')} ({event.end.timestamp()})") + duration = (event.end - event.start).total_seconds() + logger.debug(f" Duration: {duration}s") + + # Unifi protect does not return full video clips if the clip is requested too soon. + # There are two issues at play here: + # - Protect will only cut a clip on an keyframe which happen every 5s + # - Protect's pipeline needs a finite amount of time to make a clip available + # So we will wait 1.5x the keyframe interval to ensure that there is always ample video + # stored and Protect can return a full clip (which should be at least the length requested, + # but often longer) + time_since_event_ended = datetime.utcnow().replace(tzinfo=timezone.utc) - event.end + sleep_time = (timedelta(seconds=5 * 1.5) - time_since_event_ended).total_seconds() + if sleep_time > 0: + logger.debug(f" Sleeping ({sleep_time}s) to ensure clip is ready to download...") + await asyncio.sleep(sleep_time) + + video = await self._download(event) + if video is None: + continue + + # Get the actual length of the downloaded video using ffprobe + if self._has_ffprobe: + await self._check_video_length(video, duration) + + await self.video_queue.put((event, video)) + logger.debug("Added to upload queue") + + except Exception as e: + logger.warn(f"Unexpected exception occurred, abandoning event {event.id}:") + logger.exception(e) + + async def _download(self, event: Event) -> bytes: + """Downloads the video clip for the given event""" + logger.debug(" Downloading video...") + for x in range(5): + try: + video = await self._protect.get_camera_video(event.camera_id, event.start, event.end) + assert isinstance(video, bytes) + break + except (AssertionError, ClientPayloadError, TimeoutError) as e: + logger.warn(f" Failed download attempt {x+1}, retying in 1s") + logger.exception(e) + await asyncio.sleep(1) + else: + logger.warn(f"Download failed after 5 attempts, abandoning event {event.id}:") + return + + logger.debug(f" Downloaded video size: {human_readable_size(len(video))}s") + return video + + async def _check_video_length(self, video, duration): + """Check if the downloaded event is at least the length of the event, warn otherwise + + It is expected for events to regularly be slightly longer than the event specified""" + try: + downloaded_duration = await get_video_length(video) + msg = f" Downloaded video length: {downloaded_duration:.3f}s" f"({downloaded_duration - duration:+.3f}s)" + if downloaded_duration < duration: + logger.warning(msg) + else: + logger.debug(msg) + except SubprocessException as e: + logger.warn(" `ffprobe` failed") + logger.exception(e) diff --git a/unifi_protect_backup/event_listener.py b/unifi_protect_backup/event_listener.py new file mode 100644 index 0000000..03bce20 --- /dev/null +++ b/unifi_protect_backup/event_listener.py @@ -0,0 +1,120 @@ +import logging +from time import sleep +import asyncio +from typing import List + +from pyunifiprotect.data.websocket import WSAction, WSSubscriptionMessage +from pyunifiprotect.data.nvr import Event +from pyunifiprotect.data.types import EventType +from pyunifiprotect.api import ProtectApiClient + +logger = logging.getLogger(__name__) + + +class EventListener: + """Listens to the unifi protect websocket for new events to backup""" + + def __init__( + self, + event_queue: asyncio.Queue, + protect: ProtectApiClient, + detection_types: List[str], + ignore_cameras: List[str], + ): + self._event_queue: asyncio.Queue = event_queue + self._protect: ProtectApiClient = protect + self._unsub = None + self.detection_types: List[str] = detection_types + self.ignore_cameras: List[str] = ignore_cameras + + async def start(self): + """Main Loop""" + logger.debug("Subscribed to websocket") + self._unsub = self._protect.subscribe_websocket(self._websocket_callback) + + while True: + await asyncio.sleep(60) + await self._check_websocket_and_reconnect() + + def _websocket_callback(self, msg: WSSubscriptionMessage) -> None: + """Callback for "EVENT" websocket messages. + + Filters the incoming events, and puts completed events onto the download queue + + Args: + msg (Event): Incoming event data + """ + logger.websocket_data(msg) # type: ignore + + assert isinstance(msg.new_obj, Event) + if msg.action != WSAction.UPDATE: + return + if msg.new_obj.camera_id in self.ignore_cameras: + return + if msg.new_obj.end is None: + return + if msg.new_obj.type not in [EventType.MOTION, EventType.SMART_DETECT, EventType.RING]: + return + if msg.new_obj.type is EventType.MOTION and "motion" not in self.detection_types: + logger.extra_debug(f"Skipping unwanted motion detection event: {msg.new_obj.id}") # type: ignore + return + if msg.new_obj.type is EventType.RING and "ring" not in self.detection_types: + logger.extra_debug(f"Skipping unwanted ring event: {msg.new_obj.id}") # type: ignore + return + elif msg.new_obj.type is EventType.SMART_DETECT: + for event_smart_detection_type in msg.new_obj.smart_detect_types: + if event_smart_detection_type not in self.detection_types: + logger.extra_debug( # type: ignore + f"Skipping unwanted {event_smart_detection_type} detection event: {msg.new_obj.id}" + ) + return + + # TODO: Will this even work? I think it will block the async loop + while self._event_queue.full(): + logger.extra_debug("Event queue full, waiting 1s...") + sleep(1) + + self._event_queue.put_nowait(msg.new_obj) + + # Unifi protect has started sending the event id in the websocket as a {event_id}-{camera_id} but when the + # API is queried they only have {event_id}. Keeping track of these both of these would be complicated so + # instead we fudge the ID here to match what the API returns + if '-' in msg.new_obj.id: + msg.new_obj.id = msg.new_obj.id.split('-')[0] + + logger.debug(f"Adding event {msg.new_obj.id} to queue (Current download queue={self._event_queue.qsize()})") + + async def _check_websocket_and_reconnect(self): + """Checks for websocket disconnect and triggers a reconnect""" + + logger.extra_debug("Checking the status of the websocket...") + if self._protect.check_ws(): + logger.extra_debug("Websocket is connected.") + else: + logger.warn("Lost connection to Unifi Protect.") + + # Unsubscribe, close the session. + self._unsub() + await self._protect.close_session() + + while True: + logger.warn("Attempting reconnect...") + + try: + # Start the pyunifiprotect connection by calling `update` + await self._protect.close_session() + self._protect._bootstrap = None + await self._protect.update(force=True) + if self._protect.check_ws(): + self._unsub = self._protect.subscribe_websocket(self._websocket_callback) + break + else: + logger.warn("Unable to establish connection to Unifi Protect") + except Exception as e: + logger.warn("Unexpected exception occurred while trying to reconnect:") + logger.exception(e) + + # Back off for a little while + await asyncio.sleep(10) + + logger.info("Re-established connection to Unifi Protect and to the websocket.") diff --git a/unifi_protect_backup/missing_event_checker.py b/unifi_protect_backup/missing_event_checker.py new file mode 100644 index 0000000..e361679 --- /dev/null +++ b/unifi_protect_backup/missing_event_checker.py @@ -0,0 +1,83 @@ +import asyncio +import logging +from datetime import datetime +from typing import List + +import aiosqlite +from dateutil.relativedelta import relativedelta + +from pyunifiprotect import ProtectApiClient +from pyunifiprotect.data.types import EventType + +logger = logging.getLogger(__name__) + + +class MissingEventChecker: + """Periodically checks if any unifi protect events exist within the retention period that are not backed up""" + + def __init__( + self, + protect: ProtectApiClient, + db: aiosqlite.Connection, + event_queue: asyncio.Queue, + retention: relativedelta, + detection_types: List[str], + ignore_cameras: List[str], + interval: int = 60 * 5, + ) -> None: + self._protect: ProtectApiClient = protect + self._db: aiosqlite.Connection = db + self._event_queue: asyncio.Queue = event_queue + self.retention: relativedelta = retention + self.detection_types: List[str] = detection_types + self.ignore_cameras: List[str] = ignore_cameras + self.interval: int = interval + + async def start(self): + """main loop""" + logger.info("Starting Missing Event Checker") + while True: + try: + logger.extra_debug("Running check for missing events...") + # Get list of events that need to be backed up from unifi protect + unifi_events = await self._protect.get_events( + start=datetime.now() - self.retention, + end=datetime.now(), + types=[EventType.MOTION, EventType.SMART_DETECT, EventType.RING], + ) + unifi_events = {event.id: event for event in unifi_events} + + # Get list of events that have been backed up from the database + + # events(id, type, camera_id, start, end) + async with self._db.execute("SELECT * FROM events") as cursor: + rows = await cursor.fetchall() + db_event_ids = {row[0] for row in rows} + + # Prevent re-adding events currently in the download queue + downloading_event_ids = {event.id for event in self._event_queue._queue} + + missing_event_ids = set(unifi_events.keys()) - (db_event_ids | downloading_event_ids) + + for event_id in missing_event_ids: + event = unifi_events[event_id] + if event.start is None or event.end is None: + continue # This event is still on-going + if event.type is EventType.MOTION and "motion" not in self.detection_types: + continue + if event.type is EventType.RING and "ring" not in self.detection_types: + continue + elif event.type is EventType.SMART_DETECT: + for event_smart_detection_type in event.smart_detect_types: + if event_smart_detection_type not in self.detection_types: + continue + else: + logger.warning( + f" Adding missing event to backup queue: {event.id} ({event.type}) ({event.start.strftime('%Y-%m-%dT%H-%M-%S')} - {event.end.strftime('%Y-%m-%dT%H-%M-%S')})" + ) + await self._event_queue.put(event) + except Exception as e: + logger.warn(f"Unexpected exception occurred during missing event check:") + logger.exception(e) + + await asyncio.sleep(self.interval) diff --git a/unifi_protect_backup/purge.py b/unifi_protect_backup/purge.py new file mode 100644 index 0000000..f9852a1 --- /dev/null +++ b/unifi_protect_backup/purge.py @@ -0,0 +1,75 @@ +import asyncio +import logging +import time +from datetime import datetime + +import aiosqlite +from dateutil.relativedelta import relativedelta + +from unifi_protect_backup.utils import parse_rclone_retention, run_command + +logger = logging.getLogger(__name__) + + +async def wait_until(dt): + # sleep until the specified datetime + now = datetime.now() + await asyncio.sleep((dt - now).total_seconds()) + + +async def delete_file(file_path): + returncode, stdout, stderr = await run_command(f'rclone delete -vv "{file_path}"') + if returncode != 0: + logger.warn(f" Failed to delete file: '{file_path}'") + + +async def tidy_empty_dirs(base_dir_path): + returncode, stdout, stderr = await run_command(f'rclone rmdirs -vv --ignore-errors --leave-root "{base_dir_path}"') + if returncode != 0: + logger.warn(f" Failed to tidy empty dirs") + + +class Purge: + """Deletes old files from rclone remotes""" + + def __init__(self, db: aiosqlite.Connection, retention: relativedelta, rclone_destination: str, interval: int = 60): + self._db: aiosqlite.Connection = db + self.retention: relativedelta = retention + self.rclone_destination: str = rclone_destination + self.interval: int = interval + + async def start(self): + """Main loop - runs forever""" + while True: + try: + deleted_a_file = False + + # For every event older than the retention time + retention_oldest_time = time.mktime((datetime.now() - self.retention).timetuple()) + async with self._db.execute( + f"SELECT * FROM events WHERE end < {retention_oldest_time}" + ) as event_cursor: + async for event_id, event_type, camera_id, event_start, event_end in event_cursor: + + logger.info(f"Purging event: {event_id}.") + + # For every backup for this event + async with self._db.execute(f"SELECT * FROM backups WHERE id = '{event_id}'") as backup_cursor: + async for _, remote, file_path in backup_cursor: + logger.debug(f" Deleted: {remote}:{file_path}") + await delete_file(f"{remote}:{file_path}") + deleted_a_file = True + + # delete event from database + # entries in the `backups` table are automatically deleted by sqlite triggers + await self._db.execute(f"DELETE FROM events WHERE id = '{event_id}'") + await self._db.commit() + + if deleted_a_file: + await tidy_empty_dirs(self.rclone_destination) + + except Exception as e: + logger.warn(f"Unexpected exception occurred during purge:") + logger.exception(e) + + await asyncio.sleep(self.interval) diff --git a/unifi_protect_backup/unifi_protect_backup.py b/unifi_protect_backup/unifi_protect_backup.py index 4c6fe5a..31b7386 100644 --- a/unifi_protect_backup/unifi_protect_backup.py +++ b/unifi_protect_backup/unifi_protect_backup.py @@ -1,44 +1,23 @@ """Main module.""" import asyncio -import json import logging -import pathlib -import re +import os import shutil -from asyncio.exceptions import TimeoutError -from datetime import datetime, timedelta, timezone +from cmath import log +from pprint import pprint +from time import sleep from typing import Callable, List, Optional -import aiocron -import pytz -from aiohttp.client_exceptions import ClientPayloadError -from pyunifiprotect import NvrError, ProtectApiClient -from pyunifiprotect.data.nvr import Event -from pyunifiprotect.data.types import EventType, ModelType -from pyunifiprotect.data.websocket import WSAction, WSSubscriptionMessage +import aiosqlite +from pyunifiprotect import ProtectApiClient +from pyunifiprotect.data.types import ModelType + +from unifi_protect_backup import EventListener, MissingEventChecker, Purge, VideoDownloader, VideoUploader +from unifi_protect_backup.utils import SubprocessException, parse_rclone_retention, run_command logger = logging.getLogger(__name__) - -class SubprocessException(Exception): - """Exception class for when rclone does not exit with `0`.""" - - def __init__(self, stdout, stderr, returncode): - """Exception class for when rclone does not exit with `0`. - - Args: - stdout (str): What rclone output to stdout - stderr (str): What rclone output to stderr - returncode (str): The return code of the rclone process - """ - super().__init__() - self.stdout: str = stdout - self.stderr: str = stderr - self.returncode: int = returncode - - def __str__(self): - """Turns excpetion into a human readable form.""" - return f"Return Code: {self.returncode}\nStdout:\n{self.stdout}\nStderr:\n{self.stderr}" +# TODO: https://github.com/cjrh/aiorun#id6 (smart shield) def add_logging_level(levelName: str, levelNum: int, methodName: Optional[str] = None) -> None: @@ -124,10 +103,46 @@ def setup_logging(verbosity: int) -> None: logging.DEBUG - 2, ) - format = "{asctime} [{levelname}]:{name: <20}:\t{message}" + format = "{asctime} [{levelname:^11s}] {name:<42} :\t{message}" date_format = "%Y-%m-%d %H:%M:%S" style = '{' + logger = logging.getLogger("unifi_protect_backup") + + sh = logging.StreamHandler() + formatter = logging.Formatter(format, date_format, style) + sh.setFormatter(formatter) + + def decorate_emit(fn): + # add methods we need to the class + def new(*args): + levelno = args[0].levelno + if levelno >= logging.CRITICAL: + color = '\x1b[31;1m' # RED + elif levelno >= logging.ERROR: + color = '\x1b[31;1m' # RED + elif levelno >= logging.WARNING: + color = '\x1b[33;1m' # YELLOW + elif levelno >= logging.INFO: + color = '\x1b[32;1m' # GREEN + elif levelno >= logging.DEBUG: + color = '\x1b[36;1m' # CYAN + elif levelno >= logging.EXTRA_DEBUG: + color = '\x1b[35;1m' # MAGENTA + else: + color = '\x1b[0m' + # add colored *** in the beginning of the message + + args[0].levelname = f"{color}{args[0].levelname:^11s}\x1b[0m" + + return fn(*args) + + return new + + sh.emit = decorate_emit(sh.emit) + logger.addHandler(sh) + logger.propagate = False + if verbosity == 0: logging.basicConfig(level=logging.WARN, format=format, style=style, datefmt=date_format) logger.setLevel(logging.INFO) @@ -143,24 +158,20 @@ def setup_logging(verbosity: int) -> None: elif verbosity == 4: logging.basicConfig(level=logging.INFO, format=format, style=style, datefmt=date_format) logger.setLevel(logging.WEBSOCKET_DATA) # type: ignore - elif verbosity == 5: + elif verbosity >= 5: logging.basicConfig(level=logging.DEBUG, format=format, style=style, datefmt=date_format) logger.setLevel(logging.WEBSOCKET_DATA) # type: ignore -def human_readable_size(num): - """Turns a number into a human readable number with ISO/IEC 80000 binary prefixes. - - Based on: https://stackoverflow.com/a/1094933 - - Args: - num (int): The number to be converted into human readable format - """ - for unit in ["B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"]: - if abs(num) < 1024.0: - return f"{num:3.1f}{unit}" - num /= 1024.0 - raise ValueError("`num` too large, ran out of prefixes") +async def create_database(path: str): + """Creates sqlite database and creates the events abd backups tables""" + db = await aiosqlite.connect(path) + await db.execute("CREATE TABLE events(id PRIMARY KEY, type, camera_id, start REAL, end REAL)") + await db.execute( + "CREATE TABLE backups(id REFERENCES events(id) ON DELETE CASCADE, remote, path, PRIMARY KEY (id, remote))" + ) + await db.commit() + return db class UnifiProtectBackup: @@ -168,19 +179,6 @@ class UnifiProtectBackup: Listens to the Unifi Protect websocket for events. When a completed motion or smart detection event is detected, it will download the clip and back it up using rclone - - Attributes: - retention (str): How long should event clips be backed up for. Format as per the - `--max-age` argument of `rclone` - (https://rclone.org/filtering/#max-age-don-t-transfer-any-file-older-than-this) - rclone_args (str): Extra args passed directly to `rclone rcat`. - ignore_cameras (List[str]): List of camera IDs for which to not backup events - verbose (int): How verbose to setup logging, see :func:`setup_logging` for details. - detection_types (List[str]): List of which detection types to backup. - file_structure_format (str): A Python format string for output file path - _download_queue (asyncio.Queue): Queue of events that need to be backed up - _unsub (Callable): Unsubscribe from the websocket callback - _has_ffprobe (bool): If ffprobe was found on the host """ def __init__( @@ -196,6 +194,7 @@ class UnifiProtectBackup: ignore_cameras: List[str], file_structure_format: str, verbose: int, + sqlite_path: str = "events.sqlite", port: int = 443, ): """Will configure logging settings and the Unifi Protect API (but not actually connect). @@ -218,6 +217,7 @@ class UnifiProtectBackup: ignore_cameras (List[str]): List of camera IDs for which to not backup events. file_structure_format (str): A Python format string for output file path. verbose (int): How verbose to setup logging, see :func:`setup_logging` for details. + sqlite_path (str): Path where to find/create sqlite database """ setup_logging(verbose) @@ -238,9 +238,10 @@ class UnifiProtectBackup: logger.debug(f" {verbose=}") logger.debug(f" {detection_types=}") logger.debug(f" {file_structure_format=}") + logger.debug(f" {sqlite_path=}") self.rclone_destination = rclone_destination - self.retention = retention + self.retention = parse_rclone_retention(retention) self.rclone_args = rclone_args self.file_structure_format = file_structure_format @@ -262,8 +263,9 @@ class UnifiProtectBackup: self._download_queue: asyncio.Queue = asyncio.Queue() self._unsub: Callable[[], None] self.detection_types = detection_types - self._has_ffprobe = False + self._sqlite_path = sqlite_path + self._db = None async def start(self): """Bootstrap the backup process and kick off the main loop. @@ -271,114 +273,79 @@ class UnifiProtectBackup: You should run this to start the realtime backup of Unifi Protect clips as they are created """ - logger.info("Starting...") + try: + logger.info("Starting...") - # Ensure `rclone` is installed and properly configured - logger.info("Checking rclone configuration...") - await self._check_rclone() + # Ensure `rclone` is installed and properly configured + logger.info("Checking rclone configuration...") + await self._check_rclone() - # Check if `ffprobe` is available - ffprobe = shutil.which('ffprobe') - if ffprobe is not None: - logger.debug(f"ffprobe found: {ffprobe}") - self._has_ffprobe = True + # Start the pyunifiprotect connection by calling `update` + logger.info("Connecting to Unifi Protect...") + await self._protect.update() - # Start the pyunifiprotect connection by calling `update` - logger.info("Connecting to Unifi Protect...") - await self._protect.update() + # Get a mapping of camera ids -> names + logger.info("Found cameras:") + for camera in self._protect.bootstrap.cameras.values(): + logger.info(f" - {camera.id}: {camera.name}") - # Get a mapping of camera ids -> names - logger.info("Found cameras:") - for camera in self._protect.bootstrap.cameras.values(): - logger.info(f" - {camera.id}: {camera.name}") + tasks = [] - # Subscribe to the websocket - self._unsub = self._protect.subscribe_websocket(self._websocket_callback) + if not os.path.exists(self._sqlite_path): + logger.info("Database doesn't exist, creating a new one") + self._db = await create_database(self._sqlite_path) + else: + self._db = await aiosqlite.connect(self._sqlite_path) - # Set up a "purge" task to run at midnight each day to delete old recordings and empty directories - logger.info("Setting up purge task...") + event_queue = asyncio.Queue() - @aiocron.crontab("0 0 * * *") - async def rclone_purge_old(): - logger.info("Deleting old files...") - cmd = f'rclone delete -vv --min-age {self.retention} "{self.rclone_destination}"' - cmd += f' && rclone rmdirs -vv --leave-root "{self.rclone_destination}"' - proc = await asyncio.create_subprocess_shell( - cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, + # Enable foreign keys in the database + await self._db.execute("PRAGMA foreign_keys = ON;") + + # Create downloader task + # This will download video files to its buffer + downloader = VideoDownloader(self._protect, event_queue) # TODO: Make buffer size configurable + tasks.append(asyncio.create_task(downloader.start())) + + # Create upload task + # This will upload the videos in the downloader's buffer to the rclone remotes and log it in the database + uploader = VideoUploader( + self._protect, + downloader.video_queue, + self.rclone_destination, + self.rclone_args, + self.file_structure_format, + self._db, ) - stdout, stderr = await proc.communicate() - if proc.returncode == 0: - logger.extra_debug(f"stdout:\n{stdout.decode()}") - logger.extra_debug(f"stderr:\n{stderr.decode()}") - logger.info("Successfully deleted old files") - else: - logger.warn("Failed to purge old files") - logger.warn(f"stdout:\n{stdout.decode()}") - logger.warn(f"stderr:\n{stderr.decode()}") + tasks.append(asyncio.create_task(uploader.start())) - # We need to catch websocket disconnect and trigger a reconnect. - @aiocron.crontab("* * * * *") - async def check_websocket_and_reconnect(): - logger.extra_debug("Checking the status of the websocket...") - if self._protect.check_ws(): - logger.extra_debug("Websocket is connected.") - else: - logger.warn("Lost connection to Unifi Protect.") + # Create event listener task + # This will connect to the unifi protect websocket and listen for events. When one is detected it will + # be added to the queue of events to download + event_listener = EventListener(event_queue, self._protect, self.detection_types, self.ignore_cameras) + tasks.append(asyncio.create_task(event_listener.start())) - # Unsubscribe, close the session. - self._unsub() + # Create purge task + # This will, every midnight, purge old backups from the rclone remotes and database + purge = Purge(self._db, self.retention, self.rclone_destination) + tasks.append(asyncio.create_task(purge.start())) + + # Create missing event task + # This will check all the events within the retention period, if any have been missed and not backed up + # they will be added to the event queue + missing = MissingEventChecker( + self._protect, self._db, event_queue, self.retention, self.detection_types, self.ignore_cameras + ) + tasks.append(asyncio.create_task(missing.start())) + + logger.info("Starting...") + await asyncio.gather(*tasks) + + except asyncio.CancelledError: + if self._protect is not None: await self._protect.close_session() - - while True: - logger.warn("Attempting reconnect...") - - try: - # Start again from scratch. In principle if Unifi - # Protect has not been restarted we should just be able - # to call self._protect.update() to reconnect to the - # websocket. However, if the server has been restarted a - # call to self._protect.check_ws() returns true and some - # seconds later pyunifiprotect detects the websocket as - # disconnected again. Therefore, kill it all and try - # again! - replacement_protect = ProtectApiClient( - self.address, - self.port, - self.username, - self.password, - verify_ssl=self.verify_ssl, - subscribed_models={ModelType.EVENT}, - ) - # Start the pyunifiprotect connection by calling `update` - await replacement_protect.update() - if replacement_protect.check_ws(): - self._protect = replacement_protect - self._unsub = self._protect.subscribe_websocket(self._websocket_callback) - break - else: - logger.warn("Unable to establish connection to Unifi Protect") - except Exception as e: - logger.warn("Unexpected exception occurred while trying to reconnect:") - logger.exception(e) - finally: - # If we get here we need to close the replacement session again - await replacement_protect.close_session() - - # Back off for a little while - await asyncio.sleep(10) - - logger.info("Re-established connection to Unifi Protect and to the websocket.") - - # Launches the main loop - logger.info("Listening for events...") - await self._backup_events() - - logger.info("Stopping...") - - # Unsubscribes from the websocket - self._unsub() + if self._db is not None: + await self._db.close() async def _check_rclone(self) -> None: """Check if rclone is installed and the specified remote is configured. @@ -393,258 +360,17 @@ class UnifiProtectBackup: raise RuntimeError("`rclone` is not installed on this system") logger.debug(f"rclone found: {rclone}") - cmd = "rclone listremotes -vv" - proc = await asyncio.create_subprocess_shell( - cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - stdout, stderr = await proc.communicate() - logger.extra_debug(f"stdout:\n{stdout.decode()}") # type: ignore - logger.extra_debug(f"stderr:\n{stderr.decode()}") # type: ignore - if proc.returncode != 0: - raise SubprocessException(stdout.decode(), stderr.decode(), proc.returncode) + returncode, stdout, stderr = await run_command("rclone listremotes -vv") + if returncode != 0: + raise SubprocessException(stdout, stderr, returncode) # Check if the destination is for a configured remote for line in stdout.splitlines(): - if self.rclone_destination.startswith(line.decode()): + if self.rclone_destination.startswith(line): break else: remote = self.rclone_destination.split(":")[0] raise ValueError(f"rclone does not have a remote called `{remote}`") - def _websocket_callback(self, msg: WSSubscriptionMessage) -> None: - """Callback for "EVENT" websocket messages. - - Filters the incoming events, and puts completed events onto the download queue - - Args: - msg (Event): Incoming event data - """ - logger.websocket_data(msg) # type: ignore - - # We are only interested in updates that end motion/smartdetection event - assert isinstance(msg.new_obj, Event) - if msg.action != WSAction.UPDATE: - return - if msg.new_obj.camera_id in self.ignore_cameras: - return - if msg.new_obj.end is None: - return - if msg.new_obj.type not in [EventType.MOTION, EventType.SMART_DETECT, EventType.RING]: - return - if msg.new_obj.type is EventType.MOTION and "motion" not in self.detection_types: - logger.extra_debug(f"Skipping unwanted motion detection event: {msg.new_obj.id}") # type: ignore - return - if msg.new_obj.type is EventType.RING and "ring" not in self.detection_types: - logger.extra_debug(f"Skipping unwanted ring event: {msg.new_obj.id}") # type: ignore - return - elif msg.new_obj.type is EventType.SMART_DETECT: - for event_smart_detection_type in msg.new_obj.smart_detect_types: - if event_smart_detection_type not in self.detection_types: - logger.extra_debug( # type: ignore - f"Skipping unwanted {event_smart_detection_type} detection event: {msg.new_obj.id}" - ) - return - - self._download_queue.put_nowait(msg.new_obj) - logger.debug(f"Adding event {msg.new_obj.id} to queue (Current queue={self._download_queue.qsize()})") - - async def _backup_events(self) -> None: - """Main loop for backing up events. - - Waits for an event in the queue, then downloads the corresponding clip and uploads it using rclone. - If errors occur it will simply log the errors and wait for the next event. In a future release, - retries will be added. - - """ - while True: - try: - event = await self._download_queue.get() - - # Fix timezones since pyunifiprotect sets all timestamps to UTC. Instead localize them to - # the timezone of the unifi protect NVR. - event.start = event.start.replace(tzinfo=pytz.utc).astimezone(self._protect.bootstrap.nvr.timezone) - event.end = event.end.replace(tzinfo=pytz.utc).astimezone(self._protect.bootstrap.nvr.timezone) - - logger.info(f"Backing up event: {event.id}") - logger.debug(f"Remaining Queue: {self._download_queue.qsize()}") - logger.debug(f" Camera: {await self._get_camera_name(event.camera_id)}") - if event.type == EventType.SMART_DETECT: - logger.debug(f" Type: {event.type} ({', '.join(event.smart_detect_types)})") - else: - logger.debug(f" Type: {event.type}") - logger.debug(f" Start: {event.start.strftime('%Y-%m-%dT%H-%M-%S')} ({event.start.timestamp()})") - logger.debug(f" End: {event.end.strftime('%Y-%m-%dT%H-%M-%S')} ({event.end.timestamp()})") - duration = (event.end - event.start).total_seconds() - logger.debug(f" Duration: {duration}") - - # Unifi protect does not return full video clips if the clip is requested too soon. - # There are two issues at play here: - # - Protect will only cut a clip on an keyframe which happen every 5s - # - Protect's pipeline needs a finite amount of time to make a clip available - # So we will wait 1.5x the keyframe interval to ensure that there is always ample video - # stored and Protect can return a full clip (which should be at least the length requested, - # but often longer) - time_since_event_ended = datetime.utcnow().replace(tzinfo=timezone.utc) - event.end - sleep_time = (timedelta(seconds=5 * 1.5) - time_since_event_ended).total_seconds() - if sleep_time > 0: - logger.debug(f" Sleeping ({sleep_time}s) to ensure clip is ready to download...") - await asyncio.sleep(sleep_time) - - # Download video - logger.debug(" Downloading video...") - for x in range(5): - try: - video = await self._protect.get_camera_video(event.camera_id, event.start, event.end) - assert isinstance(video, bytes) - break - except (AssertionError, ClientPayloadError, TimeoutError) as e: - logger.warn(f" Failed download attempt {x+1}, retying in 1s") - logger.exception(e) - await asyncio.sleep(1) - else: - logger.warn(f"Download failed after 5 attempts, abandoning event {event.id}:") - continue - - destination = await self.generate_file_path(event) - - # Get the actual length of the downloaded video using ffprobe - if self._has_ffprobe: - try: - downloaded_duration = await self._get_video_length(video) - msg = ( - f" Downloaded video length: {downloaded_duration:.3f}s" - f"({downloaded_duration - duration:+.3f}s)" - ) - if downloaded_duration < duration: - logger.warning(msg) - else: - logger.debug(msg) - except SubprocessException as e: - logger.warn(" `ffprobe` failed") - logger.exception(e) - - # Upload video - logger.debug(" Uploading video via rclone...") - logger.debug(f" To: {destination}") - logger.debug(f" Size: {human_readable_size(len(video))}") - for x in range(5): - try: - await self._upload_video(video, destination, self.rclone_args) - break - except SubprocessException as e: - logger.warn(f" Failed upload attempt {x+1}, retying in 1s") - logger.exception(e) - await asyncio.sleep(1) - else: - logger.warn(f"Upload failed after 5 attempts, abandoning event {event.id}:") - continue - - logger.info("Backed up successfully!") - - except Exception as e: - logger.warn(f"Unexpected exception occurred, abandoning event {event.id}:") - logger.exception(e) - - async def _upload_video(self, video: bytes, destination: pathlib.Path, rclone_args: str): - """Upload video using rclone. - - In order to avoid writing to disk, the video file data is piped directly - to the rclone process and uploaded using the `rcat` function of rclone. - - Args: - video (bytes): The data to be written to the file - destination (pathlib.Path): Where rclone should write the file - rclone_args (str): Optional extra arguments to pass to `rclone` - - Raises: - RuntimeError: If rclone returns a non-zero exit code - """ - cmd = f'rclone rcat -vv {rclone_args} "{destination}"' - proc = await asyncio.create_subprocess_shell( - cmd, - stdin=asyncio.subprocess.PIPE, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - stdout, stderr = await proc.communicate(video) - if proc.returncode == 0: - logger.extra_debug(f"stdout:\n{stdout.decode()}") # type: ignore - logger.extra_debug(f"stderr:\n{stderr.decode()}") # type: ignore - else: - raise SubprocessException(stdout.decode(), stderr.decode(), proc.returncode) - - async def _get_video_length(self, video: bytes) -> float: - cmd = 'ffprobe -v quiet -show_streams -select_streams v:0 -of json -' - proc = await asyncio.create_subprocess_shell( - cmd, - stdin=asyncio.subprocess.PIPE, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - stdout, stderr = await proc.communicate(video) - if proc.returncode == 0: - logger.extra_debug(f"stdout:\n{stdout.decode()}") # type: ignore - logger.extra_debug(f"stderr:\n{stderr.decode()}") # type: ignore - - json_data = json.loads(stdout.decode()) - return float(json_data['streams'][0]['duration']) - - else: - raise SubprocessException(stdout.decode(), stderr.decode(), proc.returncode) - - async def generate_file_path(self, event: Event) -> pathlib.Path: - """Generates the rclone destination path for the provided event. - - Generates rclone destination path for the given even based upon the format string - in `self.file_structure_format`. - - Provides the following fields to the format string: - event: The `Event` object as per - https://github.com/briis/pyunifiprotect/blob/master/pyunifiprotect/data/nvr.py - duration_seconds: The duration of the event in seconds - detection_type: A nicely formatted list of the event detection type and the smart detection types (if any) - camera_name: The name of the camera that generated this event - - Args: - event: The event for which to create an output path - - Returns: - pathlib.Path: The rclone path the event should be backed up to - - """ - assert isinstance(event.camera_id, str) - assert isinstance(event.start, datetime) - assert isinstance(event.end, datetime) - - format_context = { - "event": event, - "duration_seconds": (event.end - event.start).total_seconds(), - "detection_type": f"{event.type} ({' '.join(event.smart_detect_types)})" - if event.smart_detect_types - else f"{event.type}", - "camera_name": await self._get_camera_name(event.camera_id), - } - - file_path = self.file_structure_format.format(**format_context) - file_path = re.sub(r'[^\w\-_\.\(\)/ ]', '', file_path) # Sanitize any invalid chars - - return pathlib.Path(f"{self.rclone_destination}/{file_path}") - - async def _get_camera_name(self, id: str): - try: - return self._protect.bootstrap.cameras[id].name - except KeyError: - # Refresh cameras - logger.debug(f"Unknown camera id: '{id}', checking API") - - try: - await self._protect.update(force=True) - except NvrError: - logger.debug(f"Unknown camera id: '{id}'") - raise - - name = self._protect.bootstrap.cameras[id].name - logger.debug(f"Found camera - {id}: {name}") - return name + # Ensure the base directory exists + await run_command(f"rclone mkdir -vv {self.rclone_destination}") diff --git a/unifi_protect_backup/uploader.py b/unifi_protect_backup/uploader.py new file mode 100644 index 0000000..87af349 --- /dev/null +++ b/unifi_protect_backup/uploader.py @@ -0,0 +1,146 @@ +import asyncio +import logging +import pathlib +import re +from datetime import datetime + +import aiosqlite +from pyunifiprotect.data.nvr import Event +from pyunifiprotect import ProtectApiClient + +from unifi_protect_backup.utils import get_camera_name, SubprocessException, VideoQueue + +logger = logging.getLogger(__name__) + + +class VideoUploader: + """Uploads videos from the video_queue to the provided rclone destination + + Keeps a log of what its uploaded in `db` + """ + + def __init__( + self, + protect: ProtectApiClient, + video_queue: VideoQueue, + rclone_destination: str, + rclone_args: str, + file_structure_format: str, + db: aiosqlite.Connection, + ): + self._protect: ProtectApiClient = protect + self._video_queue: VideoQueue = video_queue + self._rclone_destination: str = rclone_destination + self._rclone_args: str = rclone_args + self._file_structure_format: str = file_structure_format + self._db: aiosqlite.Connection = db + + async def start(self): + """Main loop + + Runs forever looking for video data in the video queue and then uploads it using rclone, finally it updates the database + """ + + logger.info("Starting Uploader") + while True: + try: + event, video = await self._video_queue.get() + logger.info(f"Uploading event: {event.id}") + logger.debug(f" Remaining Upload Queue: {self._video_queue.qsize_files()}") + + destination = await self._generate_file_path(event) + logger.debug(f" Destination: {destination}") + + await self._upload_video(video, destination, self._rclone_args) + await self._update_database(event, destination) + + logger.debug(f"Uploaded") + + except Exception as e: + logger.warn(f"Unexpected exception occurred, abandoning event {event.id}:") + logger.exception(e) + + async def _upload_video(self, video: bytes, destination: pathlib.Path, rclone_args: str): + """Upload video using rclone. + + In order to avoid writing to disk, the video file data is piped directly + to the rclone process and uploaded using the `rcat` function of rclone. + + Args: + video (bytes): The data to be written to the file + destination (pathlib.Path): Where rclone should write the file + rclone_args (str): Optional extra arguments to pass to `rclone` + + Raises: + RuntimeError: If rclone returns a non-zero exit code + """ + cmd = f'rclone rcat -vv {rclone_args} "{destination}"' + proc = await asyncio.create_subprocess_shell( + cmd, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await proc.communicate(video) + if proc.returncode == 0: + logger.extra_debug(f"stdout:\n{stdout.decode()}") # type: ignore + logger.extra_debug(f"stderr:\n{stderr.decode()}") # type: ignore + else: + raise SubprocessException(stdout.decode(), stderr.decode(), proc.returncode) + + async def _update_database(self, event: Event, destination: str): + """ + Add the backed up event to the database along with where it was backed up to + """ + await self._db.execute( + f"""INSERT INTO events VALUES + ('{event.id}', '{event.type}', '{event.camera_id}', '{event.start.timestamp()}', '{event.end.timestamp()}') + """ + ) + + remote, file_path = str(destination).split(":") + await self._db.execute( + f"""INSERT INTO backups VALUES + ('{event.id}', '{remote}', '{file_path}') + """ + ) + + await self._db.commit() + + async def _generate_file_path(self, event: Event) -> pathlib.Path: + """Generates the rclone destination path for the provided event. + + Generates rclone destination path for the given even based upon the format string + in `self.file_structure_format`. + + Provides the following fields to the format string: + event: The `Event` object as per + https://github.com/briis/pyunifiprotect/blob/master/pyunifiprotect/data/nvr.py + duration_seconds: The duration of the event in seconds + detection_type: A nicely formatted list of the event detection type and the smart detection types (if any) + camera_name: The name of the camera that generated this event + + Args: + event: The event for which to create an output path + + Returns: + pathlib.Path: The rclone path the event should be backed up to + + """ + assert isinstance(event.camera_id, str) + assert isinstance(event.start, datetime) + assert isinstance(event.end, datetime) + + format_context = { + "event": event, + "duration_seconds": (event.end - event.start).total_seconds(), + "detection_type": f"{event.type} ({' '.join(event.smart_detect_types)})" + if event.smart_detect_types + else f"{event.type}", + "camera_name": await get_camera_name(self._protect, event.camera_id), + } + + file_path = self._file_structure_format.format(**format_context) + file_path = re.sub(r'[^\w\-_\.\(\)/ ]', '', file_path) # Sanitize any invalid chars + + return pathlib.Path(f"{self._rclone_destination}/{file_path}") diff --git a/unifi_protect_backup/utils.py b/unifi_protect_backup/utils.py new file mode 100644 index 0000000..394331c --- /dev/null +++ b/unifi_protect_backup/utils.py @@ -0,0 +1,191 @@ +import logging +import re +import asyncio + +from dateutil.relativedelta import relativedelta + +from pyunifiprotect import ProtectApiClient + +logger = logging.getLogger(__name__) + + +def human_readable_size(num: float): + """Turns a number into a human readable number with ISO/IEC 80000 binary prefixes. + + Based on: https://stackoverflow.com/a/1094933 + + Args: + num (int): The number to be converted into human readable format + """ + for unit in ["B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"]: + if abs(num) < 1024.0: + return f"{num:3.1f}{unit}" + num /= 1024.0 + raise ValueError("`num` too large, ran out of prefixes") + + +async def get_camera_name(protect: ProtectApiClient, id: str): + """ + Returns the name for the camera with the given ID + + If the camera ID is not know, it tries refreshing the cached data + """ + try: + return protect.bootstrap.cameras[id].name + except KeyError: + # Refresh cameras + logger.debug(f"Unknown camera id: '{id}', checking API") + + await protect.update(force=True) + + try: + name = protect.bootstrap.cameras[id].name + except KeyError: + logger.debug(f"Unknown camera id: '{id}'") + raise + + logger.debug(f"Found camera - {id}: {name}") + return name + + +class SubprocessException(Exception): + def __init__(self, stdout, stderr, returncode): + """Exception class for when rclone does not exit with `0`. + + Args: + stdout (str): What rclone output to stdout + stderr (str): What rclone output to stderr + returncode (str): The return code of the rclone process + """ + super().__init__() + self.stdout: str = stdout + self.stderr: str = stderr + self.returncode: int = returncode + + def __str__(self): + """Turns exception into a human readable form.""" + return f"Return Code: {self.returncode}\nStdout:\n{self.stdout}\nStderr:\n{self.stderr}" + + +def parse_rclone_retention(retention: str) -> relativedelta: + """ + Parses the rclone `retention` parameter into a relativedelta which can then be used + to calculate datetimes + """ + + matches = {k: int(v) for v, k in re.findall(r"([\d]+)(ms|s|m|h|d|w|M|y)", retention)} + return relativedelta( + microseconds=matches.get("ms", 0) * 1000, + seconds=matches.get("s", 0), + minutes=matches.get("m", 0), + hours=matches.get("h", 0), + days=matches.get("d", 0), + weeks=matches.get("w", 0), + months=matches.get("M", 0), + years=matches.get("Y", 0), + ) + + +async def run_command(cmd: str): + """ + Runs the given command returning the exit code, stdout and stderr + """ + proc = await asyncio.create_subprocess_shell( + cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await proc.communicate() + stdout = stdout.decode().replace('\n', '\n\t').strip() + stderr = stderr.decode().replace('\n', '\n\t').strip() + + if proc.returncode != 0: + logger.warn(f"Failed to run: '{cmd}") + logger.warn(f"stdout:\n{stdout}") + logger.warn(f"stderr:\n{stderr}") + else: + logger.extra_debug(f"stdout:\n{stdout}") + logger.extra_debug(f"stderr:\n{stderr}") + + return proc.returncode, stdout, stderr + + +class VideoQueue(asyncio.Queue): + """A queue that limits the number of bytes it can store rather than discrete entries""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._bytes_sum = 0 + + def qsize(self): + """Number of items in the queue.""" + return self._bytes_sum + + def qsize_files(self): + """Number of items in the queue.""" + return super().qsize() + + def _get(self): + data = self._queue.popleft() + self._bytes_sum -= len(data[1]) + return data + + def _put(self, item: bytes): + self._queue.append(item) + self._bytes_sum += len(item[1]) + + def full(self, item: bytes = None): + """Return True if there are maxsize bytes in the queue. + + optionally if `item` is provided, it will return False if there is enough space to + fit it, otherwise it will return True + + Note: if the Queue was initialized with maxsize=0 (the default), + then full() is never True. + """ + if self._maxsize <= 0: + return False + else: + if item is None: + return self.qsize() >= self._maxsize + else: + return self.qsize() + len(item[1]) >= self._maxsize + + async def put(self, item: bytes): + """Put an item into the queue. + + Put an item into the queue. If the queue is full, wait until a free + slot is available before adding item. + """ + while self.full(item): + putter = self._loop.create_future() + self._putters.append(putter) + try: + await putter + except: + putter.cancel() # Just in case putter is not done yet. + try: + # Clean self._putters from canceled putters. + self._putters.remove(putter) + except ValueError: + # The putter could be removed from self._putters by a + # previous get_nowait call. + pass + if not self.full(item) and not putter.cancelled(): + # We were woken up by get_nowait(), but can't take + # the call. Wake up the next in line. + self._wakeup_next(self._putters) + raise + return self.put_nowait(item) + + def put_nowait(self, item: bytes): + """Put an item into the queue without blocking. + + If no free slot is immediately available, raise QueueFull. + """ + if self.full(item): + raise asyncio.QueueFull + self._put(item) + self._unfinished_tasks += 1 + self._finished.clear() + self._wakeup_next(self._getters)