Compare commits


91 Commits

Author SHA1 Message Date
Sebastian Goscik
f2f1c49ae9 Bump version: 0.9.3 → 0.9.4 2023-07-29 11:32:32 +01:00
Sebastian Goscik
8786f2ceb0 Fixed time period parsing
Also updated link to rclone docs to be more direct to the format docs
2023-07-29 11:32:32 +01:00
Sebastian Goscik
1f2a48f95e Bump version: 0.9.2 → 0.9.3 2023-07-08 16:56:23 +01:00
Sebastian Goscik
5d2391e005 Remove Arm v7 docker builds
See: https://www.linuxserver.io/blog/a-farewell-to-arm-hf
2023-07-08 16:55:12 +01:00
Sebastian Goscik
c4e9a42c1a Block all calls to protect client when the connection is
dropped and we are awaiting a reconnect
2023-07-08 16:30:09 +01:00
Sebastian Goscik
6c719c0162 Cache camera names
so an active protect connection is not needed to perform actions like
uploads which don't rely on protect.
2023-07-08 15:32:47 +01:00
Sebastian Goscik
498f72a09b Bump version: 0.9.1 → 0.9.2 2023-05-24 00:45:00 +01:00
Sebastian Goscik
d0080a569b Changelog 2023-05-24 00:44:54 +01:00
Sebastian Goscik
f89388327f Fix missing event checker not ignoring unwanted cameras 2023-05-22 23:22:41 +01:00
Sebastian Goscik
0a7eb92a36 Bump version: 0.9.0 → 0.9.1 2023-04-29 09:51:33 +01:00
Sebastian Goscik
694e9c6fde updated changelog 2023-04-29 09:50:55 +01:00
Sebastian Goscik
63fdea402d Linting fixes 2023-04-29 09:49:27 +01:00
Sebastian Goscik
f4c3c68f0d Fixed download failure counting
Previously it would only count as a failure if the download "succeeded" but returned None
2023-04-29 09:48:46 +01:00
Igor Wolbers
e5112de35c Add extra param to purge (#86)
* Added optional argument string to pass directly to the `rclone delete` command used to purge video files. This will allow for immediate deletion of files on destinations where the file might otherwise go to a recycle bin by default.

---------

Co-authored-by: Igor Wolbers <igor@sparcobv.onmicrosoft.com>
Co-authored-by: Sebastian Goscik <sebastian.goscik@live.co.uk>
2023-04-29 08:19:41 +00:00
Sebastian Goscik
1b38cb3db3 Fix typo in readme 2023-04-26 10:20:22 +01:00
Sebastian Goscik
237d7ceeb1 Merge pull request #83 from IgorWolbers/add-service-documentation 2023-04-03 11:14:23 +00:00
Sebastian Goscik
6b1066d31e Log when an error occurs trying to add a notifier 2023-04-02 23:15:47 +01:00
Sebastian Goscik
798139a182 Fix arm v7 build 2023-03-24 15:22:20 +00:00
Sebastian Goscik
9def99ff97 linter fixes 2023-03-24 15:06:10 +00:00
Igor Wolbers
8d3ee5bdfd Running Backup Tool as a Service (LINUX ONLY) 2023-03-24 08:56:01 -04:00
Igor Wolbers
c6584759d9 Fixed the docker run command example which
had a ` instead of a '. This caused the command to never
terminate whene executing.
2023-03-24 12:23:03 +00:00
Sebastian Goscik
b46c9485c8 Bump version: 0.8.8 → 0.9.0 2023-03-24 12:22:32 +00:00
Sebastian Goscik
561ce181ea changelog 2023-03-24 12:22:32 +00:00
Sebastian Goscik
cec323f803 Make download failure assertion more specific 2023-03-24 12:18:09 +00:00
Sebastian Goscik
89fe672693 Add ability to ignore events that keep failing 2023-03-24 12:17:44 +00:00
Sebastian Goscik
c55f50153f isort 2023-03-24 11:17:17 +00:00
Sebastian Goscik
144938f7e5 Fix error log when no notifiers are setup 2023-03-24 11:16:37 +00:00
Sebastian Goscik
782d126ae5 Add ability to skip missing events at launch 2023-03-24 01:02:58 +00:00
Sebastian Goscik
0d3395b74a Fix tasks being started prematurely 2023-03-24 00:50:42 +00:00
Sebastian Goscik
d9af6a03a5 fix isort induced circular import 2023-03-08 00:35:37 +00:00
Sebastian Goscik
48f743bc8e flake8 & mypy fixes 2023-03-08 00:03:26 +00:00
Sebastian Goscik
6121f74a80 remove pylint dependency 2023-03-07 00:53:07 +00:00
Sebastian Goscik
07c2278428 isort 2023-03-07 00:42:49 +00:00
Sebastian Goscik
1ff59773f1 Tidy poetry files 2023-03-07 00:41:49 +00:00
Sebastian Goscik
08f2674497 Stop apprise errors from preventing regular logging 2023-03-07 00:17:18 +00:00
Sebastian Goscik
818f2eb5b3 Reclassify log messages 2023-03-07 00:16:19 +00:00
Sebastian Goscik
dfdc85001c color logging no longer uses global variable 2023-03-07 00:16:19 +00:00
Sebastian Goscik
22d20c9905 Add star graph 2023-02-26 00:09:48 +00:00
Sebastian Goscik
86963fb0ff Add apprise env var to readme 2023-02-26 00:05:42 +00:00
Sebastian Goscik
93e8e1a812 Update poetry.lock 2023-02-26 00:00:36 +00:00
Sebastian Goscik
fb1f266eae Refactor logging customisations into custom handler 2023-02-26 00:00:25 +00:00
Sebastian Goscik
ce34afaf06 Add the ability to send logging output to apprise 2023-02-25 20:51:35 +00:00
Sebastian Goscik
6b60fac3c1 Log main loop exception
and allow time for other tasks to finish before closing the program
2023-02-25 20:51:18 +00:00
Sebastian Goscik
73022fddf1 Simplify exception logging 2023-02-25 20:51:18 +00:00
Sebastian Goscik
900d0d2881 Re-try connecting to unifi protect if initial connection fails 2023-02-25 20:51:18 +00:00
Sebastian Goscik
f7e43b8e95 Add notes about reducing disk wear 2023-02-25 12:26:15 +00:00
Sebastian Goscik
cf7229e05f Restructure readme 2023-02-25 12:18:22 +00:00
Sebastian Goscik
4798b3d269 fix module and package name clash 2023-01-16 13:20:29 +00:00
Sebastian Goscik
5b50b8144b remove stray print 2023-01-16 12:41:57 +00:00
Sebastian Goscik
965dde53f6 Merge pull request #70 from darron/main
Build for arm/v7, add makefile target, adjust Github Action.
2023-01-11 11:53:20 +00:00
Darron Froese
3677e4a86f Build for arm/v7, add makefile target, adjust Github Action. 2023-01-01 14:32:58 -07:00
Sebastian Goscik
3540ec1d04 Bump version: 0.8.7 → 0.8.8 2022-12-30 13:16:45 +00:00
Sebastian Goscik
8ed60aa925 Made purge interval configurable and default back to once a day 2022-12-30 13:14:31 +00:00
Sebastian Goscik
ca455ebcd0 Bump version: 0.8.6 → 0.8.7 2022-12-11 13:46:52 +00:00
Sebastian Goscik
16315ca23c Fix improper unpacking of upload events 2022-12-11 13:36:40 +00:00
Sebastian Goscik
ac0f6f5fcb Bump version: 0.8.5 → 0.8.6 2022-12-10 06:59:45 +00:00
Sebastian Goscik
0c34294b7e clear current event after upload/download 2022-12-10 06:44:56 +00:00
Sebastian Goscik
f195b8a4a4 Fix ignoring missing event before one has started downloading/uploading 2022-12-10 06:35:38 +00:00
Sebastian Goscik
645e339314 Bump version: 0.8.4 → 0.8.5 2022-12-09 23:20:05 +00:00
Sebastian Goscik
13c5b630d4 fix using event instead of event id in set to exclude missing events 2022-12-09 23:19:38 +00:00
Sebastian Goscik
44867e7427 Bump version: 0.8.3 → 0.8.4 2022-12-09 11:15:07 +00:00
Sebastian Goscik
0978798078 Fix uploading files not being accounted for when checking for missing events 2022-12-09 11:12:08 +00:00
Sebastian Goscik
8e3ea2b13f Log buffer size in human readable format 2022-12-09 11:12:08 +00:00
Sebastian Goscik
8a67311fda show default buffer size in command help 2022-12-08 12:40:32 +00:00
Sebastian Goscik
8aedb35c45 Update readme 2022-12-08 12:40:19 +00:00
Sebastian Goscik
4eed1c01c4 Bump version: 0.8.2 → 0.8.3 2022-12-08 12:22:47 +00:00
Sebastian Goscik
a4091699a1 Fix setting no verbosity for the docker container 2022-12-08 12:12:54 +00:00
Sebastian Goscik
58eb1fd8a7 Added event ID to uploader/downloader logging
Also fixed issue where logging outside of unifi_protect_backup was not adding colors
2022-12-08 12:04:36 +00:00
Sebastian Goscik
bba96e9d86 Make video download buffer size configurable 2022-12-08 00:15:11 +00:00
Sebastian Goscik
dd69a18dbf Raise an error when trying to add a video larger than the buffer 2022-12-08 00:14:08 +00:00
Sebastian Goscik
3510a50d0f remove unused asyncio loop 2022-12-07 23:25:41 +00:00
Sebastian Goscik
3e0044cd80 Make color logging optional
Returns to the previous default mode of plain logging but allows color logging to be enabled
2022-12-06 00:57:05 +00:00
Sebastian Goscik
1b3d196672 Add timezone info to debug log 2022-12-06 00:57:05 +00:00
Sebastian Goscik
c22819c04d Correct missing event logging for smart detections 2022-12-06 00:57:05 +00:00
Sebastian Goscik
ac66f4eaab Reduce log spam from missing events unless using extra_debug 2022-12-06 00:57:05 +00:00
Sebastian Goscik
34bc37bd0b Bump version: 0.8.1 → 0.8.2 2022-12-05 14:27:11 +00:00
Sebastian Goscik
f15cdf9a9b updated changelog 2022-12-05 14:27:06 +00:00
Sebastian Goscik
63d368f14c Added note to readme about 0.8 docker changes 2022-12-05 14:24:27 +00:00
Sebastian Goscik
ee01edf55c Make sure config directories exist in the container 2022-12-05 14:04:43 +00:00
Sebastian Goscik
4e10e0f10e Use run_command in downloader and uploader 2022-12-05 14:03:59 +00:00
Sebastian Goscik
385f115eab Add ability for run_command to pass data to stdin 2022-12-05 14:03:23 +00:00
Sebastian Goscik
b4062d3b53 Fix issue where indented stdout/stderr was being returned
The indentation was supposed to be only for the logging, to make it easier to read, but it was also being returned, thus breaking parsing of the command output

Fixes #60
2022-12-05 13:40:32 +00:00
Sebastian Goscik
7bfcb548e2 Bump version: 0.8.0 → 0.8.1 2022-12-04 12:04:15 +00:00
Sebastian Goscik
a74e4b042d changelog 2022-12-04 12:03:57 +00:00
Sebastian Goscik
2c5308aa20 updated name in pyproject.toml 2022-12-04 12:03:54 +00:00
Sebastian Goscik
9d375d4e7b update bumpversion cfg to use new tar.gz name 2022-12-04 11:59:36 +00:00
Sebastian Goscik
df4390688b Update docs and dockerfile to save events database 2022-12-03 22:40:40 +00:00
Sebastian Goscik
3acfd1f543 Fix dockerfile - to _
I have no idea how this worked before, but it doesn't now
2022-12-03 22:04:50 +00:00
Sebastian Goscik
49c11c1872 Make ci show all temp files 2022-12-03 22:00:22 +00:00
Sebastian Goscik
93cf297371 Bump version: 0.7.4 → 0.8.0 2022-12-03 21:54:45 +00:00
Sebastian Goscik
8baa413a23 Merge pull request #57 from ep1cman/restructure
Major Restructure
2022-12-03 21:51:20 +00:00
20 changed files with 3147 additions and 2219 deletions

.bumpversion.cfg

@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.7.4
current_version = 0.9.4
commit = True
tag = True
@@ -12,5 +12,5 @@ search = __version__ = '{current_version}'
replace = __version__ = '{new_version}'
[bumpversion:file:Dockerfile]
search = COPY dist/unifi-protect-backup-{current_version}.tar.gz sdist.tar.gz
replace = COPY dist/unifi-protect-backup-{new_version}.tar.gz sdist.tar.gz
search = COPY dist/unifi_protect_backup-{current_version}.tar.gz sdist.tar.gz
replace = COPY dist/unifi_protect_backup-{new_version}.tar.gz sdist.tar.gz

GitHub Actions workflow

@@ -52,7 +52,7 @@ jobs:
- name: show temporary files
run: >-
ls -l
ls -lR
- name: Set up QEMU
uses: docker/setup-qemu-action@v1

CHANGELOG.md

@@ -4,6 +4,81 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.9.4] - 2023-07-29
### Fixed
- Time period parsing, 'Y' -> 'y'
## [0.9.3] - 2023-07-08
### Fixed
- Queued up downloads etc now wait for dropped connections to be re-established.
## [0.9.2] - 2023-04-21
### Fixed
- Missing event checker ignoring the "ignored cameras" list
## [0.9.1] - 2023-04-21
### Added
- Added optional argument string to pass directly to the `rclone delete` command used to purge video files
### Fixed
- Fixed download errors not counting as failures
## [0.9.0] - 2023-03-24
### Added
The ability to send logging output via apprise notifications
- Color logging is now optional
- Events are now permanently ignored if they fail to download 10 times
## [0.8.8] - 2022-12-30
### Added
- Added ability to configure purge interval
### Fixed
- Purge interval returned to previous default of once a day
## [0.8.7] - 2022-12-11
### Fixed
- Fix improper unpacking of upload events
## [0.8.6] - 2022-12-10
### Fixed
Check that the current event is not None before trying to get its ID
Downloader/uploader clear their current event once it's been processed
## [0.8.5] - 2022-12-09
### Fixed
- use event ID of currently up/downloading event, not whole event object when checking missing events
## [0.8.4] - 2022-12-09
### Added
- Logging of remaining upload queue size
### Fixed
- Uploading files were not accounted for when checking for missing events
- Buffer size parameter is logged in human-readable format
## [0.8.3] - 2022-12-08
### Added
- Now logs time zone settings for both the host and NVR
- Color logging is now optional and defaults to disabled (to match previous behavior before v0.8.0)
- Ability to configure download buffer size (bumped default up to 512MiB)
- Event IDs to upload/download logging
### Fixed
Log spam when lots of events are missing; this will now only occur if the logging level is set to `EXTRA_DEBUG` (-vv)
Corrected logging not showing smart detection types
The application no longer stalls when a downloaded video is larger than the available buffer size
- Ability to set the least verbose logging for the docker container
## [0.8.2] - 2022-12-05
### Fixed
- Fixed issue where command output was being returned with added indentation intended for logging only
- Fixed issue where some command logging was not indented
- Fixed issue where the tool could crash when run in a container if /config/database didn't exist
## [0.8.1] - 2022-12-04
Version 0.8.0 was used by accident previously and PyPI would not accept it, so the patch version was bumped by one
## [0.8.0] - 2022-12-03
Major internal refactoring. Each task is now its own class and asyncio task.
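The 0.8.0 entry above describes the tool's move to one asyncio task per component, with the components communicating over shared queues (an event listener feeding a downloader, which feeds an uploader). A minimal sketch of that pattern, using illustrative names rather than the tool's actual classes:

```python
import asyncio

async def listener(download_queue: asyncio.Queue) -> None:
    # Stand-in for the websocket event listener: enqueue a couple of fake events.
    for event_id in ("event-1", "event-2"):
        await download_queue.put(event_id)
    await download_queue.put(None)  # sentinel so this sketch terminates

async def downloader(download_queue: asyncio.Queue, upload_queue: asyncio.Queue) -> None:
    # Pull events, "download" them, and hand the videos to the uploader.
    while (event_id := await download_queue.get()) is not None:
        await upload_queue.put((event_id, b"fake video bytes"))
    await upload_queue.put(None)

async def uploader(upload_queue: asyncio.Queue) -> None:
    while (item := await upload_queue.get()) is not None:
        event_id, video = item
        print(f"uploaded {event_id} ({len(video)} bytes)")

async def main() -> None:
    download_queue: asyncio.Queue = asyncio.Queue()
    upload_queue: asyncio.Queue = asyncio.Queue()
    # Each component runs as its own task, as in the restructured tool.
    await asyncio.gather(
        listener(download_queue),
        downloader(download_queue, upload_queue),
        uploader(upload_queue),
    )

asyncio.run(main())
```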

Dockerfile

@@ -1,6 +1,5 @@
# To build run:
# $ poetry build
# $ docker build -t ghcr.io/ep1cman/unifi-protect-backup .
# make docker
FROM ghcr.io/linuxserver/baseimage-alpine:3.16
@@ -8,7 +7,10 @@ LABEL maintainer="ep1cman"
WORKDIR /app
COPY dist/unifi-protect-backup-0.7.4.tar.gz sdist.tar.gz
COPY dist/unifi_protect_backup-0.9.4.tar.gz sdist.tar.gz
# https://github.com/rust-lang/cargo/issues/2808
ENV CARGO_NET_GIT_FETCH_WITH_CLI=true
RUN \
echo "**** install build packages ****" && \
@@ -18,7 +20,8 @@ RUN \
jpeg-dev \
zlib-dev \
python3-dev \
cargo && \
cargo \
git && \
echo "**** install packages ****" && \
apk add --no-cache \
rclone \
@@ -45,8 +48,11 @@ ENV RCLONE_DESTINATION=local:/data
ENV VERBOSITY="v"
ENV TZ=UTC
ENV IGNORE_CAMERAS=""
ENV SQLITE_PATH=/config/database/events.sqlite
COPY docker_root/ /
RUN mkdir -p /config/database /config/rclone
VOLUME [ "/config" ]
VOLUME [ "/data" ]

README.md

@@ -32,15 +32,9 @@ retention period.
- Unifi Protect version 1.20 or higher (as per [`pyunifiprotect`](https://github.com/briis/pyunifiprotect))
- `rclone` installed with at least one remote configured.
## Installation
# Setup
1. Install `rclone`. Instructions for your platform can be found here: https://rclone.org/install/#quickstart
2. Configure the `rclone` remote you want to backup to. Instructions can be found here: https://rclone.org/docs/#configure
3. `pip install unifi-protect-backup`
4. Optional: Install `ffprobe` so that `unifi-protect-backup` can check the length of the clips it downloads
### Account Setup
## Unifi Protect Account Setup
In order to connect to your unifi protect instance, you will first need to set up a local admin account:
* Login to your *Local Portal* on your UniFiOS device, and click on *Users*
@@ -52,8 +46,64 @@ In order to connect to your unifi protect instance, you will first need to setup
* Click *Add* in at the bottom Right.
* Select the newly created user in the list, and navigate to the `Assignments` tab in the left-hand pane, and ensure all cameras are ticked.
## Installation
## Usage
*The preferred way to run this tool is using a container*
### Docker Container
You can run this tool as a container if you prefer, using the following command.
Remember to change the variables to match your setup.
> **Note**
> As of version 0.8.0, the event database needs to be persisted for the tool to function properly;
> please see the updated commands below
#### Backing up locally:
By default, if no rclone config is provided, clips will be backed up to `/data`.
```
docker run \
-e UFP_USERNAME='USERNAME' \
-e UFP_PASSWORD='PASSWORD' \
-e UFP_ADDRESS='UNIFI_PROTECT_IP' \
-e UFP_SSL_VERIFY='false' \
-v '/path/to/save/clips':'/data' \
-v '/path/to/save/database':/config/database/ \
ghcr.io/ep1cman/unifi-protect-backup
```
#### Backing up to cloud storage:
In order to back up to cloud storage, you need to provide a `rclone.conf` file.
If you do not already have a `rclone.conf` file, you can create one as follows:
```
$ docker run -it --rm -v $PWD:/root/.config/rclone --entrypoint rclone ghcr.io/ep1cman/unifi-protect-backup config
```
Follow the interactive configuration process; this will create a `rclone.conf`
file in your current directory.
Finally, start the container:
```
docker run \
-e UFP_USERNAME='USERNAME' \
-e UFP_PASSWORD='PASSWORD' \
-e UFP_ADDRESS='UNIFI_PROTECT_IP' \
-e UFP_SSL_VERIFY='false' \
-e RCLONE_DESTINATION='my_remote:/unifi_protect_backup' \
-v '/path/to/save/clips':'/data' \
-v '/path/to/rclone.conf':'/config/rclone/rclone.conf' \
-v '/path/to/save/database':/config/database/ \
ghcr.io/ep1cman/unifi-protect-backup
```
### Installing on host:
1. Install `rclone`. Instructions for your platform can be found here: https://rclone.org/install/#quickstart
2. Configure the `rclone` remote you want to backup to. Instructions can be found here: https://rclone.org/docs/#configure
3. `pip install unifi-protect-backup`
4. Optional: Install `ffprobe` so that `unifi-protect-backup` can check the length of the clips it downloads
# Usage
```
Usage: unifi-protect-backup [OPTIONS]
@@ -62,69 +112,83 @@ Usage: unifi-protect-backup [OPTIONS]
Options:
--version Show the version and exit.
--address TEXT Address of Unifi Protect instance
[required]
--port INTEGER Port of Unifi Protect instance [default:
443]
--username TEXT Username to login to Unifi Protect instance
[required]
--address TEXT Address of Unifi Protect instance [required]
--port INTEGER Port of Unifi Protect instance [default: 443]
--username TEXT Username to login to Unifi Protect instance [required]
--password TEXT Password for Unifi Protect user [required]
--verify-ssl / --no-verify-ssl Set if you do not have a valid HTTPS
Certificate for your instance [default:
verify-ssl]
--rclone-destination TEXT `rclone` destination path in the format
{rclone remote}:{path on remote}. E.g.
`gdrive:/backups/unifi_protect` [required]
--retention TEXT How long should event clips be backed up
for. Format as per the `--max-age` argument
of `rclone`
(https://rclone.org/filtering/#max-age-don-
t-transfer-any-file-older-than-this)
[default: 7d]
--rclone-args TEXT Optional extra arguments to pass to `rclone
rcat` directly. Common usage for this would
be to set a bandwidth limit, for example.
--detection-types TEXT A comma separated list of which types of
detections to backup. Valid options are:
`motion`, `person`, `vehicle`, `ring`
--verify-ssl / --no-verify-ssl Set if you do not have a valid HTTPS Certificate for your
instance [default: verify-ssl]
--rclone-destination TEXT `rclone` destination path in the format {rclone remote}:{path on
remote}. E.g. `gdrive:/backups/unifi_protect` [required]
--retention TEXT How long should event clips be backed up for. Format as per the
`--max-age` argument of `rclone`
(https://rclone.org/filtering/#max-age-don-t-transfer-any-file-
older-than-this) [default: 7d]
--rclone-args TEXT Optional extra arguments to pass to `rclone rcat` directly.
Common usage for this would be to set a bandwidth limit, for
example.
--rclone-purge-args TEXT Optional extra arguments to pass to `rclone delete` directly.
Common usage for this would be to execute a permanent delete
instead of using the recycle bin on a destination.
Google Drive example: `--drive-use-trash=false`
--detection-types TEXT A comma separated list of which types of detections to backup.
Valid options are: `motion`, `person`, `vehicle`, `ring`
[default: motion,person,vehicle,ring]
--ignore-camera TEXT IDs of cameras for which events should not
be backed up. Use multiple times to ignore
multiple IDs. If being set as an environment
variable the IDs should be separated by
whitespace.
--file-structure-format TEXT A Python format string used to generate the
file structure/name on the rclone remote.For
details of the fields available, see the
projects `README.md` file. [default: {camer
a_name}/{event.start:%Y-%m-%d}/{event.end:%Y
-%m-%dT%H-%M-%S} {detection_type}.mp4]
--ignore-camera TEXT IDs of cameras for which events should not be backed up. Use
multiple times to ignore multiple IDs. If being set as an
environment variable the IDs should be separated by whitespace.
--file-structure-format TEXT A Python format string used to generate the file structure/name
on the rclone remote. For details of the fields available, see
the projects `README.md` file. [default: {camera_name}/{event.s
tart:%Y-%m-%d}/{event.end:%Y-%m-%dT%H-%M-%S}
{detection_type}.mp4]
-v, --verbose How verbose the logging output should be.
None: Only log info messages created by
`unifi-protect-backup`, and all warnings
None: Only log info messages created by `unifi-protect-
backup`, and all warnings
-v: Only log info & debug messages
created by `unifi-protect-backup`, and
all warnings
-v: Only log info & debug messages created by `unifi-
protect-backup`, and all warnings
-vv: Log info & debug messages created
by `unifi-protect-backup`, command
output, and all warnings
-vv: Log info & debug messages created by `unifi-protect-
backup`, command output, and all warnings
-vvv Log debug messages created by
`unifi-protect-backup`, command output,
all info messages, and all warnings
-vvv Log debug messages created by `unifi-protect-backup`,
command output, all info messages, and all warnings
-vvvv: Log debug messages created by
`unifi-protect-backup` command output,
all info messages, all warnings, and
-vvvv: Log debug messages created by `unifi-protect-backup`
command output, all info messages, all warnings, and
websocket data
-vvvvv: Log websocket data, command
output, all debug messages, all info
messages and all warnings [x>=0]
-vvvvv: Log websocket data, command output, all debug
messages, all info messages and all warnings [x>=0]
--sqlite_path TEXT Path to the SQLite database to use/create
--color-logging / --plain-logging
Set if you want to use color in logging output [default: plain-
logging]
--download-buffer-size TEXT How big the download buffer should be (you can use suffixes like
"B", "KiB", "MiB", "GiB") [default: 512MiB]
--purge_interval TEXT How frequently to check for files to purge.
NOTE: Can create a lot of API calls, so be careful if your cloud
provider charges you per api call [default: 1d]
--apprise-notifier TEXT Apprise URL for sending notifications.
E.g: ERROR,WARNING=tgram://[BOT KEY]/[CHAT ID]
You can use this parameter multiple times to use more than one
notification platform.
The following notification tags are available (corresponding to
the respective logging levels):
ERROR, WARNING, INFO, DEBUG, EXTRA_DEBUG, WEBSOCKET_DATA
If no tags are specified, it defaults to ERROR
More details about supported platforms can be found here:
https://github.com/caronc/apprise
--skip-missing If set, events which are 'missing' at the start will be ignored.
Subsequent missing events will be downloaded (e.g. a missed event) [default: False]
--help Show this message and exit.
```
@@ -138,10 +202,16 @@ always take priority over environment variables):
- `RCLONE_RETENTION`
- `RCLONE_DESTINATION`
- `RCLONE_ARGS`
- `RCLONE_PURGE_ARGS`
- `IGNORE_CAMERAS`
- `DETECTION_TYPES`
- `FILE_STRUCTURE_FORMAT`
- `SQLITE_PATH`
- `DOWNLOAD_BUFFER_SIZE`
- `COLOR_LOGGING`
- `PURGE_INTERVAL`
- `APPRISE_NOTIFIERS`
- `SKIP_MISSING`
## File path formatting
@@ -160,49 +230,80 @@ The following fields are provided to the format string:
You can optionally format the `event.start`/`event.end` timestamps as per the [`strftime` format](https://docs.python.org/3/library/datetime.html#strftime-strptime-behavior) by appending it after a `:` e.g to get just the date without the time: `{event.start:%Y-%m-%d}`
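As a rough illustration, the format string resolves with plain Python `str.format` attribute access and format specs. A minimal sketch with stand-in values (the real tool substitutes the pyunifiprotect event object and the resolved camera name):

```python
from datetime import datetime
from types import SimpleNamespace

# Stand-in for the event object the tool passes in.
event = SimpleNamespace(
    start=datetime(2023, 7, 29, 11, 30, 0),
    end=datetime(2023, 7, 29, 11, 32, 0),
)

fmt = "{camera_name}/{event.start:%Y-%m-%d}/{event.end:%Y-%m-%dT%H-%M-%S} {detection_type}.mp4"
print(fmt.format(camera_name="Front Door", event=event, detection_type="motion"))
# -> Front Door/2023-07-29/2023-07-29T11-32-00 motion.mp4
```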
## Skipping initially missing events
If you prefer not to back up the entire backlog of events, and would instead only back up events that occur from
now on, you can use the `--skip-missing` flag. This does not disable the periodic check for missing events (e.g. one that was missed due to a disconnection); instead, it marks all events missing at start-up as already backed up.
## Docker Container
You can run this tool as a container if you prefer with the following command.
Remember to change the variable to make your setup.
If you use this feature it is advised that you run the tool once with this flag, then stop it once the database has been created and the events are ignored. Keeping this flag set permanently could cause events to be missed if the tool crashes and is restarted, etc.
# A note about `rclone` backends and disk wear
This tool attempts to not write the downloaded files to disk to minimise disk wear, and instead streams them directly to
rclone. Sadly, not all storage backends supported by `rclone` allow "Stream Uploads". Please refer to the `StreamUpload` column in this table to see which ones do and don't: https://rclone.org/overview/#optional-features
### Backing up locally
By default, if no rclone config is provided clips will be backed up to `/data`.
If you are using a storage medium with poor write durability, e.g. an SD card on a Raspberry Pi, it is advised to avoid
such backends.
If you are running on a Linux host, you can set up `rclone` to use `tmpfs` (which is in RAM) to store its temp files, but this will significantly increase the memory usage of the tool.
### Running Docker Container (LINUX ONLY)
Add the following arguments to your docker run command:
```
docker run \
-e UFP_USERNAME='USERNAME' \
-e UFP_PASSWORD='PASSWORD' \
-e UFP_ADDRESS='UNIFI_PROTECT_IP' \
-e UFP_SSL_VERIFY='false' \
-v '/path/to/save/clips':'/data' \
ghcr.io/ep1cman/unifi-protect-backup
-e RCLONE_ARGS='--temp-dir=/rclone_tmp'
--tmpfs /rclone_tmp
```
### Backing up to cloud storage
In order to backup to cloud storage you need to provide a `rclone.conf` file.
If you do not already have a `rclone.conf` file you can create one as follows:
### Running Directly (LINUX ONLY)
```
$ docker run -it --rm -v $PWD:/root/.config/rclone --entrypoint rclone ghcr.io/ep1cman/unifi-protect-backup config
```
Follow the interactive configuration proceed, this will create a `rclone.conf`
file in your current directory.
Finally, start the container:
```
docker run \
-e UFP_USERNAME='USERNAME' \
-e UFP_PASSWORD='PASSWORD' \
-e UFP_ADDRESS='UNIFI_PROTECT_IP' \
-e UFP_SSL_VERIFY='false' \
-e RCLONE_DESTINATION='my_remote:/unifi_protect_backup' \
-v '/path/to/save/clips':'/data' \
-v `/path/to/rclone.conf':'/config/rclone/rclone.conf'
ghcr.io/ep1cman/unifi-protect-backup
sudo mkdir /mnt/tmpfs
sudo mount -o size=1G -t tmpfs none /mnt/tmpfs
$ unifi-protect-backup --rclone-args "--temp-dir=/mnt/tmpfs"
```
### Debugging
To make this persist across reboots, add the following to `/etc/fstab`:
```
tmpfs /mnt/tmpfs tmpfs nosuid,nodev,noatime 0 0
```
# Running Backup Tool as a Service (LINUX ONLY)
You can create a service that will run the docker or local version of this backup tool. The service can be configured to launch on boot. This is likely the preferred way to run the tool once you have it completely configured and tested, so that it is continuously running.
First create a service configuration file. You can replace `protectbackup` in the filename below with the name you wish to use for your service; if you change it, remember to change the other locations in the following scripts as well.
```
sudo nano /lib/systemd/system/protectbackup.service
```
Next edit the content and fill in the 4 placeholders indicated by `{}`; replace these placeholders (including the leading `{` and trailing `}` characters) with the values you are using.
```
[Unit]
Description=Unifi Protect Backup
[Service]
User={your machine username}
Group={your machine user group, could be the same as the username}
Restart=on-abort
WorkingDirectory=/home/{your machine username}
ExecStart={put your complete docker or local command here}
[Install]
WantedBy=multi-user.target
```
Now enable and then start the service.
```
sudo systemctl enable protectbackup.service
sudo systemctl start protectbackup.service
```
To check the status of the service use this command.
```
sudo systemctl status protectbackup.service --no-pager
```
# Debugging
If you need to debug your rclone setup, you can invoke rclone directly like so:
@@ -227,8 +328,17 @@ docker run \
listremotes
```
## Credits
# Credits
- All the contributors who have helped make this project:
<a href="https://github.com/ep1cman/unifi-protect-backup/graphs/contributors">
<img src="https://contrib.rocks/image?repo=ep1cman/unifi-protect-backup" />
</a>
- Heavily utilises [`pyunifiprotect`](https://github.com/briis/pyunifiprotect) by [@briis](https://github.com/briis/)
- All the cloud functionality is provided by [`rclone`](https://rclone.org/)
- This package was created with [Cookiecutter](https://github.com/audreyr/cookiecutter) and the [waynerv/cookiecutter-pypackage](https://github.com/waynerv/cookiecutter-pypackage) project template.
# Star History
[![Star History Chart](https://api.star-history.com/svg?repos=ep1cman/unifi-protect-backup&type=Date)](https://star-history.com/#ep1cman/unifi-protect-backup&Date)

docker_root service `run` script

@@ -2,5 +2,8 @@
export RCLONE_CONFIG=/config/rclone/rclone.conf
echo $VERBOSITY
[[ -n "$VERBOSITY" ]] && export VERBOSITY_ARG=-$VERBOSITY || export VERBOSITY_ARG=""
exec \
s6-setuidgid abc unifi-protect-backup -${VERBOSITY}
s6-setuidgid abc unifi-protect-backup ${VERBOSITY_ARG}

Makefile

@@ -1,4 +1,6 @@
sources = unifi_protect_backup
container_name ?= ghcr.io/ep1cman/unifi-protect-backup
container_arches ?= linux/amd64,linux/arm64
.PHONY: test format lint unittest coverage pre-commit clean
test: format lint unittest
@@ -25,3 +27,7 @@ clean:
rm -rf *.egg-info
rm -rf .tox dist site
rm -rf coverage.xml .coverage
docker:
poetry build
docker buildx build . --platform $(container_arches) -t $(container_name) --push

poetry.lock (generated)

File diff suppressed because it is too large.

pyproject.toml

@@ -1,15 +1,15 @@
[tool]
[tool.poetry]
name = "unifi-protect-backup"
version = "0.7.4"
name = "unifi_protect_backup"
version = "0.9.4"
homepage = "https://github.com/ep1cman/unifi-protect-backup"
description = "Python tool to backup unifi event clips in realtime."
authors = ["sebastian.goscik <sebastian@goscik.com>"]
readme = "README.md"
license = "MIT"
classifiers=[
'Development Status :: 2 - Pre-Alpha',
'Intended Audience :: Developers',
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Information Technology',
'License :: OSI Approved :: MIT License',
'Natural Language :: English',
'Programming Language :: Python :: 3',
@@ -23,45 +23,39 @@ packages = [
[tool.poetry.dependencies]
python = ">=3.9.0,<4.0"
click = "8.0.1"
black = { version = "^22.10.0", optional = true}
isort = { version = "^5.8.0", optional = true}
flake8 = { version = "^3.9.2", optional = true}
flake8-docstrings = { version = "^1.6.0", optional = true }
mypy = {version = "^0.900", optional = true}
pytest = { version = "^6.2.4", optional = true}
pytest-cov = { version = "^2.12.0", optional = true}
tox = { version = "^3.20.1", optional = true}
virtualenv = { version = "^20.2.2", optional = true}
pip = { version = "^20.3.1", optional = true}
twine = { version = "^3.3.0", optional = true}
pre-commit = {version = "^2.12.0", optional = true}
toml = {version = "^0.10.2", optional = true}
bump2version = {version = "^1.0.1", optional = true}
tox-asdf = {version = "^0.1.0", optional = true}
pyunifiprotect = "^4.0.11"
ipdb = {version = "^0.13.9", optional = true}
types-pytz = {version = "^2021.3.5", optional = true}
types-cryptography = {version = "^3.3.18", optional = true}
aiorun = "^2022.11.1"
aiosqlite = "^0.17.0"
python-dateutil = "^2.8.2"
aiorun = "^2022.11.1"
pylint = {version = "^2.15.6", extras = ["dev"]}
apprise = "^1.3.0"
expiring-dict = "^1.1.0"
async-lru = "^2.0.3"
[tool.poetry.extras]
test = [
"pytest",
"black",
"isort",
"mypy",
"flake8",
"flake8-docstrings",
"pytest-cov",
"types-pytz",
"types-cryptography"
]
[tool.poetry.group.dev]
optional = true
dev = ["tox", "pre-commit", "virtualenv", "pip", "twine", "toml", "bump2version", "tox-asdf", "ipdb"]
[tool.poetry.group.dev.dependencies]
black = "^22.10.0"
isort = "^5.8.0"
flake8 = "^3.9.2"
flake8-docstrings = "^1.6.0"
virtualenv = "^20.2.2"
mypy = "^0.900"
types-pytz = "^2021.3.5"
types-cryptography = "^3.3.18"
twine = "^3.3.0"
bump2version = "^1.0.1"
pre-commit = "^2.12.0"
types-python-dateutil = "^2.8.19.10"
[tool.poetry.group.test]
optional = true
[tool.poetry.group.test.dependencies]
pytest = "^6.2.4"
pytest-cov = "^2.12.0"
tox = "^3.20.1"
tox-asdf = "^0.1.0"
[tool.poetry.scripts]
unifi-protect-backup = 'unifi_protect_backup.cli:main'
@@ -97,6 +91,9 @@ skip_gitignore = true
# you can skip files as below
#skip_glob = docs/conf.py
[tool.mypy]
allow_redefinition=true
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

Package tests

@@ -1,7 +1,7 @@
#!/usr/bin/env python
"""Tests for `unifi_protect_backup` package."""
import pytest
import pytest # type: ignore
# from click.testing import CliRunner

unifi_protect_backup/__init__.py

@@ -2,11 +2,11 @@
__author__ = """sebastian.goscik"""
__email__ = 'sebastian@goscik.com'
__version__ = '0.7.4'
__version__ = '0.9.4'
# from .unifi_protect_backup import UnifiProtectBackup
from .downloader import VideoDownloader
from .uploader import VideoUploader
from .event_listener import EventListener
from .purge import Purge
from .missing_event_checker import MissingEventChecker
from .uploader import VideoUploader
from .missing_event_checker import MissingEventChecker # isort: skip

unifi_protect_backup/cli.py

@@ -1,12 +1,11 @@
"""Console script for unifi_protect_backup."""
import asyncio
import click
from aiorun import run
from aiorun import run # type: ignore
from unifi_protect_backup import __version__
from unifi_protect_backup.unifi_protect_backup import UnifiProtectBackup
from unifi_protect_backup.unifi_protect_backup_core import UnifiProtectBackup
from unifi_protect_backup.utils import human_readable_to_float
DETECTION_TYPES = ["motion", "person", "vehicle", "ring"]
@@ -23,7 +22,7 @@ def _parse_detection_types(ctx, param, value):
return types
@click.command()
@click.command(context_settings=dict(max_content_width=100))
@click.version_option(__version__)
@click.option('--address', required=True, envvar='UFP_ADDRESS', help='Address of Unifi Protect instance')
@click.option('--port', default=443, envvar='UFP_PORT', show_default=True, help='Port of Unifi Protect instance')
@@ -48,8 +47,8 @@ def _parse_detection_types(ctx, param, value):
default='7d',
show_default=True,
envvar='RCLONE_RETENTION',
help="How long should event clips be backed up for. Format as per the `--max-age` argument of "
"`rclone` (https://rclone.org/filtering/#max-age-don-t-transfer-any-file-older-than-this)",
help="How long should event clips be backed up for. Format as per the `rclone1 time option format "
"(https://rclone.org/docs/#time-option)",
)
@click.option(
'--rclone-args',
@@ -58,6 +57,14 @@ def _parse_detection_types(ctx, param, value):
help="Optional extra arguments to pass to `rclone rcat` directly. Common usage for this would "
"be to set a bandwidth limit, for example.",
)
@click.option(
'--rclone-purge-args',
default='',
envvar='RCLONE_PURGE_ARGS',
help="Optional extra arguments to pass to `rclone delete` directly. Common usage for this would "
"be to execute a permanent delete instead of using the recycle bin on a destination. "
"Google Drive example: `--drive-use-trash=false`",
)
@click.option(
'--detection-types',
envvar='DETECTION_TYPES',
@@ -110,11 +117,63 @@ all warnings, and websocket data
envvar='SQLITE_PATH',
help="Path to the SQLite database to use/create",
)
@click.option(
'--color-logging/--plain-logging',
default=False,
show_default=True,
envvar='COLOR_LOGGING',
help="Set if you want to use color in logging output",
)
@click.option(
'--download-buffer-size',
default='512MiB',
show_default=True,
envvar='DOWNLOAD_BUFFER_SIZE',
help='How big the download buffer should be (you can use suffixes like "B", "KiB", "MiB", "GiB")',
callback=lambda ctx, param, value: human_readable_to_float(value),
)
@click.option(
'--purge_interval',
default='1d',
show_default=True,
envvar='PURGE_INTERVAL',
help="How frequently to check for file to purge.\n\nNOTE: Can create a lot of API calls, so be careful if "
"your cloud provider charges you per api call",
)
@click.option(
'--apprise-notifier',
'apprise_notifiers',
multiple=True,
envvar="APPRISE_NOTIFIERS",
help="""\b
Apprise URL for sending notifications.
E.g: ERROR,WARNING=tgram://[BOT KEY]/[CHAT ID]
You can use this parameter multiple times to use more than one notification platform.
The following notification tags are available (corresponding to the respective logging levels):
ERROR, WARNING, INFO, DEBUG, EXTRA_DEBUG, WEBSOCKET_DATA
If no tags are specified, it defaults to ERROR
More details about supported platforms can be found here: https://github.com/caronc/apprise""",
)
@click.option(
'--skip-missing',
default=False,
show_default=True,
is_flag=True,
envvar='SKIP_MISSING',
help="""\b
If set, events which are 'missing' at the start will be ignored.
Subsequent missing events will be downloaded (e.g. a missed event)
""",
)
def main(**kwargs):
"""A Python based tool for backing up Unifi Protect event clips as they occur."""
loop = asyncio.get_event_loop()
event_listener = UnifiProtectBackup(**kwargs)
run(event_listener.start())
run(event_listener.start(), stop_on_unhandled_errors=True)
if __name__ == "__main__":
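The `--download-buffer-size` option above converts its value with `human_readable_to_float` from `unifi_protect_backup.utils`, whose implementation is not shown in this diff. A minimal sketch of what such a parser might look like (an assumption, not the tool's actual helper):

```python
import re

_UNITS = {"B": 1, "KiB": 1024, "MiB": 1024**2, "GiB": 1024**3}

def human_readable_to_float(value: str) -> float:
    """Parse sizes like '512MiB' into a number of bytes (sketch only)."""
    match = re.fullmatch(r"([\d.]+)\s*(B|KiB|MiB|GiB)", value.strip())
    if match is None:
        raise ValueError(f"Cannot parse size: {value!r}")
    number, unit = match.groups()
    return float(number) * _UNITS[unit]

print(human_readable_to_float("512MiB"))  # 536870912.0
```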

unifi_protect_backup/downloader.py

@@ -1,83 +1,113 @@
# noqa: D100
import asyncio
import json
import logging
import shutil
from datetime import datetime, timedelta, timezone
from typing import Optional
import aiosqlite
import pytz
from aiohttp.client_exceptions import ClientPayloadError
from expiring_dict import ExpiringDict # type: ignore
from pyunifiprotect import ProtectApiClient
from pyunifiprotect.data.nvr import Event
from pyunifiprotect.data.types import EventType
from unifi_protect_backup.utils import SubprocessException, VideoQueue, get_camera_name, human_readable_size
logger = logging.getLogger(__name__)
from unifi_protect_backup.utils import (
SubprocessException,
VideoQueue,
get_camera_name,
human_readable_size,
run_command,
setup_event_logger,
)
async def get_video_length(video: bytes) -> float:
"""Uses ffprobe to get the length of the video file passed in as a byte stream"""
cmd = 'ffprobe -v quiet -show_streams -select_streams v:0 -of json -'
proc = await asyncio.create_subprocess_shell(
cmd,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
"""Uses ffprobe to get the length of the video file passed in as a byte stream."""
returncode, stdout, stderr = await run_command(
'ffprobe -v quiet -show_streams -select_streams v:0 -of json -', video
)
stdout, stderr = await proc.communicate(video)
if proc.returncode == 0:
logger.extra_debug(f"stdout:\n{stdout.decode()}") # type: ignore
logger.extra_debug(f"stderr:\n{stderr.decode()}") # type: ignore
json_data = json.loads(stdout.decode())
return float(json_data['streams'][0]['duration'])
if returncode != 0:
raise SubprocessException(stdout, stderr, returncode)
else:
raise SubprocessException(stdout.decode(), stderr.decode(), proc.returncode)
json_data = json.loads(stdout)
return float(json_data['streams'][0]['duration'])
class VideoDownloader:
"""Downloads event video clips from Unifi Protect"""
"""Downloads event video clips from Unifi Protect."""
def __init__(self, protect: ProtectApiClient, download_queue: asyncio.Queue, buffer_size: int = 256):
def __init__(
self,
protect: ProtectApiClient,
db: aiosqlite.Connection,
download_queue: asyncio.Queue,
upload_queue: VideoQueue,
color_logging: bool,
):
"""Init.
Args:
protect (ProtectApiClient): UniFi Protect API client to use
db (aiosqlite.Connection): Async SQLite database to check for missing events
download_queue (asyncio.Queue): Queue to get event details from
upload_queue (VideoQueue): Queue to place downloaded videos on
color_logging (bool): Whether or not to add color to logging output
"""
self._protect: ProtectApiClient = protect
self._download_queue: asyncio.Queue = download_queue
self.video_queue = VideoQueue(buffer_size * 1024 * 1024)
self._db: aiosqlite.Connection = db
self.download_queue: asyncio.Queue = download_queue
self.upload_queue: VideoQueue = upload_queue
self.current_event = None
self._failures = ExpiringDict(60 * 60 * 12) # Time to live = 12h
self.base_logger = logging.getLogger(__name__)
setup_event_logger(self.base_logger, color_logging)
self.logger = logging.LoggerAdapter(self.base_logger, {'event': ''})
# Check if `ffprobe` is available
ffprobe = shutil.which('ffprobe')
if ffprobe is not None:
logger.debug(f"ffprobe found: {ffprobe}")
self.logger.debug(f"ffprobe found: {ffprobe}")
self._has_ffprobe = True
else:
self._has_ffprobe = False
async def start(self):
"""Main loop"""
logger.info("Starting Downloader")
"""Main loop."""
self.logger.info("Starting Downloader")
while True:
try:
event = await self._download_queue.get()
# Wait for unifi protect to be connected
await self._protect.connect_event.wait()
event = await self.download_queue.get()
self.current_event = event
self.logger = logging.LoggerAdapter(self.base_logger, {'event': f' [{event.id}]'})
# Fix timezones since pyunifiprotect sets all timestamps to UTC. Instead localize them to
# the timezone of the unifi protect NVR.
event.start = event.start.replace(tzinfo=pytz.utc).astimezone(self._protect.bootstrap.nvr.timezone)
event.end = event.end.replace(tzinfo=pytz.utc).astimezone(self._protect.bootstrap.nvr.timezone)
logger.info(f"Downloading event: {event.id}")
logger.debug(f"Remaining Download Queue: {self._download_queue.qsize()}")
output_queue_current_size = human_readable_size(self.video_queue.qsize())
output_queue_max_size = human_readable_size(self.video_queue.maxsize)
logger.debug(f"Video Download Buffer: {output_queue_current_size}/{output_queue_max_size}")
logger.debug(f" Camera: {await get_camera_name(self._protect, event.camera_id)}")
self.logger.info(f"Downloading event: {event.id}")
self.logger.debug(f"Remaining Download Queue: {self.download_queue.qsize()}")
output_queue_current_size = human_readable_size(self.upload_queue.qsize())
output_queue_max_size = human_readable_size(self.upload_queue.maxsize)
self.logger.debug(f"Video Download Buffer: {output_queue_current_size}/{output_queue_max_size}")
self.logger.debug(f" Camera: {await get_camera_name(self._protect, event.camera_id)}")
if event.type == EventType.SMART_DETECT:
logger.debug(f" Type: {event.type} ({', '.join(event.smart_detect_types)})")
self.logger.debug(f" Type: {event.type} ({', '.join(event.smart_detect_types)})")
else:
logger.debug(f" Type: {event.type}")
logger.debug(f" Start: {event.start.strftime('%Y-%m-%dT%H-%M-%S')} ({event.start.timestamp()})")
logger.debug(f" End: {event.end.strftime('%Y-%m-%dT%H-%M-%S')} ({event.end.timestamp()})")
self.logger.debug(f" Type: {event.type}")
self.logger.debug(f" Start: {event.start.strftime('%Y-%m-%dT%H-%M-%S')} ({event.start.timestamp()})")
self.logger.debug(f" End: {event.end.strftime('%Y-%m-%dT%H-%M-%S')} ({event.end.timestamp()})")
duration = (event.end - event.start).total_seconds()
logger.debug(f" Duration: {duration}s")
self.logger.debug(f" Duration: {duration}s")
# Unifi protect does not return full video clips if the clip is requested too soon.
# There are two issues at play here:
@@ -89,54 +119,82 @@ class VideoDownloader:
time_since_event_ended = datetime.utcnow().replace(tzinfo=timezone.utc) - event.end
sleep_time = (timedelta(seconds=5 * 1.5) - time_since_event_ended).total_seconds()
if sleep_time > 0:
logger.debug(f" Sleeping ({sleep_time}s) to ensure clip is ready to download...")
self.logger.debug(f" Sleeping ({sleep_time}s) to ensure clip is ready to download...")
await asyncio.sleep(sleep_time)
video = await self._download(event)
if video is None:
try:
video = await self._download(event)
assert video is not None
except Exception as e:
# Increment failure count
if event.id not in self._failures:
self._failures[event.id] = 1
else:
self._failures[event.id] += 1
self.logger.warning(f"Event failed download attempt {self._failures[event.id]}", exc_info=e)
if self._failures[event.id] >= 10:
self.logger.error(
"Event has failed to download 10 times in a row. Permanently ignoring this event"
)
# ignore event
await self._db.execute(
"INSERT INTO events VALUES "
f"('{event.id}', '{event.type}', '{event.camera_id}',"
f"'{event.start.timestamp()}', '{event.end.timestamp()}')"
)
await self._db.commit()
continue
# Remove successfully downloaded event from failures list
if event.id in self._failures:
del self._failures[event.id]
# Get the actual length of the downloaded video using ffprobe
if self._has_ffprobe:
await self._check_video_length(video, duration)
await self.video_queue.put((event, video))
logger.debug("Added to upload queue")
await self.upload_queue.put((event, video))
self.logger.debug("Added to upload queue")
self.current_event = None
except Exception as e:
logger.warn(f"Unexpected exception occurred, abandoning event {event.id}:")
logger.exception(e)
self.logger.error(f"Unexpected exception occurred, abandoning event {event.id}:", exc_info=e)
async def _download(self, event: Event) -> bytes:
"""Downloads the video clip for the given event"""
logger.debug(" Downloading video...")
async def _download(self, event: Event) -> Optional[bytes]:
"""Downloads the video clip for the given event."""
self.logger.debug(" Downloading video...")
for x in range(5):
assert isinstance(event.camera_id, str)
assert isinstance(event.start, datetime)
assert isinstance(event.end, datetime)
try:
video = await self._protect.get_camera_video(event.camera_id, event.start, event.end)
assert isinstance(video, bytes)
break
except (AssertionError, ClientPayloadError, TimeoutError) as e:
logger.warn(f" Failed download attempt {x+1}, retying in 1s")
logger.exception(e)
self.logger.warning(f" Failed download attempt {x+1}, retying in 1s", exc_info=e)
await asyncio.sleep(1)
else:
logger.warn(f"Download failed after 5 attempts, abandoning event {event.id}:")
return
self.logger.error(f"Download failed after 5 attempts, abandoning event {event.id}:")
return None
logger.debug(f" Downloaded video size: {human_readable_size(len(video))}s")
self.logger.debug(f" Downloaded video size: {human_readable_size(len(video))}s")
return video
async def _check_video_length(self, video, duration):
"""Check if the downloaded event is at least the length of the event, warn otherwise
"""Check if the downloaded event is at least the length of the event, warn otherwise.
It is expected for events to regularly be slightly longer than the event specified"""
It is expected for events to regularly be slightly longer than the event specified
"""
try:
downloaded_duration = await get_video_length(video)
msg = f" Downloaded video length: {downloaded_duration:.3f}s" f"({downloaded_duration - duration:+.3f}s)"
if downloaded_duration < duration:
logger.warning(msg)
self.logger.warning(msg)
else:
logger.debug(msg)
self.logger.debug(msg)
except SubprocessException as e:
logger.warn(" `ffprobe` failed")
logger.exception(e)
self.logger.warning(" `ffprobe` failed", exc_info=e)

unifi_protect_backup/event_listener.py

@@ -1,18 +1,20 @@
# noqa: D100
import asyncio
import logging
from time import sleep
import asyncio
from typing import List
from pyunifiprotect.data.websocket import WSAction, WSSubscriptionMessage
from pyunifiprotect.api import ProtectApiClient
from pyunifiprotect.data.nvr import Event
from pyunifiprotect.data.types import EventType
from pyunifiprotect.api import ProtectApiClient
from pyunifiprotect.data.websocket import WSAction, WSSubscriptionMessage
logger = logging.getLogger(__name__)
class EventListener:
"""Listens to the unifi protect websocket for new events to backup"""
"""Listens to the unifi protect websocket for new events to backup."""
def __init__(
self,
@@ -21,6 +23,14 @@ class EventListener:
detection_types: List[str],
ignore_cameras: List[str],
):
"""Init.
Args:
event_queue (asyncio.Queue): Queue to place events to backup on
protect (ProtectApiClient): UniFi Protect API client to use
detection_types (List[str]): Desired Event detection types to look for
ignore_cameras (List[str]): Camera IDs to ignore events from
"""
self._event_queue: asyncio.Queue = event_queue
self._protect: ProtectApiClient = protect
self._unsub = None
@@ -28,7 +38,7 @@ class EventListener:
self.ignore_cameras: List[str] = ignore_cameras
async def start(self):
"""Main Loop"""
"""Main Loop."""
logger.debug("Subscribed to websocket")
self._unsub = self._protect.subscribe_websocket(self._websocket_callback)
@@ -71,7 +81,7 @@ class EventListener:
# TODO: Will this even work? I think it will block the async loop
while self._event_queue.full():
logger.extra_debug("Event queue full, waiting 1s...")
logger.extra_debug("Event queue full, waiting 1s...") # type: ignore
sleep(1)
self._event_queue.put_nowait(msg.new_obj)
@@ -85,20 +95,20 @@ class EventListener:
logger.debug(f"Adding event {msg.new_obj.id} to queue (Current download queue={self._event_queue.qsize()})")
async def _check_websocket_and_reconnect(self):
"""Checks for websocket disconnect and triggers a reconnect"""
"""Checks for websocket disconnect and triggers a reconnect."""
logger.extra_debug("Checking the status of the websocket...")
if self._protect.check_ws():
logger.extra_debug("Websocket is connected.")
else:
logger.warn("Lost connection to Unifi Protect.")
self._protect.connect_event.clear()
logger.warning("Lost connection to Unifi Protect.")
# Unsubscribe, close the session.
self._unsub()
await self._protect.close_session()
while True:
logger.warn("Attempting reconnect...")
logger.warning("Attempting reconnect...")
try:
# Start the pyunifiprotect connection by calling `update`
@@ -109,12 +119,12 @@ class EventListener:
self._unsub = self._protect.subscribe_websocket(self._websocket_callback)
break
else:
logger.warn("Unable to establish connection to Unifi Protect")
logger.error("Unable to establish connection to Unifi Protect")
except Exception as e:
logger.warn("Unexpected exception occurred while trying to reconnect:")
logger.exception(e)
logger.error("Unexpected exception occurred while trying to reconnect:", exc_info=e)
# Back off for a little while
await asyncio.sleep(10)
self._protect.connect_event.set()
logger.info("Re-established connection to Unifi Protect and to the websocket.")

unifi_protect_backup/missing_event_checker.py

@@ -1,3 +1,5 @@
# noqa: D100
import asyncio
import logging
from datetime import datetime
@@ -5,79 +7,151 @@ from typing import List
import aiosqlite
from dateutil.relativedelta import relativedelta
from pyunifiprotect import ProtectApiClient
from pyunifiprotect.data.nvr import Event
from pyunifiprotect.data.types import EventType
from unifi_protect_backup import VideoDownloader, VideoUploader
logger = logging.getLogger(__name__)
class MissingEventChecker:
"""Periodically checks if any unifi protect events exist within the retention period that are not backed up"""
"""Periodically checks if any unifi protect events exist within the retention period that are not backed up."""
def __init__(
self,
protect: ProtectApiClient,
db: aiosqlite.Connection,
event_queue: asyncio.Queue,
download_queue: asyncio.Queue,
downloader: VideoDownloader,
uploader: VideoUploader,
retention: relativedelta,
detection_types: List[str],
ignore_cameras: List[str],
interval: int = 60 * 5,
) -> None:
"""Init.
Args:
protect (ProtectApiClient): UniFi Protect API client to use
db (aiosqlite.Connection): Async SQLite database to check for missing events
download_queue (asyncio.Queue): Download queue to check for on-going downloads
downloader (VideoDownloader): Downloader to check for on-going downloads
uploader (VideoUploader): Uploader to check for on-going uploads
retention (relativedelta): Retention period to limit search window
detection_types (List[str]): Detection types wanted to limit search
ignore_cameras (List[str]): Ignored camera IDs to limit search
interval (int): How frequently, in seconds, to check for missing events,
"""
self._protect: ProtectApiClient = protect
self._db: aiosqlite.Connection = db
self._event_queue: asyncio.Queue = event_queue
self._download_queue: asyncio.Queue = download_queue
self._downloader: VideoDownloader = downloader
self._uploader: VideoUploader = uploader
self.retention: relativedelta = retention
self.detection_types: List[str] = detection_types
self.ignore_cameras: List[str] = ignore_cameras
self.interval: int = interval
async def _get_missing_events(self) -> List[Event]:
# Get list of events that need to be backed up from unifi protect
unifi_events = await self._protect.get_events(
start=datetime.now() - self.retention,
end=datetime.now(),
types=[EventType.MOTION, EventType.SMART_DETECT, EventType.RING],
)
unifi_events = {event.id: event for event in unifi_events}
# Get list of events that have been backed up from the database
# events(id, type, camera_id, start, end)
async with self._db.execute("SELECT * FROM events") as cursor:
rows = await cursor.fetchall()
db_event_ids = {row[0] for row in rows}
# Prevent re-adding events currently in the download/upload queue
downloading_event_ids = {event.id for event in self._downloader.download_queue._queue} # type: ignore
current_download = self._downloader.current_event
if current_download is not None:
downloading_event_ids.add(current_download.id)
uploading_event_ids = {event.id for event, video in self._uploader.upload_queue._queue} # type: ignore
current_upload = self._uploader.current_event
if current_upload is not None:
uploading_event_ids.add(current_upload.id)
missing_event_ids = set(unifi_events.keys()) - (db_event_ids | downloading_event_ids | uploading_event_ids)
def wanted_event_type(event_id):
event = unifi_events[event_id]
if event.start is None or event.end is None:
return False # This event is still on-going
if event.camera_id in self.ignore_cameras:
return False
if event.type is EventType.MOTION and "motion" not in self.detection_types:
return False
if event.type is EventType.RING and "ring" not in self.detection_types:
return False
elif event.type is EventType.SMART_DETECT:
for event_smart_detection_type in event.smart_detect_types:
if event_smart_detection_type not in self.detection_types:
return False
return True
wanted_event_ids = set(filter(wanted_event_type, missing_event_ids))
return [unifi_events[id] for id in wanted_event_ids]
async def ignore_missing(self):
"""Ignore missing events by adding them to the event table."""
wanted_events = await self._get_missing_events()
logger.info(f" Ignoring {len(wanted_events)} missing events")
for event in wanted_events:
logger.extra_debug(f"Ignoring event '{event.id}'")
await self._db.execute(
"INSERT INTO events VALUES "
f"('{event.id}', '{event.type}', '{event.camera_id}',"
f"'{event.start.timestamp()}', '{event.end.timestamp()}')"
)
await self._db.commit()
async def start(self):
"""main loop"""
"""Main loop."""
logger.info("Starting Missing Event Checker")
while True:
try:
# Wait for unifi protect to be connected
await self._protect.connect_event.wait()
logger.extra_debug("Running check for missing events...")
# Get list of events that need to be backed up from unifi protect
unifi_events = await self._protect.get_events(
start=datetime.now() - self.retention,
end=datetime.now(),
types=[EventType.MOTION, EventType.SMART_DETECT, EventType.RING],
)
unifi_events = {event.id: event for event in unifi_events}
# Get list of events that have been backed up from the database
wanted_events = await self._get_missing_events()
# events(id, type, camera_id, start, end)
async with self._db.execute("SELECT * FROM events") as cursor:
rows = await cursor.fetchall()
db_event_ids = {row[0] for row in rows}
logger.debug(f" Undownloaded events of wanted types: {len(wanted_events)}")
# Prevent re-adding events currently in the download queue
downloading_event_ids = {event.id for event in self._event_queue._queue}
if len(wanted_events) > 20:
logger.warning(f" Adding {len(wanted_events)} missing events to backup queue")
missing_logger = logger.extra_debug
else:
missing_logger = logger.warning
missing_event_ids = set(unifi_events.keys()) - (db_event_ids | downloading_event_ids)
for event_id in missing_event_ids:
event = unifi_events[event_id]
if event.start is None or event.end is None:
continue # This event is still on-going
if event.type is EventType.MOTION and "motion" not in self.detection_types:
continue
if event.type is EventType.RING and "ring" not in self.detection_types:
continue
elif event.type is EventType.SMART_DETECT:
for event_smart_detection_type in event.smart_detect_types:
if event_smart_detection_type not in self.detection_types:
continue
for event in wanted_events:
if event.type != EventType.SMART_DETECT:
event_name = f"{event.id} ({event.type})"
else:
logger.warning(
f" Adding missing event to backup queue: {event.id} ({event.type}) ({event.start.strftime('%Y-%m-%dT%H-%M-%S')} - {event.end.strftime('%Y-%m-%dT%H-%M-%S')})"
)
await self._event_queue.put(event)
event_name = f"{event.id} ({', '.join(event.smart_detect_types)})"
missing_logger(
f" Adding missing event to backup queue: {event_name}"
f" ({event.start.strftime('%Y-%m-%dT%H-%M-%S')} -"
f" {event.end.strftime('%Y-%m-%dT%H-%M-%S')})"
)
await self._download_queue.put(event)
except Exception as e:
logger.warn(f"Unexpected exception occurred during missing event check:")
logger.exception(e)
logger.error("Unexpected exception occurred during missing event check:", exc_info=e)
await asyncio.sleep(self.interval)

View File

@@ -0,0 +1,18 @@
"""A 'singleton' module for registering apprise notifiers."""
import apprise
notifier = apprise.Apprise()
def add_notification_service(url):
"""Add apprise URI with support for tags e.g. TAG1,TAG2=PROTOCOL://settings."""
config = apprise.AppriseConfig()
config.add_config(url, format='text')
# If no tags are specified, default to errors, otherwise ALL logging will
# be spammed to the notification service
if not config.servers()[0].tags:
config.servers()[0].tags = {'ERROR'}
notifier.add(config)
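A short usage sketch of this module; the `json://` URL is a placeholder, and the tag prefix follows the `TAG1,TAG2=PROTOCOL://settings` form described in the docstring:

# Hypothetical usage of the notifications module (placeholder URL).
from unifi_protect_backup import notifications

# Tags before "=" route only those log levels to this service
notifications.add_notification_service("ERROR,WARNING=json://localhost:8080/notify")

# The logging handler fans out tagged records automatically; a manual send:
notifications.notifier.notify(body="test message", title="TEST", tag=["ERROR"])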

View File

@@ -1,4 +1,5 @@
import asyncio
# noqa: D100
import logging
import time
from datetime import datetime
@@ -6,40 +7,53 @@ from datetime import datetime
import aiosqlite
from dateutil.relativedelta import relativedelta
from unifi_protect_backup.utils import parse_rclone_retention, run_command
from unifi_protect_backup.utils import run_command, wait_until
logger = logging.getLogger(__name__)
async def wait_until(dt):
# sleep until the specified datetime
now = datetime.now()
await asyncio.sleep((dt - now).total_seconds())
async def delete_file(file_path):
returncode, stdout, stderr = await run_command(f'rclone delete -vv "{file_path}"')
async def delete_file(file_path, rclone_purge_args):
"""Deletes `file_path` via rclone."""
returncode, stdout, stderr = await run_command(f'rclone delete -vv "{file_path}" {rclone_purge_args}')
if returncode != 0:
logger.warn(f" Failed to delete file: '{file_path}'")
logger.error(f" Failed to delete file: '{file_path}'")
async def tidy_empty_dirs(base_dir_path):
"""Deletes any empty directories in `base_dir_path` via rclone."""
returncode, stdout, stderr = await run_command(f'rclone rmdirs -vv --ignore-errors --leave-root "{base_dir_path}"')
if returncode != 0:
logger.warn(f" Failed to tidy empty dirs")
logger.error(" Failed to tidy empty dirs")
class Purge:
"""Deletes old files from rclone remotes"""
"""Deletes old files from rclone remotes."""
def __init__(self, db: aiosqlite.Connection, retention: relativedelta, rclone_destination: str, interval: int = 60):
def __init__(
self,
db: aiosqlite.Connection,
retention: relativedelta,
rclone_destination: str,
interval: relativedelta = relativedelta(days=1),
rclone_purge_args: str = "",
):
"""Init.
Args:
db (aiosqlite.Connection): Async SQlite database connection to purge clips from
retention (relativedelta): How long clips should be kept
rclone_destination (str): What rclone destination the clips are stored in
interval (relativedelta): How often to purge old clips
rclone_purge_args (str): Optional extra arguments to pass to `rclone delete` directly.
"""
self._db: aiosqlite.Connection = db
self.retention: relativedelta = retention
self.rclone_destination: str = rclone_destination
self.interval: int = interval
self.interval: relativedelta = interval
self.rclone_purge_args: str = rclone_purge_args
async def start(self):
"""Main loop - runs forever"""
"""Main loop - runs forever."""
while True:
try:
deleted_a_file = False
@@ -57,7 +71,7 @@ class Purge:
async with self._db.execute(f"SELECT * FROM backups WHERE id = '{event_id}'") as backup_cursor:
async for _, remote, file_path in backup_cursor:
logger.debug(f" Deleted: {remote}:{file_path}")
await delete_file(f"{remote}:{file_path}")
await delete_file(f"{remote}:{file_path}", self.rclone_purge_args)
deleted_a_file = True
# delete event from database
@@ -69,7 +83,8 @@ class Purge:
await tidy_empty_dirs(self.rclone_destination)
except Exception as e:
logger.warn(f"Unexpected exception occurred during purge:")
logger.exception(e)
logger.error("Unexpected exception occurred during purge:", exc_info=e)
await asyncio.sleep(self.interval)
next_purge_time = datetime.now() + self.interval
logger.extra_debug(f'sleeping until {next_purge_time}')
await wait_until(next_purge_time)
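Switching `interval` from seconds to a `relativedelta` makes the purge schedule calendar-aware rather than a fixed sleep. A minimal sketch of the scheduling pattern used above, assuming only the stdlib and `dateutil`:

# Sketch: sleep until "now + interval" instead of a fixed number of seconds.
import asyncio
from datetime import datetime
from dateutil.relativedelta import relativedelta

async def wait_until(dt):
    """Sleep until the specified datetime."""
    await asyncio.sleep((dt - datetime.now()).total_seconds())

async def purge_loop():
    interval = relativedelta(days=1)  # handles month lengths, unlike raw seconds
    while True:
        # ... purge work would happen here ...
        next_purge_time = datetime.now() + interval
        print(f"sleeping until {next_purge_time}")
        await wait_until(next_purge_time)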

View File

@@ -3,168 +3,37 @@ import asyncio
import logging
import os
import shutil
from cmath import log
from pprint import pprint
from time import sleep
from typing import Callable, List, Optional
from datetime import datetime, timezone
from typing import Callable, List
import aiosqlite
from pyunifiprotect import ProtectApiClient
from pyunifiprotect.data.types import ModelType
from unifi_protect_backup import EventListener, MissingEventChecker, Purge, VideoDownloader, VideoUploader
from unifi_protect_backup.utils import SubprocessException, parse_rclone_retention, run_command
from unifi_protect_backup import (
EventListener,
MissingEventChecker,
Purge,
VideoDownloader,
VideoUploader,
notifications,
)
from unifi_protect_backup.utils import (
SubprocessException,
VideoQueue,
human_readable_size,
parse_rclone_retention,
run_command,
setup_logging,
)
logger = logging.getLogger(__name__)
# TODO: https://github.com/cjrh/aiorun#id6 (smart shield)
def add_logging_level(levelName: str, levelNum: int, methodName: Optional[str] = None) -> None:
"""Comprehensively adds a new logging level to the `logging` module and the currently configured logging class.
`levelName` becomes an attribute of the `logging` module with the value
`levelNum`. `methodName` becomes a convenience method for both `logging`
itself and the class returned by `logging.getLoggerClass()` (usually just
`logging.Logger`).
To avoid accidental clobbering of existing attributes, this method will
raise an `AttributeError` if the level name is already an attribute of the
`logging` module or if the method name is already present.
Credit: https://stackoverflow.com/a/35804945
Args:
levelName (str): The name of the new logging level (in all caps).
levelNum (int): The priority value of the logging level, lower=more verbose.
methodName (str): The name of the method used to log using this.
If `methodName` is not specified, `levelName.lower()` is used.
Example:
::
>>> add_logging_level('TRACE', logging.DEBUG - 5)
>>> logging.getLogger(__name__).setLevel("TRACE")
>>> logging.getLogger(__name__).trace('that worked')
>>> logging.trace('so did this')
>>> logging.TRACE
5
"""
if not methodName:
methodName = levelName.lower()
if hasattr(logging, levelName):
raise AttributeError('{} already defined in logging module'.format(levelName))
if hasattr(logging, methodName):
raise AttributeError('{} already defined in logging module'.format(methodName))
if hasattr(logging.getLoggerClass(), methodName):
raise AttributeError('{} already defined in logger class'.format(methodName))
# This method was inspired by the answers to Stack Overflow post
# http://stackoverflow.com/q/2183233/2988730, especially
# http://stackoverflow.com/a/13638084/2988730
def logForLevel(self, message, *args, **kwargs):
if self.isEnabledFor(levelNum):
self._log(levelNum, message, args, **kwargs)
def logToRoot(message, *args, **kwargs):
logging.log(levelNum, message, *args, **kwargs)
logging.addLevelName(levelNum, levelName)
setattr(logging, levelName, levelNum)
setattr(logging.getLoggerClass(), methodName, logForLevel)
setattr(logging, methodName, logToRoot)
def setup_logging(verbosity: int) -> None:
"""Configures loggers to provided the desired level of verbosity.
Verbosity 0: Only log info messages created by `unifi-protect-backup`, and all warnings
Verbosity 1: Only log info & debug messages created by `unifi-protect-backup`, and all warnings
Verbosity 2: Log info & debug messages created by `unifi-protect-backup`, command output, and
all warnings
Verbosity 3: Log debug messages created by `unifi-protect-backup`, command output, all info
messages, and all warnings
Verbosity 4: Log debug messages created by `unifi-protect-backup` command output, all info
messages, all warnings, and websocket data
Verbosity 5: Log websocket data, command output, all debug messages, all info messages and all
warnings
Args:
verbosity (int): The desired level of verbosity
"""
add_logging_level(
'EXTRA_DEBUG',
logging.DEBUG - 1,
)
add_logging_level(
'WEBSOCKET_DATA',
logging.DEBUG - 2,
)
format = "{asctime} [{levelname:^11s}] {name:<42} :\t{message}"
date_format = "%Y-%m-%d %H:%M:%S"
style = '{'
logger = logging.getLogger("unifi_protect_backup")
sh = logging.StreamHandler()
formatter = logging.Formatter(format, date_format, style)
sh.setFormatter(formatter)
def decorate_emit(fn):
# add methods we need to the class
def new(*args):
levelno = args[0].levelno
if levelno >= logging.CRITICAL:
color = '\x1b[31;1m' # RED
elif levelno >= logging.ERROR:
color = '\x1b[31;1m' # RED
elif levelno >= logging.WARNING:
color = '\x1b[33;1m' # YELLOW
elif levelno >= logging.INFO:
color = '\x1b[32;1m' # GREEN
elif levelno >= logging.DEBUG:
color = '\x1b[36;1m' # CYAN
elif levelno >= logging.EXTRA_DEBUG:
color = '\x1b[35;1m' # MAGENTA
else:
color = '\x1b[0m'
# add colored *** in the beginning of the message
args[0].levelname = f"{color}{args[0].levelname:^11s}\x1b[0m"
return fn(*args)
return new
sh.emit = decorate_emit(sh.emit)
logger.addHandler(sh)
logger.propagate = False
if verbosity == 0:
logging.basicConfig(level=logging.WARN, format=format, style=style, datefmt=date_format)
logger.setLevel(logging.INFO)
elif verbosity == 1:
logging.basicConfig(level=logging.WARN, format=format, style=style, datefmt=date_format)
logger.setLevel(logging.DEBUG)
elif verbosity == 2:
logging.basicConfig(level=logging.WARN, format=format, style=style, datefmt=date_format)
logger.setLevel(logging.EXTRA_DEBUG) # type: ignore
elif verbosity == 3:
logging.basicConfig(level=logging.INFO, format=format, style=style, datefmt=date_format)
logger.setLevel(logging.EXTRA_DEBUG) # type: ignore
elif verbosity == 4:
logging.basicConfig(level=logging.INFO, format=format, style=style, datefmt=date_format)
logger.setLevel(logging.WEBSOCKET_DATA) # type: ignore
elif verbosity >= 5:
logging.basicConfig(level=logging.DEBUG, format=format, style=style, datefmt=date_format)
logger.setLevel(logging.WEBSOCKET_DATA) # type: ignore
async def create_database(path: str):
"""Creates sqlite database and creates the events abd backups tables"""
"""Creates sqlite database and creates the events abd backups tables."""
db = await aiosqlite.connect(path)
await db.execute("CREATE TABLE events(id PRIMARY KEY, type, camera_id, start REAL, end REAL)")
await db.execute(
@@ -190,11 +59,17 @@ class UnifiProtectBackup:
rclone_destination: str,
retention: str,
rclone_args: str,
rclone_purge_args: str,
detection_types: List[str],
ignore_cameras: List[str],
file_structure_format: str,
verbose: int,
download_buffer_size: int,
purge_interval: str,
apprise_notifiers: str,
skip_missing: bool,
sqlite_path: str = "events.sqlite",
color_logging=False,
port: int = 443,
):
"""Will configure logging settings and the Unifi Protect API (but not actually connect).
@@ -213,13 +88,27 @@ class UnifiProtectBackup:
(https://rclone.org/filtering/#max-age-don-t-transfer-any-file-older-than-this)
rclone_args (str): A bandwidth limit which is passed to the `--bwlimit` argument of
`rclone` (https://rclone.org/docs/#bwlimit-bandwidth-spec)
rclone_purge_args (str): Optional extra arguments to pass to `rclone delete` directly.
detection_types (List[str]): List of which detection types to backup.
ignore_cameras (List[str]): List of camera IDs for which to not backup events.
file_structure_format (str): A Python format string for output file path.
verbose (int): How verbose to setup logging, see :func:`setup_logging` for details.
download_buffer_size (int): The size of the download buffer, in bytes
purge_interval (str): How often to check for files to delete
apprise_notifiers (str): Apprise URIs for notifications
skip_missing (bool): If initial missing events should be ignored
sqlite_path (str): Path where to find/create sqlite database
color_logging (bool): Whether to add color to logging output or not
"""
setup_logging(verbose)
self.color_logging = color_logging
setup_logging(verbose, self.color_logging)
for notifier in apprise_notifiers:
try:
notifications.add_notification_service(notifier)
except Exception as e:
logger.error(f"Error occurred when setting up logger `{notifier}`", exc_info=e)
raise
logger.debug("Config:")
logger.debug(f" {address=}")
@@ -234,15 +123,21 @@ class UnifiProtectBackup:
logger.debug(f" {rclone_destination=}")
logger.debug(f" {retention=}")
logger.debug(f" {rclone_args=}")
logger.debug(f" {rclone_purge_args=}")
logger.debug(f" {ignore_cameras=}")
logger.debug(f" {verbose=}")
logger.debug(f" {detection_types=}")
logger.debug(f" {file_structure_format=}")
logger.debug(f" {sqlite_path=}")
logger.debug(f" download_buffer_size={human_readable_size(download_buffer_size)}")
logger.debug(f" {purge_interval=}")
logger.debug(f" {apprise_notifiers=}")
logger.debug(f" {skip_missing=}")
self.rclone_destination = rclone_destination
self.retention = parse_rclone_retention(retention)
self.rclone_args = rclone_args
self.rclone_purge_args = rclone_purge_args
self.file_structure_format = file_structure_format
self.address = address
@@ -266,6 +161,9 @@ class UnifiProtectBackup:
self._has_ffprobe = False
self._sqlite_path = sqlite_path
self._db = None
self._download_buffer_size = download_buffer_size
self._purge_interval = parse_rclone_retention(purge_interval)
self._skip_missing = skip_missing
async def start(self):
"""Bootstrap the backup process and kick off the main loop.
@@ -275,6 +173,8 @@ class UnifiProtectBackup:
"""
try:
logger.info("Starting...")
if notifications.notifier.servers:
await notifications.notifier.async_notify("Starting UniFi Protect Backup")
# Ensure `rclone` is installed and properly configured
logger.info("Checking rclone configuration...")
@@ -282,13 +182,31 @@ class UnifiProtectBackup:
# Start the pyunifiprotect connection by calling `update`
logger.info("Connecting to Unifi Protect...")
await self._protect.update()
for attempts in range(10):
try:
await self._protect.update()
break
except Exception as e:
logger.warning(f"Failed to connect to UniFi Protect, retrying in {attempts}s...", exc_info=e)
await asyncio.sleep(attempts)
else:
raise ConnectionError("Failed to connect to UniFi Protect after 10 attempts")
# Add a lock to the protect client that can be used to prevent code accessing the client when it has
# lost connection
self._protect.connect_event = asyncio.Event()
self._protect.connect_event.set()
# Get a mapping of camera ids -> names
logger.info("Found cameras:")
for camera in self._protect.bootstrap.cameras.values():
logger.info(f" - {camera.id}: {camera.name}")
# Print timezone info for debugging
logger.debug(f'NVR TZ: {self._protect.bootstrap.nvr.timezone}')
logger.debug(f'Local TZ: {datetime.now(timezone.utc).astimezone().tzinfo}')
tasks = []
if not os.path.exists(self._sqlite_path):
@@ -297,49 +215,63 @@ class UnifiProtectBackup:
else:
self._db = await aiosqlite.connect(self._sqlite_path)
event_queue = asyncio.Queue()
download_queue = asyncio.Queue()
upload_queue = VideoQueue(self._download_buffer_size)
# Enable foreign keys in the database
await self._db.execute("PRAGMA foreign_keys = ON;")
# Create downloader task
# This will download video files to its buffer
downloader = VideoDownloader(self._protect, event_queue) # TODO: Make buffer size configurable
tasks.append(asyncio.create_task(downloader.start()))
downloader = VideoDownloader(self._protect, self._db, download_queue, upload_queue, self.color_logging)
tasks.append(downloader.start())
# Create upload task
# This will upload the videos in the downloader's buffer to the rclone remotes and log it in the database
uploader = VideoUploader(
self._protect,
downloader.video_queue,
upload_queue,
self.rclone_destination,
self.rclone_args,
self.file_structure_format,
self._db,
self.color_logging,
)
tasks.append(asyncio.create_task(uploader.start()))
tasks.append(uploader.start())
# Create event listener task
# This will connect to the unifi protect websocket and listen for events. When one is detected it will
# be added to the queue of events to download
event_listener = EventListener(event_queue, self._protect, self.detection_types, self.ignore_cameras)
tasks.append(asyncio.create_task(event_listener.start()))
event_listener = EventListener(download_queue, self._protect, self.detection_types, self.ignore_cameras)
tasks.append(event_listener.start())
# Create purge task
# This will, every midnight, purge old backups from the rclone remotes and database
purge = Purge(self._db, self.retention, self.rclone_destination)
tasks.append(asyncio.create_task(purge.start()))
purge = Purge(
self._db, self.retention, self.rclone_destination, self._purge_interval, self.rclone_purge_args
)
tasks.append(purge.start())
# Create missing event task
# This will check all the events within the retention period, if any have been missed and not backed up
# they will be added to the event queue
missing = MissingEventChecker(
self._protect, self._db, event_queue, self.retention, self.detection_types, self.ignore_cameras
self._protect,
self._db,
download_queue,
downloader,
uploader,
self.retention,
self.detection_types,
self.ignore_cameras,
)
tasks.append(asyncio.create_task(missing.start()))
if self._skip_missing:
logger.info("Ignoring missing events")
await missing.ignore_missing()
tasks.append(missing.start())
logger.info("Starting...")
await asyncio.gather(*tasks)
logger.info("Starting Tasks...")
await asyncio.gather(*[asyncio.create_task(task) for task in tasks])
except asyncio.CancelledError:
if self._protect is not None:
@@ -347,6 +279,11 @@ class UnifiProtectBackup:
if self._db is not None:
await self._db.close()
except Exception as e:
logger.error("Unexpected exception occurred in main loop:", exc_info=e)
await asyncio.sleep(10) # Give remaining tasks a chance to complete e.g sending notifications
raise
async def _check_rclone(self) -> None:
"""Check if rclone is installed and the specified remote is configured.

View File

@@ -1,20 +1,19 @@
import asyncio
# noqa: D100
import logging
import pathlib
import re
from datetime import datetime
import aiosqlite
from pyunifiprotect.data.nvr import Event
from pyunifiprotect import ProtectApiClient
from pyunifiprotect.data.nvr import Event
from unifi_protect_backup.utils import get_camera_name, SubprocessException, VideoQueue
logger = logging.getLogger(__name__)
from unifi_protect_backup.utils import VideoQueue, get_camera_name, human_readable_size, run_command, setup_event_logger
class VideoUploader:
"""Uploads videos from the video_queue to the provided rclone destination
"""Uploads videos from the video_queue to the provided rclone destination.
Keeps a log of what it has uploaded in `db`
"""
@@ -22,43 +21,67 @@ class VideoUploader:
def __init__(
self,
protect: ProtectApiClient,
video_queue: VideoQueue,
upload_queue: VideoQueue,
rclone_destination: str,
rclone_args: str,
file_structure_format: str,
db: aiosqlite.Connection,
color_logging: bool,
):
"""Init.
Args:
protect (ProtectApiClient): UniFi Protect API client to use
upload_queue (VideoQueue): Queue to get video files from
rclone_destination (str): rclone file destination URI
rclone_args (str): arguments to pass to the rclone command
file_structure_format (str): format string for how to structure the uploaded files
db (aiosqlite.Connection): Async SQlite database connection
color_logging (bool): Whether or not to add color to logging output
"""
self._protect: ProtectApiClient = protect
self._video_queue: VideoQueue = video_queue
self.upload_queue: VideoQueue = upload_queue
self._rclone_destination: str = rclone_destination
self._rclone_args: str = rclone_args
self._file_structure_format: str = file_structure_format
self._db: aiosqlite.Connection = db
self.current_event = None
self.base_logger = logging.getLogger(__name__)
setup_event_logger(self.base_logger, color_logging)
self.logger = logging.LoggerAdapter(self.base_logger, {'event': ''})
async def start(self):
"""Main loop
"""Main loop.
Runs forever looking for video data in the video queue and then uploads it using rclone, finally it updates the database
Runs forever looking for video data in the video queue, uploads it
using rclone, and finally updates the database
"""
logger.info("Starting Uploader")
self.logger.info("Starting Uploader")
while True:
try:
event, video = await self._video_queue.get()
logger.info(f"Uploading event: {event.id}")
logger.debug(f" Remaining Upload Queue: {self._video_queue.qsize_files()}")
event, video = await self.upload_queue.get()
self.current_event = event
self.logger = logging.LoggerAdapter(self.base_logger, {'event': f' [{event.id}]'})
self.logger.info(f"Uploading event: {event.id}")
self.logger.debug(
f" Remaining Upload Queue: {self.upload_queue.qsize_files()}"
f" ({human_readable_size(self.upload_queue.qsize())})"
)
destination = await self._generate_file_path(event)
logger.debug(f" Destination: {destination}")
self.logger.debug(f" Destination: {destination}")
await self._upload_video(video, destination, self._rclone_args)
await self._update_database(event, destination)
logger.debug(f"Uploaded")
self.logger.debug("Uploaded")
self.current_event = None
except Exception as e:
logger.warn(f"Unexpected exception occurred, abandoning event {event.id}:")
logger.exception(e)
self.logger.error(f"Unexpected exception occurred, abandoning event {event.id}:", exc_info=e)
async def _upload_video(self, video: bytes, destination: pathlib.Path, rclone_args: str):
"""Upload video using rclone.
@@ -74,28 +97,18 @@ class VideoUploader:
Raises:
RuntimeError: If rclone returns a non-zero exit code
"""
cmd = f'rclone rcat -vv {rclone_args} "{destination}"'
proc = await asyncio.create_subprocess_shell(
cmd,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate(video)
if proc.returncode == 0:
logger.extra_debug(f"stdout:\n{stdout.decode()}") # type: ignore
logger.extra_debug(f"stderr:\n{stderr.decode()}") # type: ignore
else:
raise SubprocessException(stdout.decode(), stderr.decode(), proc.returncode)
returncode, stdout, stderr = await run_command(f'rclone rcat -vv {rclone_args} "{destination}"', video)
if returncode != 0:
self.logger.error(f" Failed to upload file: '{destination}'")
async def _update_database(self, event: Event, destination: str):
"""
Add the backed up event to the database along with where it was backed up to
"""
"""Add the backed up event to the database along with where it was backed up to."""
assert isinstance(event.start, datetime)
assert isinstance(event.end, datetime)
await self._db.execute(
f"""INSERT INTO events VALUES
('{event.id}', '{event.type}', '{event.camera_id}', '{event.start.timestamp()}', '{event.end.timestamp()}')
"""
"INSERT INTO events VALUES "
f"('{event.id}', '{event.type}', '{event.camera_id}',"
f"'{event.start.timestamp()}', '{event.end.timestamp()}')"
)
remote, file_path = str(destination).split(":")
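One hedged aside: `_update_database` interpolates values into the SQL with f-strings; aiosqlite also accepts `?` parameter binding, which sidesteps quoting pitfalls. A standalone sketch of the same insert with binding (the values are illustrative, not from the code above):

# Sketch: the events insert with parameter binding (illustrative values).
import asyncio
import aiosqlite

async def insert_event() -> None:
    db = await aiosqlite.connect(":memory:")
    await db.execute("CREATE TABLE events(id PRIMARY KEY, type, camera_id, start REAL, end REAL)")
    await db.execute(
        "INSERT INTO events VALUES (?, ?, ?, ?, ?)",
        ("evt-1", "motion", "cam-1", 1690000000.0, 1690000060.0),
    )
    await db.commit()
    await db.close()

asyncio.run(insert_event())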

View File

@@ -1,14 +1,255 @@
"""Utility functions used throughout the code, kept here to allow re use and/or minimize clutter elsewhere."""
import asyncio
import logging
import re
import asyncio
from datetime import datetime
from typing import List, Optional
from apprise import NotifyType
from dateutil.relativedelta import relativedelta
from pyunifiprotect import ProtectApiClient
from pyunifiprotect.data.nvr import Event
from async_lru import alru_cache
from unifi_protect_backup import notifications
logger = logging.getLogger(__name__)
def add_logging_level(levelName: str, levelNum: int, methodName: Optional[str] = None) -> None:
"""Comprehensively adds a new logging level to the `logging` module and the currently configured logging class.
`levelName` becomes an attribute of the `logging` module with the value
`levelNum`. `methodName` becomes a convenience method for both `logging`
itself and the class returned by `logging.getLoggerClass()` (usually just
`logging.Logger`).
To avoid accidental clobbering of existing attributes, this method will
raise an `AttributeError` if the level name is already an attribute of the
`logging` module or if the method name is already present.
Credit: https://stackoverflow.com/a/35804945
Args:
levelName (str): The name of the new logging level (in all caps).
levelNum (int): The priority value of the logging level, lower=more verbose.
methodName (str): The name of the method used to log using this.
If `methodName` is not specified, `levelName.lower()` is used.
Example:
::
>>> add_logging_level('TRACE', logging.DEBUG - 5)
>>> logging.getLogger(__name__).setLevel("TRACE")
>>> logging.getLogger(__name__).trace('that worked')
>>> logging.trace('so did this')
>>> logging.TRACE
5
"""
if not methodName:
methodName = levelName.lower()
if hasattr(logging, levelName):
raise AttributeError('{} already defined in logging module'.format(levelName))
if hasattr(logging, methodName):
raise AttributeError('{} already defined in logging module'.format(methodName))
if hasattr(logging.getLoggerClass(), methodName):
raise AttributeError('{} already defined in logger class'.format(methodName))
# This method was inspired by the answers to Stack Overflow post
# http://stackoverflow.com/q/2183233/2988730, especially
# http://stackoverflow.com/a/13638084/2988730
def logForLevel(self, message, *args, **kwargs):
if self.isEnabledFor(levelNum):
self._log(levelNum, message, args, **kwargs)
def logToRoot(message, *args, **kwargs):
logging.log(levelNum, message, *args, **kwargs)
def adapterLog(self, msg, *args, **kwargs):
"""Delegate an error call to the underlying logger."""
self.log(levelNum, msg, *args, **kwargs)
logging.addLevelName(levelNum, levelName)
setattr(logging, levelName, levelNum)
setattr(logging.getLoggerClass(), methodName, logForLevel)
setattr(logging, methodName, logToRoot)
setattr(logging.LoggerAdapter, methodName, adapterLog)
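The extra `setattr` on `logging.LoggerAdapter` is what lets the per-event adapters used by the uploader and downloader call the custom level methods too. A quick sketch of registering and using the two levels this project adds, assuming `add_logging_level` above is in scope:

# Sketch: registering this project's two custom levels and logging with them.
import logging

add_logging_level('EXTRA_DEBUG', logging.DEBUG - 1)
add_logging_level('WEBSOCKET_DATA', logging.DEBUG - 2)
logging.basicConfig(level=logging.WEBSOCKET_DATA)  # type: ignore

log = logging.getLogger("demo")
log.extra_debug("finer-grained than DEBUG")    # type: ignore
log.websocket_data("raw websocket payloads")   # type: ignore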
color_logging = False
def add_color_to_record_levelname(record):
"""Colorizes logging level names."""
levelno = record.levelno
if levelno >= logging.CRITICAL:
color = '\x1b[31;1m' # RED
elif levelno >= logging.ERROR:
color = '\x1b[31;1m' # RED
elif levelno >= logging.WARNING:
color = '\x1b[33;1m' # YELLOW
elif levelno >= logging.INFO:
color = '\x1b[32;1m' # GREEN
elif levelno >= logging.DEBUG:
color = '\x1b[36;1m' # CYAN
elif levelno >= logging.EXTRA_DEBUG:
color = '\x1b[35;1m' # MAGENTA
else:
color = '\x1b[0m'
return f"{color}{record.levelname}\x1b[0m"
class AppriseStreamHandler(logging.StreamHandler):
"""Logging handler that also sends logging output to configured Apprise notifiers."""
def __init__(self, color_logging: bool, *args, **kwargs):
"""Init.
Args:
color_logging (bool): If true logging levels will be colorized
"""
super().__init__(*args, **kwargs)
self.color_logging = color_logging
def _emit_apprise(self, record):
try:
loop = asyncio.get_event_loop()
except RuntimeError:
return # There is no running loop
msg = self.format(record)
logging_map = {
logging.ERROR: NotifyType.FAILURE,
logging.WARNING: NotifyType.WARNING,
logging.INFO: NotifyType.INFO,
logging.DEBUG: NotifyType.INFO,
logging.EXTRA_DEBUG: NotifyType.INFO,
logging.WEBSOCKET_DATA: NotifyType.INFO,
}
# Only try notifying if there are notification servers configured
# and the asyncio loop isn't closed (aka we are quitting)
if notifications.notifier.servers and not loop.is_closed():
notify = notifications.notifier.async_notify(
body=msg,
title=record.levelname,
notify_type=logging_map[record.levelno],
tag=[record.levelname],
)
if loop.is_running():
asyncio.create_task(notify)
else:
loop.run_until_complete(notify)
def _emit_stream(self, record):
record.levelname = f"{record.levelname:^11s}" # Pad level name to max width
if self.color_logging:
record.levelname = add_color_to_record_levelname(record)
msg = self.format(record)
stream = self.stream
# issue 35046: merged two stream.writes into one.
stream.write(msg + self.terminator)
self.flush()
def emit(self, record):
"""Emit log to stdout and apprise."""
try:
self._emit_apprise(record)
except RecursionError: # See issue 36272
raise
except Exception:
self.handleError(record)
try:
self._emit_stream(record)
except RecursionError: # See issue 36272
raise
except Exception:
self.handleError(record)
def create_logging_handler(format, color_logging):
"""Constructs apprise logging handler for the given format."""
date_format = "%Y-%m-%d %H:%M:%S"
style = '{'
sh = AppriseStreamHandler(color_logging)
formatter = logging.Formatter(format, date_format, style)
sh.setFormatter(formatter)
return sh
def setup_logging(verbosity: int, color_logging: bool = False, apprise_notifiers: List[str] = []) -> None:
"""Configures loggers to provided the desired level of verbosity.
Verbosity 0: Only log info messages created by `unifi-protect-backup`, and all warnings
Verbosity 1: Only log info & debug messages created by `unifi-protect-backup`, and all warnings
Verbosity 2: Log info & debug messages created by `unifi-protect-backup`, command output, and
all warnings
Verbosity 3: Log debug messages created by `unifi-protect-backup`, command output, all info
messages, and all warnings
Verbosity 4: Log debug messages created by `unifi-protect-backup` command output, all info
messages, all warnings, and websocket data
Verbosity 5: Log websocket data, command output, all debug messages, all info messages and all
warnings
Args:
verbosity (int): The desired level of verbosity
color_logging (bool): If colors should be used in the log (default=False)
apprise_notifiers (List[str]): Notification services to hook into the logger
"""
add_logging_level(
'EXTRA_DEBUG',
logging.DEBUG - 1,
)
add_logging_level(
'WEBSOCKET_DATA',
logging.DEBUG - 2,
)
format = "{asctime} [{levelname:^11s}] {name:<42} : {message}"
sh = create_logging_handler(format, color_logging)
logger = logging.getLogger("unifi_protect_backup")
logger.addHandler(sh)
logger.propagate = False
if verbosity == 0:
logging.basicConfig(level=logging.WARN, handlers=[sh])
logger.setLevel(logging.INFO)
elif verbosity == 1:
logging.basicConfig(level=logging.WARN, handlers=[sh])
logger.setLevel(logging.DEBUG)
elif verbosity == 2:
logging.basicConfig(level=logging.WARN, handlers=[sh])
logger.setLevel(logging.EXTRA_DEBUG) # type: ignore
elif verbosity == 3:
logging.basicConfig(level=logging.INFO, handlers=[sh])
logger.setLevel(logging.EXTRA_DEBUG) # type: ignore
elif verbosity == 4:
logging.basicConfig(level=logging.INFO, handlers=[sh])
logger.setLevel(logging.WEBSOCKET_DATA) # type: ignore
elif verbosity >= 5:
logging.basicConfig(level=logging.DEBUG, handlers=[sh])
logger.setLevel(logging.WEBSOCKET_DATA) # type: ignore
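For reference, a typical bootstrap with this function; verbosity 2 corresponds to `-vv` and enables `extra_debug` output (a sketch, assuming the import path matches this changeset):

# Sketch: bootstrapping logging at verbosity 2 with colored output.
import logging
from unifi_protect_backup.utils import setup_logging

setup_logging(2, color_logging=True)
log = logging.getLogger("unifi_protect_backup.demo")
log.info("shown at every verbosity")
log.extra_debug("shown at verbosity 2 and above")  # type: ignore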
def setup_event_logger(logger, color_logging):
"""Sets up a logger that also displays the event ID currently being processed."""
format = "{asctime} [{levelname:^11s}] {name:<42} :{event} {message}"
sh = create_logging_handler(format, color_logging)
logger.addHandler(sh)
logger.propagate = False
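The `{event}` field in this format string is supplied by a `LoggerAdapter`, which is how the uploader prefixes every line with the event it is processing (see `VideoUploader.start` above). A minimal sketch, assuming the custom levels have already been registered by `setup_logging`:

# Sketch: wiring an event ID into the "{event}" format field via LoggerAdapter.
import logging

base_logger = logging.getLogger("unifi_protect_backup.uploader")
setup_event_logger(base_logger, color_logging=False)

log = logging.LoggerAdapter(base_logger, {'event': ' [evt-1]'})
log.info("Uploading event")  # record gains an `event` attribute -> " [evt-1]"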
_suffixes = ["B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"]
def human_readable_size(num: float):
"""Turns a number into a human readable number with ISO/IEC 80000 binary prefixes.
@@ -17,19 +258,37 @@ def human_readable_size(num: float):
Args:
num (int): The number to be converted into human readable format
"""
for unit in ["B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"]:
for unit in _suffixes:
if abs(num) < 1024.0:
return f"{num:3.1f}{unit}"
num /= 1024.0
raise ValueError("`num` too large, ran out of prefixes")
def human_readable_to_float(num: str):
"""Turns a human readable ISO/IEC 80000 suffix value to its full float value."""
pattern = r"([\d.]+)(" + "|".join(_suffixes) + ")"
result = re.match(pattern, num)
if result is None:
raise ValueError(f"Value '{num}' is not a valid ISO/IEC 80000 binary value")
value = float(result[1])
suffix = result[2]
multiplier = 1024 ** _suffixes.index(suffix)
return value * multiplier
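These two helpers are inverses over the same `_suffixes` table, which is what lets a size printed by one be parsed back by the other. A quick round-trip check:

# Round-trip sketch for the two size helpers above.
assert human_readable_size(1536) == "1.5KiB"
assert human_readable_to_float("1.5KiB") == 1536.0
assert human_readable_to_float("512MiB") == 512 * 1024 ** 2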
# Cached so that actions like uploads can continue when the connection to the api is lost
# No max size, and a 6 hour ttl
@alru_cache(None, ttl=60 * 60 * 6)
async def get_camera_name(protect: ProtectApiClient, id: str):
"""
Returns the name for the camera with the given ID
"""Returns the name for the camera with the given ID.
If the camera ID is not known, it tries refreshing the cached data
"""
# Wait for unifi protect to be connected
await protect.connect_event.wait()
try:
return protect.bootstrap.cameras[id].name
except KeyError:
@@ -49,6 +308,8 @@ async def get_camera_name(protect: ProtectApiClient, id: str):
class SubprocessException(Exception):
"""Class to capture: stdout, stderr, and return code of Subprocess errors."""
def __init__(self, stdout, stderr, returncode):
"""Exception class for when rclone does not exit with `0`.
@@ -68,11 +329,7 @@ class SubprocessException(Exception):
def parse_rclone_retention(retention: str) -> relativedelta:
"""
Parses the rclone `retention` parameter into a relativedelta which can then be used
to calculate datetimes
"""
"""Parses the rclone `retention` parameter into a relativedelta which can then be used to calculate datetimes."""
matches = {k: int(v) for v, k in re.findall(r"([\d]+)(ms|s|m|h|d|w|M|y)", retention)}
return relativedelta(
microseconds=matches.get("ms", 0) * 1000,
@@ -82,38 +339,40 @@ def parse_rclone_retention(retention: str) -> relativedelta:
days=matches.get("d", 0),
weeks=matches.get("w", 0),
months=matches.get("M", 0),
years=matches.get("Y", 0),
years=matches.get("y", 0),
)
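The regex accepts any concatenation of `ms/s/m/h/d/w/M/y` terms, matching rclone's duration syntax. For example:

# Sketch: rclone-style durations parsed into relativedeltas.
from dateutil.relativedelta import relativedelta

assert parse_rclone_retention("7d") == relativedelta(days=7)
assert parse_rclone_retention("1w3d12h") == relativedelta(weeks=1, days=3, hours=12)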
async def run_command(cmd: str):
"""
Runs the given command returning the exit code, stdout and stderr
"""
async def run_command(cmd: str, data=None):
"""Runs the given command returning the exit code, stdout and stderr."""
proc = await asyncio.create_subprocess_shell(
cmd,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
stdout = stdout.decode().replace('\n', '\n\t').strip()
stderr = stderr.decode().replace('\n', '\n\t').strip()
stdout, stderr = await proc.communicate(data)
stdout = stdout.decode()
stdout_indented = '\t' + stdout.replace('\n', '\n\t').strip()
stderr = stderr.decode()
stderr_indented = '\t' + stderr.replace('\n', '\n\t').strip()
if proc.returncode != 0:
logger.warn(f"Failed to run: '{cmd}")
logger.warn(f"stdout:\n{stdout}")
logger.warn(f"stderr:\n{stderr}")
logger.error(f"Failed to run: '{cmd}")
logger.error(f"stdout:\n{stdout_indented}")
logger.error(f"stderr:\n{stderr_indented}")
else:
logger.extra_debug(f"stdout:\n{stdout}")
logger.extra_debug(f"stderr:\n{stderr}")
logger.extra_debug(f"stdout:\n{stdout_indented}") # type: ignore
logger.extra_debug(f"stderr:\n{stderr_indented}") # type: ignore
return proc.returncode, stdout, stderr
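A usage sketch for `run_command`; it shells out, so this assumes `rclone` is on `PATH`:

# Sketch: shelling out via run_command and checking the exit code.
import asyncio

async def check_rclone():
    returncode, stdout, stderr = await run_command("rclone version")
    if returncode != 0:
        print("rclone not found or misconfigured")
    else:
        print(stdout.splitlines()[0])

asyncio.run(check_rclone())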
class VideoQueue(asyncio.Queue):
"""A queue that limits the number of bytes it can store rather than discrete entries"""
"""A queue that limits the number of bytes it can store rather than discrete entries."""
def __init__(self, *args, **kwargs):
"""Init."""
super().__init__(*args, **kwargs)
self._bytes_sum = 0
@@ -130,11 +389,11 @@ class VideoQueue(asyncio.Queue):
self._bytes_sum -= len(data[1])
return data
def _put(self, item: bytes):
self._queue.append(item)
def _put(self, item: tuple[Event, bytes]):
self._queue.append(item) # type: ignore
self._bytes_sum += len(item[1])
def full(self, item: bytes = None):
def full(self, item: tuple[Event, bytes] = None):
"""Return True if there are maxsize bytes in the queue.
Optionally, if `item` is provided, it will return False if there is enough space to
@@ -143,30 +402,36 @@ class VideoQueue(asyncio.Queue):
Note: if the Queue was initialized with maxsize=0 (the default),
then full() is never True.
"""
if self._maxsize <= 0:
if self._maxsize <= 0: # type: ignore
return False
else:
if item is None:
return self.qsize() >= self._maxsize
return self.qsize() >= self._maxsize # type: ignore
else:
return self.qsize() + len(item[1]) >= self._maxsize
return self.qsize() + len(item[1]) >= self._maxsize # type: ignore
async def put(self, item: bytes):
async def put(self, item: tuple[Event, bytes]):
"""Put an item into the queue.
Put an item into the queue. If the queue is full, wait until a free
slot is available before adding item.
"""
if len(item[1]) > self._maxsize: # type: ignore
raise ValueError(
f"Item is larger ({human_readable_size(len(item[1]))}) "
f"than the size of the buffer ({human_readable_size(self._maxsize)})" # type: ignore
)
while self.full(item):
putter = self._loop.create_future()
self._putters.append(putter)
putter = self._loop.create_future() # type: ignore
self._putters.append(putter) # type: ignore
try:
await putter
except:
except: # noqa: E722
putter.cancel() # Just in case putter is not done yet.
try:
# Clean self._putters from canceled putters.
self._putters.remove(putter)
self._putters.remove(putter) # type: ignore
except ValueError:
# The putter could be removed from self._putters by a
# previous get_nowait call.
@@ -174,11 +439,11 @@ class VideoQueue(asyncio.Queue):
if not self.full(item) and not putter.cancelled():
# We were woken up by get_nowait(), but can't take
# the call. Wake up the next in line.
self._wakeup_next(self._putters)
self._wakeup_next(self._putters) # type: ignore
raise
return self.put_nowait(item)
def put_nowait(self, item: bytes):
def put_nowait(self, item: tuple[Event, bytes]):
"""Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFull.
@@ -186,6 +451,12 @@ class VideoQueue(asyncio.Queue):
if self.full(item):
raise asyncio.QueueFull
self._put(item)
self._unfinished_tasks += 1
self._finished.clear()
self._wakeup_next(self._getters)
self._unfinished_tasks += 1 # type: ignore
self._finished.clear() # type: ignore
self._wakeup_next(self._getters) # type: ignore
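Unlike a stock `asyncio.Queue`, `maxsize` here is measured in bytes of buffered video rather than entries, which is what makes the download buffer size enforceable. A sketch; the plain `object()` stands in for a pyunifiprotect `Event`, and the byte-based size reporting is per the `full()` checks above:

# Sketch: VideoQueue limits bytes, not item count (Event stubbed with object()).
import asyncio

async def demo():
    queue = VideoQueue(maxsize=10 * 2**20)     # 10 MiB buffer
    event = object()                           # stand-in for a pyunifiprotect Event
    await queue.put((event, b"\x00" * 2**20))  # buffers 1 MiB of "video"
    print(queue.qsize())                       # bytes buffered, not entry count

asyncio.run(demo())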
async def wait_until(dt):
"""Sleep until the specified datetime."""
now = datetime.now()
await asyncio.sleep((dt - now).total_seconds())