Mirror of https://github.com/ep1cman/unifi-protect-backup.git (synced 2025-12-05 23:53:30 +00:00)

Compare commits (73 commits)
| SHA1 |
|---|
| df4390688b |
| 3acfd1f543 |
| 49c11c1872 |
| 93cf297371 |
| 8baa413a23 |
| 471ecb0662 |
| 031d4e4862 |
| f109ec2a48 |
| 6a8bb39b63 |
| 49ddb081a8 |
| 941c92142f |
| 150d8e6f49 |
| 5ae43f08af |
| 0a36102eed |
| 92be1cea5d |
| 1813bc0176 |
| 9451fb4235 |
| 6fe18a193b |
| f3a8bf6957 |
| cb93ec7c6e |
| f82e6064e7 |
| 6aac1aadab |
| 13b11359fa |
| 540ad6e9f6 |
| 912433e640 |
| f4a0c2bdcd |
| f2c9ee5c76 |
| 53ab3dc432 |
| 381f90f497 |
| af8ca90356 |
| 189450e590 |
| 3f55fa5fdb |
| 52e72a7425 |
| 003e6eb990 |
| 8bebeceaa6 |
| e2eb7858da |
| 453fed6c57 |
| ae323e68aa |
| 4eec2fdde0 |
| d31b9bffc6 |
| 0a4a2401be |
| 3c3c47b3b4 |
| 51e2446e44 |
| 5f8ae03d7a |
| 92bb362f2b |
| 401031dc2f |
| 24e508bf69 |
| 71c86714c1 |
| 7ee34c1c6a |
| 5bd4a35d5d |
| 298f500811 |
| 0125b6d21a |
| 04694712d8 |
| e3ed8ef303 |
| 43dd561d81 |
| ad6b4dc632 |
| a268ad652a |
| 2b46b5bd4a |
| 9e164de686 |
| 78e7b8fbb0 |
| 76a0591beb |
| 15e0ae5f4d |
| c9634ba10a |
| e3fbb1be10 |
| 47c9338fe5 |
| 48042aee04 |
| e56a38b73f |
| 3e53d43f95 |
| 90e50fd982 |
| 0a2c0aa326 |
| 9f6ec7628c |
| 091b38b038 |
| 5e1803c06c |
.bumpversion.cfg

```diff
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.2.0
+current_version = 0.8.0
 commit = True
 tag = True
```
.github/ISSUE_TEMPLATE.md (vendored, 2 lines changed)

```diff
@@ -1,6 +1,8 @@
 * Unifi Protect Backup version:
+* Unifi Protect version:
 * Python version:
 * Operating System:
+* Are you using a docker container or native?:
 
 ### Description
 
```
.github/workflows/dev.yml (vendored, 48 lines changed)

```diff
@@ -2,16 +2,11 @@
 name: dev workflow
 
 env:
   IMAGE_NAME: ${{ github.repository }}
 
 # Controls when the action will run.
 on:
-  # Triggers the workflow on push or pull request events but only for the master branch
+  # Triggers the workflow on push events but only for the dev branch
   push:
-    branches: [ master, main, dev ]
-  pull_request:
-    branches: [ master, main, dev ]
+    branches: [ dev ]
 
   # Allows you to run this workflow manually from the Actions tab
   workflow_dispatch:
@@ -55,9 +50,7 @@ jobs:
   dev_container:
     name: Create dev container
     runs-on: ubuntu-20.04
-    strategy:
-      matrix:
-        python-versions: [3.9]
+    if: github.event_name != 'pull_request'
 
     # Steps represent a sequence of tasks that will be executed as part of the job
     steps:
@@ -66,7 +59,7 @@ jobs:
 
       - uses: actions/setup-python@v2
         with:
-          python-version: ${{ matrix.python-versions }}
+          python-version: 3.9
 
       - name: Install dependencies
         run: |
@@ -77,20 +70,23 @@ jobs:
         run: >-
           poetry build
 
-      - name: build container
-        id: docker_build
-        run: docker build . --file Dockerfile --tag $IMAGE_NAME --label "runnumber=${GITHUB_RUN_ID}"
+      - name: Set up QEMU
+        uses: docker/setup-qemu-action@v1
 
-      - name: log in to container registry
-        run: echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io -u ${{ github.actor }} --password-stdin
+      - name: Set up Docker Buildx
+        uses: docker/setup-buildx-action@v1
+
+      - name: Log in to container registry
+        uses: docker/login-action@v2
+        with:
+          registry: ghcr.io
+          username: ${{ github.actor }}
+          password: ${{ secrets.GITHUB_TOKEN }}
 
-      - name: push container image
-        run: |
-          IMAGE_ID=ghcr.io/$IMAGE_NAME
-
-          # Change all uppercase to lowercase
-          IMAGE_ID=$(echo $IMAGE_ID | tr '[A-Z]' '[a-z]')
-          echo IMAGE_ID=$IMAGE_ID
-          echo VERSION=$VERSION
-          docker tag $IMAGE_NAME $IMAGE_ID:dev
-          docker push $IMAGE_ID:dev
+      - name: Build and push dev
+        uses: docker/build-push-action@v2
+        with:
+          context: .
+          platforms: linux/amd64,linux/arm64
+          push: true
+          tags: ghcr.io/${{ github.repository }}:dev
```
.github/workflows/preview.yml (vendored, deleted, 50 lines)

```diff
@@ -1,50 +0,0 @@
-# This is a basic workflow to help you get started with Actions
-
-name: stage & preview workflow
-
-# Controls when the action will run.
-on:
-  # Triggers the workflow on push or pull request events but only for the master branch
-  push:
-    branches: [ master, main ]
-
-  # Allows you to run this workflow manually from the Actions tab
-  workflow_dispatch:
-
-# A workflow run is made up of one or more jobs that can run sequentially or in parallel
-jobs:
-  publish_dev_build:
-    runs-on: ubuntu-latest
-
-    strategy:
-      matrix:
-        python-versions: [ 3.9 ]
-
-    steps:
-      - uses: actions/checkout@v2
-      - uses: actions/setup-python@v2
-        with:
-          python-version: ${{ matrix.python-versions }}
-
-      - name: Install dependencies
-        run: |
-          python -m pip install --upgrade pip
-          pip install poetry tox tox-gh-actions
-
-      - name: test with tox
-        run:
-          tox
-
-      - name: Build wheels and source tarball
-        run: |
-          poetry version $(poetry version --short)-dev.$GITHUB_RUN_NUMBER
-          poetry version --short
-          poetry build
-
-      - name: publish to Test PyPI
-        uses: pypa/gh-action-pypi-publish@master
-        with:
-          user: __token__
-          password: ${{ secrets.TEST_PYPI_API_TOKEN}}
-          repository_url: https://test.pypi.org/legacy/
-          skip_existing: true
```
.github/workflows/release.yml (vendored, 55 lines changed)

```diff
@@ -12,9 +12,6 @@ on:
   # Allows you to run this workflow manually from the Actions tab
   workflow_dispatch:
 
-env:
-  IMAGE_NAME: ${{ github.repository }}
-
 # A workflow run is made up of one or more jobs that can run sequentially or in parallel
 jobs:
   # This workflow contains a single job called "release"
@@ -22,10 +19,6 @@ jobs:
     name: Create Release
     runs-on: ubuntu-20.04
 
-    strategy:
-      matrix:
-        python-versions: [3.9]
-
     # Steps represent a sequence of tasks that will be executed as part of the job
     steps:
       - name: Get version from tag
@@ -46,7 +39,7 @@ jobs:
 
       - uses: actions/setup-python@v2
         with:
-          python-version: ${{ matrix.python-versions }}
+          python-version: 3.9
 
       - name: Install dependencies
         run: |
@@ -59,14 +52,28 @@ jobs:
 
       - name: show temporary files
        run: >-
-          ls -l
+          ls -lR
 
-      - name: build container
-        id: docker_build
-        run: docker build . --file Dockerfile --tag $IMAGE_NAME --label "runnumber=${GITHUB_RUN_ID}"
+      - name: Set up QEMU
+        uses: docker/setup-qemu-action@v1
 
-      - name: log in to container registry
-        run: echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io -u ${{ github.actor }} --password-stdin
+      - name: Set up Docker Buildx
+        uses: docker/setup-buildx-action@v1
+
+      - name: Log in to container registry
+        uses: docker/login-action@v2
+        with:
+          registry: ghcr.io
+          username: ${{ github.actor }}
+          password: ${{ secrets.GITHUB_TOKEN }}
+
+      - name: Build and push dev
+        uses: docker/build-push-action@v2
+        with:
+          context: .
+          platforms: linux/amd64,linux/arm64
+          push: true
+          tags: ghcr.io/${{ github.repository }}:${{ steps.tag_name.outputs.current_version }}, ghcr.io/${{ github.repository }}:latest
 
       - name: create github release
         id: create_release
@@ -79,26 +86,6 @@ jobs:
           draft: false
           prerelease: false
 
-      - name: push container image
-        run: |
-          IMAGE_ID=ghcr.io/$IMAGE_NAME
-
-          # Change all uppercase to lowercase
-          IMAGE_ID=$(echo $IMAGE_ID | tr '[A-Z]' '[a-z]')
-          # Strip git ref prefix from version
-          VERSION=$(echo "${{ github.ref }}" | sed -e 's,.*/\(.*\),\1,')
-          # Strip "v" prefix from tag name
-          [[ "${{ github.ref }}" == "refs/tags/"* ]] && VERSION=$(echo $VERSION | sed -e 's/^v//')
-          # Use Docker `latest` tag convention
-          [ "$VERSION" == "master" ] && VERSION=latest
-          echo IMAGE_ID=$IMAGE_ID
-          echo VERSION=$VERSION
-          docker tag $IMAGE_NAME $IMAGE_ID:$VERSION
-          docker tag $IMAGE_NAME $IMAGE_ID:latest
-          docker push $IMAGE_ID:$VERSION
-          docker push $IMAGE_ID:latest
-
-
       - name: publish to PyPI
         uses: pypa/gh-action-pypi-publish@release/v1
         with:
```
.gitignore (vendored, 5 lines changed)

```diff
@@ -113,5 +113,10 @@ ENV/
 # mkdocs build dir
 site/
 
+# Docker mounted volumes
+config/
+data/
 
 .envrc
+clips/
+*.sqlite
```
CHANGELOG.md (95 lines changed)

```diff
@@ -4,6 +4,101 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [0.8.0] - 2022-12-03
+Major internal refactoring. Each task is now its own class and asyncio task.
+
+### Added
+- A database of backed up events and where they are stored
+- A periodic check for missed events
+  - This will also ensure past events from before the tool was used are backed up, up until the retention period
+
+### Fixed
+- Pruning is no longer done based on file timestamps; the database is used instead. The tool will no longer delete files it didn't create.
+- Pruning now runs much more frequently (every minute), so retention periods of less than a day are now possible.
+
+## [0.7.4] - 2022-08-21
+No functional changes in this version. This is just to trigger the release CI.
+### Added
+- Arm docker container
+- rclone debugging instructions when using docker
+
+### Fixed
+- Documentation error in the rclone config path of the docker container.
+
+## [0.7.3] - 2022-07-31
+### Fixed
+- Updated to the 4.0.0 version of pyunifiprotect
+- Added rust to the container, and bumped it to alpine 3.16
+
+## [0.7.2] - 2022-07-17
+### Fixed
+- Updated to the latest version of pyunifiprotect to fix issues introduced in unifi protect 2.1.1
+
+## [0.7.1] - 2022-06-08
+### Fixed
+- Updated to the latest version of pyunifiprotect to fix issues introduced in unifi protect 2.0.1
+- Updated documentation to include how to set up local user accounts on unifi protect
+
+## [0.7.0] - 2022-03-26
+### Added
+- Added the ability to change the way the clip files are structured via a template string.
+### Fixed
+- Fixed issue where event types without clips would attempt (and fail) to download clips
+- Drastically reduced the size of the docker container
+- Fixed typos in the documentation
+- Some dev dependencies are no longer installed by default
+
+## [0.6.0] - 2022-03-18
+### Added
+- Support for doorbell ring events
+- `detection_types` parameter to limit which kinds of events are backed up
+### Fixed
+- Actually fixed timestamps this time.
+
+## [0.5.3] - 2022-03-11
+### Fixed
+- Timestamps in filenames and logging now show time in the timezone of the NVR, not UTC
+
+## [0.5.2] - 2022-03-10
+### Fixed
+- rclone delete command now works as expected on windows when spaces are in the file path
+- Dockerfile now allows setting of the user and group to run as, as well as a default config
+
+## [0.5.1] - 2022-03-07
+### Fixed
+- rclone command now works as expected on windows when spaces are in the file path
+
+## [0.5.0] - 2022-03-06
+### Added
+- If `ffprobe` is available, the downloaded clip's length is checked and logged
+### Fixed
+- A time delay has been added before downloading clips to try to resolve an issue where
+  downloaded clips were too short
+
+## [0.4.0] - 2022-03-05
+### Added
+- A `--version` command line option to show the tool's version
+### Fixed
+- Websocket checks are no longer logged at verbosity level 1, to reduce log spam
+
+## [0.3.1] - 2022-02-24
+### Fixed
+- Now checks if the websocket connection is alive, and attempts to reconnect if it isn't.
+
+## [0.3.0] - 2022-02-22
+### Added
+- New CLI argument for passing CLI arguments directly to `rclone`.
+
+### Fixed
+- A new camera getting added while running no longer crashes the application.
+- A timeout during download now correctly retries the download instead of
+  abandoning the event.
+
+## [0.2.1] - 2022-02-21
+### Fixed
+- Retry logging formatting
+
 ## [0.2.0] - 2022-02-21
 ### Added
 - Ability to ignore cameras
```
CONTRIBUTING.md

````diff
@@ -59,7 +59,8 @@ Ready to contribute? Here's how to set up `unifi-protect-backup` for local devel
 4. Install dependencies and start your virtualenv:
 
    ```
-   $ poetry install -E test -E doc -E dev
+   $ poetry install -E test -E dev
+   $ poetry shell
    ```
 
 5. Create a branch for local development:
@@ -70,14 +71,21 @@ Ready to contribute? Here's how to set up `unifi-protect-backup` for local devel
 
    Now you can make your changes locally.
 
-6. When you're done making changes, check that your changes pass the
+6. To run `unifi-protect-backup` while developing you will need to either
+   be inside the `poetry shell` virtualenv or run it via poetry:
+
+   ```
+   $ poetry run unifi-protect-backup {args}
+   ```
+
+7. When you're done making changes, check that your changes pass the
    tests, including testing other Python versions, with tox:
 
    ```
    $ poetry run tox
    ```
 
-7. Commit your changes and push your branch to GitHub:
+8. Commit your changes and push your branch to GitHub:
 
    ```
    $ git add .
@@ -85,7 +93,7 @@ Ready to contribute? Here's how to set up `unifi-protect-backup` for local devel
    $ git push origin name-of-your-bugfix-or-feature
    ```
 
-8. Submit a pull request through the GitHub website.
+9. Submit a pull request through the GitHub website.
 
 ## Pull Request Guidelines
 
@@ -93,8 +101,8 @@ Before you submit a pull request, check that it meets these guidelines:
 
 1. The pull request should include tests.
 2. If the pull request adds functionality, the docs should be updated. Put
-   your new functionality into a function with a docstring, and add the
-   feature to the list in README.md.
+   your new functionality into a function with a docstring. If adding a CLI
+   option, you should update the "usage" in README.md.
 3. The pull request should work for Python 3.9. Check
    https://github.com/ep1cman/unifi-protect-backup/actions
    and make sure that the tests pass for all supported Python versions.
@@ -120,4 +128,5 @@
 $ git push --tags
 ```
 
-GitHub Actions will then deploy to PyPI if tests pass.
+GitHub Actions will then deploy to PyPI, produce a GitHub release, and a container
+build if tests pass.
````
Dockerfile (43 lines changed)

```diff
@@ -1,24 +1,53 @@
 # To build run:
 # $ poetry build
 # $ docker build -t ghcr.io/ep1cman/unifi-protect-backup .
-FROM python:3.9-alpine
+
+FROM ghcr.io/linuxserver/baseimage-alpine:3.16
 
 LABEL maintainer="ep1cman"
 
 WORKDIR /app
-RUN apk add gcc musl-dev zlib-dev jpeg-dev rclone
-COPY dist/unifi-protect-backup-0.2.0.tar.gz sdist.tar.gz
-RUN pip install sdist.tar.gz
+
+COPY dist/unifi_protect_backup-0.8.0.tar.gz sdist.tar.gz
+
+RUN \
+    echo "**** install build packages ****" && \
+    apk add --no-cache --virtual=build-dependencies \
+        gcc \
+        musl-dev \
+        jpeg-dev \
+        zlib-dev \
+        python3-dev \
+        cargo && \
+    echo "**** install packages ****" && \
+    apk add --no-cache \
+        rclone \
+        ffmpeg \
+        py3-pip \
+        python3 && \
+    echo "**** install unifi-protect-backup ****" && \
+    pip install --no-cache-dir sdist.tar.gz && \
+    echo "**** cleanup ****" && \
+    apk del --purge \
+        build-dependencies && \
+    rm -rf \
+        /tmp/* \
+        /app/sdist.tar.gz
 
 # Settings
 ENV UFP_USERNAME=unifi_protect_user
 ENV UFP_PASSWORD=unifi_protect_password
 ENV UFP_ADDRESS=127.0.0.1
 ENV UFP_PORT=443
 ENV UFP_SSL_VERIFY=true
 ENV RCLONE_RETENTION=7d
-ENV RCLONE_DESTINATION=my_remote:/unifi_protect_backup
+ENV RCLONE_DESTINATION=local:/data
 ENV VERBOSITY="v"
+ENV TZ=UTC
 ENV IGNORE_CAMERAS=""
+ENV SQLITE_PATH=/config/database/events.sqlite
 
-VOLUME [ "/root/.config/rclone/" ]
+COPY docker_root/ /
 
-CMD ["sh", "-c", "unifi-protect-backup -${VERBOSITY}"]
+VOLUME [ "/config" ]
+VOLUME [ "/data" ]
```
README.md (137 lines changed)

````diff
@@ -23,12 +23,13 @@ retention period.
 ## Features
 
 - Listens to events in real-time via the Unifi Protect websocket API
+- Ensures any previous and/or missed events within the retention period are also backed up
 - Supports uploading to a [wide range of storage systems using `rclone`](https://rclone.org/overview/)
-- Performs nightly pruning of old clips
+- Automatic pruning of old clips
 
 ## Requirements
 - Python 3.9+
-- Unifi Protect version 1.20 or higher (as per [`pyunifiproect`](https://github.com/briis/pyunifiprotect))
+- Unifi Protect version 1.20 or higher (as per [`pyunifiprotect`](https://github.com/briis/pyunifiprotect))
 - `rclone` installed with at least one remote configured.
 
 ## Installation
@@ -36,26 +37,41 @@ retention period.
 1. Install `rclone`. Instructions for your platform can be found here: https://rclone.org/install/#quickstart
 2. Configure the `rclone` remote you want to backup to. Instructions can be found here: https://rclone.org/docs/#configure
 3. `pip install unifi-protect-backup`
+4. Optional: Install `ffprobe` so that `unifi-protect-backup` can check the length of the clips it downloads
+
+### Account Setup
+In order to connect to your unifi protect instance, you will first need to set up a local admin account:
+
+* Login to your *Local Portal* on your UniFiOS device, and click on *Users*
+* Open the `Roles` tab and click `Add Role` in the top right.
+* Give the role a name like `unifi protect backup` and give it `Full Management` permissions for the unifi protect app.
+* Now switch to the `User` tab, click `Add User` in the top right, and fill out the form. Specific fields to pay attention to:
+  * Role: Must be the role created in the last step
+  * Account Type: *Local Access Only*
+* Click *Add* at the bottom right.
+* Select the newly created user in the list, navigate to the `Assignments` tab in the left-hand pane, and ensure all cameras are ticked.
 
 ## Usage
 
 :warning: **Potential Data Loss**: Be very careful when setting the `rclone-destination`; at midnight every day it will
 delete any files older than `retention`. It is best to give `unifi-protect-backup` its own directory.
 
 ```
 Usage: unifi-protect-backup [OPTIONS]
 
   A Python based tool for backing up Unifi Protect event clips as they occur.
 
 Options:
   --version                       Show the version and exit.
   --address TEXT                  Address of Unifi Protect instance
                                   [required]
-  --port INTEGER                  Port of Unifi Protect instance
+  --port INTEGER                  Port of Unifi Protect instance  [default:
+                                  443]
   --username TEXT                 Username to login to Unifi Protect instance
                                   [required]
   --password TEXT                 Password for Unifi Protect user  [required]
   --verify-ssl / --no-verify-ssl  Set if you do not have a valid HTTPS
-                                  Certificate for your instance
+                                  Certificate for your instance  [default:
+                                  verify-ssl]
   --rclone-destination TEXT       `rclone` destination path in the format
                                   {rclone remote}:{path on remote}. E.g.
                                   `gdrive:/backups/unifi_protect`  [required]
@@ -64,36 +80,51 @@ Options:
                                   of `rclone`
                                   (https://rclone.org/filtering/#max-age-don-
                                   t-transfer-any-file-older-than-this)
                                   [default: 7d]
+  --rclone-args TEXT              Optional extra arguments to pass to `rclone
+                                  rcat` directly. Common usage for this would
+                                  be to set a bandwidth limit, for example.
+  --detection-types TEXT          A comma separated list of which types of
+                                  detections to backup. Valid options are:
+                                  `motion`, `person`, `vehicle`, `ring`
+                                  [default: motion,person,vehicle,ring]
   --ignore-camera TEXT            IDs of cameras for which events should not
                                   be backed up. Use multiple times to ignore
                                   multiple IDs. If being set as an environment
                                   variable the IDs should be separated by
                                   whitespace.
+  --file-structure-format TEXT    A Python format string used to generate the
+                                  file structure/name on the rclone remote.For
+                                  details of the fields available, see the
+                                  projects `README.md` file.  [default: {camer
+                                  a_name}/{event.start:%Y-%m-%d}/{event.end:%Y
+                                  -%m-%dT%H-%M-%S} {detection_type}.mp4]
   -v, --verbose                   How verbose the logging output should be.
 
                                   None: Only log info messages created by
                                   `unifi-protect-backup`, and all warnings
 
                                   -v: Only log info & debug messages
                                   created by `unifi-protect-backup`, and
                                   all warnings
 
                                   -vv: Log info & debug messages created
                                   by `unifi-protect-backup`, command
                                   output, and all warnings
 
                                   -vvv Log debug messages created by
                                   `unifi-protect-backup`, command output,
                                   all info messages, and all warnings
 
                                   -vvvv: Log debug messages created by
                                   `unifi-protect-backup` command output,
                                   all info messages, all warnings, and
                                   websocket data
 
                                   -vvvvv: Log websocket data, command
                                   output, all debug messages, all info
                                   messages and all warnings  [x>=0]
+  --sqlite_path TEXT              Path to the SQLite database to use/create
   --help                          Show this message and exit.
 ```
@@ -106,30 +137,100 @@ always take priority over environment variables):
 - `UFP_SSL_VERIFY`
 - `RCLONE_RETENTION`
 - `RCLONE_DESTINATION`
+- `RCLONE_ARGS`
 - `IGNORE_CAMERAS`
+- `DETECTION_TYPES`
+- `FILE_STRUCTURE_FORMAT`
+- `SQLITE_PATH`
 
+## File path formatting
+
+By default, the application will save clips in the following structure on the provided rclone remote:
+```
+{camera_name}/{event.start:%Y-%m-%d}/{event.end:%Y-%m-%dT%H-%M-%S} {detection_type}.mp4
+```
+If you wish for the clips to be structured differently you can do this using the `--file-structure-format`
+option. It uses standard [python format string syntax](https://docs.python.org/3/library/string.html#formatstrings).
+
+The following fields are provided to the format string:
+- *event:* The `Event` object as per https://github.com/briis/pyunifiprotect/blob/master/pyunifiprotect/data/nvr.py
+- *duration_seconds:* The duration of the event in seconds
+- *detection_type:* A nicely formatted list of the event detection type and the smart detection types (if any)
+- *camera_name:* The name of the camera that generated this event
+
+You can optionally format the `event.start`/`event.end` timestamps as per the [`strftime` format](https://docs.python.org/3/library/datetime.html#strftime-strptime-behavior) by appending it after a `:`, e.g. to get just the date without the time: `{event.start:%Y-%m-%d}`
+
 ## Docker Container
 You can run this tool as a container if you prefer, with the following command.
 Remember to change the variables to match your setup.
 
+### Backing up locally
+By default, if no rclone config is provided, clips will be backed up to `/data`.
 
 ```
 docker run \
   -e UFP_USERNAME='USERNAME' \
   -e UFP_PASSWORD='PASSWORD' \
   -e UFP_ADDRESS='UNIFI_PROTECT_IP' \
   -e UFP_SSL_VERIFY='false' \
-  -e RCLONE_DESTINATION='my_remote:/unifi_protect_backup' \
-  -v '/path/to/rclone.conf':'/root/.config/rclone/rclone.conf' \
+  -v '/path/to/save/clips':'/data' \
+  -v '/path/to/save/database':/config/database/ \
   ghcr.io/ep1cman/unifi-protect-backup
 ```
 
+### Backing up to cloud storage
+In order to backup to cloud storage you need to provide a `rclone.conf` file.
+
+If you do not already have a `rclone.conf` file you can create one as follows:
 ```
-$ docker run -it --rm -v $PWD:/root/.config/rclone/ ghcr.io/ep1cman/unifi-protect-backup rclone config
+$ docker run -it --rm -v $PWD:/root/.config/rclone --entrypoint rclone ghcr.io/ep1cman/unifi-protect-backup config
 ```
+Follow the interactive configuration process; this will create a `rclone.conf`
+file in your current directory.
+
+Finally, start the container:
+```
+docker run \
+  -e UFP_USERNAME='USERNAME' \
+  -e UFP_PASSWORD='PASSWORD' \
+  -e UFP_ADDRESS='UNIFI_PROTECT_IP' \
+  -e UFP_SSL_VERIFY='false' \
+  -e RCLONE_DESTINATION='my_remote:/unifi_protect_backup' \
+  -v '/path/to/save/clips':'/data' \
+  -v '/path/to/rclone.conf':'/config/rclone/rclone.conf' \
+  -v '/path/to/save/database':/config/database/ \
+  ghcr.io/ep1cman/unifi-protect-backup
+```
+
+### Debugging
+
+If you need to debug your rclone setup, you can invoke rclone directly like so:
+
+```
+docker run \
+  --rm \
+  -v /path/to/rclone.conf:/config/rclone/rclone.conf \
+  -e RCLONE_CONFIG='/config/rclone/rclone.conf' \
+  --entrypoint rclone \
+  ghcr.io/ep1cman/unifi-protect-backup \
+  {rclone subcommand as per: https://rclone.org/docs/#subcommands}
+```
+
+For example, to check that your config file is being read properly and list the configured remotes:
+```
+docker run \
+  --rm \
+  -v /path/to/rclone.conf:/config/rclone/rclone.conf \
+  -e RCLONE_CONFIG='/config/rclone/rclone.conf' \
+  --entrypoint rclone \
+  ghcr.io/ep1cman/unifi-protect-backup \
+  listremotes
+```
-This will create a `rclone.conf` file in your current directory
 
 ## Credits
 
-- Heavily utilises [`pyunifiproect`](https://github.com/briis/pyunifiprotect) by [@briis](https://github.com/briis/)
+- Heavily utilises [`pyunifiprotect`](https://github.com/briis/pyunifiprotect) by [@briis](https://github.com/briis/)
 - All the cloud functionality is provided by [`rclone`](https://rclone.org/)
 - This package was created with [Cookiecutter](https://github.com/audreyr/cookiecutter) and the [waynerv/cookiecutter-pypackage](https://github.com/waynerv/cookiecutter-pypackage) project template.
````
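To make the "File path formatting" section above concrete, here is a small illustrative sketch (not from the repository) of how a template of that shape resolves. The `SimpleNamespace` stands in for the real `pyunifiprotect` `Event` object, and the camera name and detection type are made-up values:

```python
from datetime import datetime
from types import SimpleNamespace

# Stand-in for a pyunifiprotect `Event`; only the fields the template needs.
event = SimpleNamespace(
    start=datetime(2022, 12, 3, 10, 15, 0),
    end=datetime(2022, 12, 3, 10, 15, 42),
)

template = "{camera_name}/{event.start:%Y-%m-%d}/{event.end:%Y-%m-%dT%H-%M-%S} {detection_type}.mp4"
print(template.format(event=event, camera_name="Front Door", detection_type="person"))
# Front Door/2022-12-03/2022-12-03T10-15-42 person.mp4
```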
docker_root/defaults/rclone.conf (new file, 2 lines)

```ini
[local]
type = local
```
docker_root/etc/cont-init.d/30-config (new file, 23 lines)

```bash
#!/usr/bin/with-contenv bash

mkdir -p /config/rclone

# For backwards compatibility
[[ -f "/root/.config/rclone/rclone.conf" ]] && \
    echo "DEPRECATED: Copying rclone conf from /root/.config/rclone/rclone.conf, please change your mount to /config/rclone.conf" && \
    cp \
        /root/.config/rclone/rclone.conf \
        /config/rclone/rclone.conf

# default config file
[[ ! -f "/config/rclone/rclone.conf" ]] && \
    mkdir -p /config/rclone && \
    cp \
        /defaults/rclone.conf \
        /config/rclone/rclone.conf

chown -R abc:abc \
    /config

chown -R abc:abc \
    /data
```
docker_root/etc/services.d/unifi-protect-backup/run (new file, 6 lines)

```bash
#!/usr/bin/with-contenv bash


export RCLONE_CONFIG=/config/rclone/rclone.conf
exec \
    s6-setuidgid abc unifi-protect-backup -${VERBOSITY}
```
poetry.lock (generated, 994 lines changed): file diff suppressed because it is too large.
pyproject.toml

```diff
@@ -1,7 +1,7 @@
 [tool]
 [tool.poetry]
 name = "unifi-protect-backup"
-version = "0.2.0"
+version = "0.8.0"
 homepage = "https://github.com/ep1cman/unifi-protect-backup"
 description = "Python tool to backup unifi event clips in realtime."
 authors = ["sebastian.goscik <sebastian@goscik.com>"]
@@ -24,7 +24,7 @@ packages = [
 python = ">=3.9.0,<4.0"
 click = "8.0.1"
 
-black = { version = "^21.5b2", optional = true}
+black = { version = "^22.10.0", optional = true}
 isort = { version = "^5.8.0", optional = true}
 flake8 = { version = "^3.9.2", optional = true}
 flake8-docstrings = { version = "^1.6.0", optional = true }
@@ -39,8 +39,14 @@ pre-commit = {version = "^2.12.0", optional = true}
 toml = {version = "^0.10.2", optional = true}
 bump2version = {version = "^1.0.1", optional = true}
 tox-asdf = {version = "^0.1.0", optional = true}
-pyunifiprotect = "^3.2.1"
 aiocron = "^1.8"
+pyunifiprotect = "^4.0.11"
 ipdb = {version = "^0.13.9", optional = true}
 types-pytz = {version = "^2021.3.5", optional = true}
 types-cryptography = {version = "^3.3.18", optional = true}
+aiosqlite = "^0.17.0"
+python-dateutil = "^2.8.2"
+aiorun = "^2022.11.1"
+pylint = {version = "^2.15.6", extras = ["dev"]}
 
 [tool.poetry.extras]
 test = [
@@ -50,10 +56,12 @@ test = [
     "mypy",
     "flake8",
     "flake8-docstrings",
-    "pytest-cov"
+    "pytest-cov",
+    "types-pytz",
+    "types-cryptography"
 ]
 
-dev = ["tox", "pre-commit", "virtualenv", "pip", "twine", "toml", "bump2version", "tox-asdf"]
+dev = ["tox", "pre-commit", "virtualenv", "pip", "twine", "toml", "bump2version", "tox-asdf", "ipdb"]
 
 [tool.poetry.scripts]
 unifi-protect-backup = 'unifi_protect_backup.cli:main'
```
unifi_protect_backup/__init__.py

```diff
@@ -2,6 +2,11 @@
 
 __author__ = """sebastian.goscik"""
 __email__ = 'sebastian@goscik.com'
-__version__ = '0.2.0'
+__version__ = '0.8.0'
 
-from .unifi_protect_backup import UnifiProtectBackup
+# from .unifi_protect_backup import UnifiProtectBackup
+from .downloader import VideoDownloader
+from .uploader import VideoUploader
+from .event_listener import EventListener
+from .purge import Purge
+from .missing_event_checker import MissingEventChecker
```

unifi_protect_backup/cli.py

```diff
@@ -3,18 +3,36 @@
 import asyncio
 
 import click
+from aiorun import run
 
-from unifi_protect_backup import UnifiProtectBackup
+from unifi_protect_backup import __version__
+from unifi_protect_backup.unifi_protect_backup import UnifiProtectBackup
+
+DETECTION_TYPES = ["motion", "person", "vehicle", "ring"]
+
+
+def _parse_detection_types(ctx, param, value):
+    # split columns by ',' and remove whitespace
+    types = [t.strip() for t in value.split(',')]
+
+    # validate passed columns
+    for t in types:
+        if t not in DETECTION_TYPES:
+            raise click.BadOptionUsage("detection-types", f"`{t}` is not an available detection type.", ctx)
+
+    return types
 
 
 @click.command()
+@click.version_option(__version__)
 @click.option('--address', required=True, envvar='UFP_ADDRESS', help='Address of Unifi Protect instance')
-@click.option('--port', default=443, envvar='UFP_PORT', help='Port of Unifi Protect instance')
+@click.option('--port', default=443, envvar='UFP_PORT', show_default=True, help='Port of Unifi Protect instance')
 @click.option('--username', required=True, envvar='UFP_USERNAME', help='Username to login to Unifi Protect instance')
 @click.option('--password', required=True, envvar='UFP_PASSWORD', help='Password for Unifi Protect user')
 @click.option(
     '--verify-ssl/--no-verify-ssl',
     default=True,
+    show_default=True,
     envvar='UFP_SSL_VERIFY',
     help="Set if you do not have a valid HTTPS Certificate for your instance",
 )
@@ -28,10 +46,27 @@ from unifi_protect_backup import UnifiProtectBackup
 @click.option(
     '--retention',
     default='7d',
+    show_default=True,
     envvar='RCLONE_RETENTION',
     help="How long should event clips be backed up for. Format as per the `--max-age` argument of "
     "`rclone` (https://rclone.org/filtering/#max-age-don-t-transfer-any-file-older-than-this)",
 )
+@click.option(
+    '--rclone-args',
+    default='',
+    envvar='RCLONE_ARGS',
+    help="Optional extra arguments to pass to `rclone rcat` directly. Common usage for this would "
+    "be to set a bandwidth limit, for example.",
+)
+@click.option(
+    '--detection-types',
+    envvar='DETECTION_TYPES',
+    default=','.join(DETECTION_TYPES),
+    show_default=True,
+    help="A comma separated list of which types of detections to backup. "
+    f"Valid options are: {', '.join([f'`{t}`' for t in DETECTION_TYPES])}",
+    callback=_parse_detection_types,
+)
 @click.option(
     '--ignore-camera',
     'ignore_cameras',
@@ -40,6 +75,14 @@ from unifi_protect_backup import UnifiProtectBackup
     help="IDs of cameras for which events should not be backed up. Use multiple times to ignore "
     "multiple IDs. If being set as an environment variable the IDs should be separated by whitespace.",
 )
+@click.option(
+    '--file-structure-format',
+    envvar='FILE_STRUCTURE_FORMAT',
+    default="{camera_name}/{event.start:%Y-%m-%d}/{event.end:%Y-%m-%dT%H-%M-%S} {detection_type}.mp4",
+    show_default=True,
+    help="A Python format string used to generate the file structure/name on the rclone remote."
+    "For details of the fields available, see the projects `README.md` file.",
+)
 @click.option(
     '-v',
     '--verbose',
@@ -61,11 +104,17 @@ all warnings, and websocket data
 -vvvvv: Log websocket data, command output, all debug messages, all info messages and all warnings
 """,
 )
+@click.option(
+    '--sqlite_path',
+    default='events.sqlite',
+    envvar='SQLITE_PATH',
+    help="Path to the SQLite database to use/create",
+)
 def main(**kwargs):
     """A Python based tool for backing up Unifi Protect event clips as they occur."""
-    loop = asyncio.get_event_loop()
     event_listener = UnifiProtectBackup(**kwargs)
-    loop.run_until_complete(event_listener.start())
+    run(event_listener.start())
 
 
 if __name__ == "__main__":
```
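The `_parse_detection_types` callback above is plain string handling; the following standalone sketch (hypothetical, with `ValueError` standing in for `click.BadOptionUsage`) shows the behaviour it implements:

```python
DETECTION_TYPES = ["motion", "person", "vehicle", "ring"]

def parse_detection_types(value: str) -> list:
    # Split the comma separated option value and strip whitespace.
    types = [t.strip() for t in value.split(',')]
    # Reject anything that is not a known detection type.
    for t in types:
        if t not in DETECTION_TYPES:
            raise ValueError(f"`{t}` is not an available detection type.")
    return types

print(parse_detection_types("motion, ring"))  # ['motion', 'ring']
```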
unifi_protect_backup/downloader.py (new file, 142 lines)

```python
import asyncio
import json
import logging
import shutil
from datetime import datetime, timedelta, timezone

import pytz
from aiohttp.client_exceptions import ClientPayloadError
from pyunifiprotect import ProtectApiClient
from pyunifiprotect.data.nvr import Event
from pyunifiprotect.data.types import EventType

from unifi_protect_backup.utils import SubprocessException, VideoQueue, get_camera_name, human_readable_size

logger = logging.getLogger(__name__)


async def get_video_length(video: bytes) -> float:
    """Uses ffprobe to get the length of the video file passed in as a byte stream"""
    cmd = 'ffprobe -v quiet -show_streams -select_streams v:0 -of json -'
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate(video)
    if proc.returncode == 0:
        logger.extra_debug(f"stdout:\n{stdout.decode()}")  # type: ignore
        logger.extra_debug(f"stderr:\n{stderr.decode()}")  # type: ignore

        json_data = json.loads(stdout.decode())
        return float(json_data['streams'][0]['duration'])

    else:
        raise SubprocessException(stdout.decode(), stderr.decode(), proc.returncode)


class VideoDownloader:
    """Downloads event video clips from Unifi Protect"""

    def __init__(self, protect: ProtectApiClient, download_queue: asyncio.Queue, buffer_size: int = 256):
        self._protect: ProtectApiClient = protect
        self._download_queue: asyncio.Queue = download_queue
        self.video_queue = VideoQueue(buffer_size * 1024 * 1024)

        # Check if `ffprobe` is available
        ffprobe = shutil.which('ffprobe')
        if ffprobe is not None:
            logger.debug(f"ffprobe found: {ffprobe}")
            self._has_ffprobe = True
        else:
            self._has_ffprobe = False

    async def start(self):
        """Main loop"""
        logger.info("Starting Downloader")
        while True:
            try:
                event = await self._download_queue.get()

                # Fix timezones since pyunifiprotect sets all timestamps to UTC. Instead localize them to
                # the timezone of the unifi protect NVR.
                event.start = event.start.replace(tzinfo=pytz.utc).astimezone(self._protect.bootstrap.nvr.timezone)
                event.end = event.end.replace(tzinfo=pytz.utc).astimezone(self._protect.bootstrap.nvr.timezone)

                logger.info(f"Downloading event: {event.id}")
                logger.debug(f"Remaining Download Queue: {self._download_queue.qsize()}")
                output_queue_current_size = human_readable_size(self.video_queue.qsize())
                output_queue_max_size = human_readable_size(self.video_queue.maxsize)
                logger.debug(f"Video Download Buffer: {output_queue_current_size}/{output_queue_max_size}")
                logger.debug(f"  Camera: {await get_camera_name(self._protect, event.camera_id)}")
                if event.type == EventType.SMART_DETECT:
                    logger.debug(f"  Type: {event.type} ({', '.join(event.smart_detect_types)})")
                else:
                    logger.debug(f"  Type: {event.type}")
                logger.debug(f"  Start: {event.start.strftime('%Y-%m-%dT%H-%M-%S')} ({event.start.timestamp()})")
                logger.debug(f"  End: {event.end.strftime('%Y-%m-%dT%H-%M-%S')} ({event.end.timestamp()})")
                duration = (event.end - event.start).total_seconds()
                logger.debug(f"  Duration: {duration}s")

                # Unifi protect does not return full video clips if the clip is requested too soon.
                # There are two issues at play here:
                #  - Protect will only cut a clip on a keyframe, which happens every 5s
                #  - Protect's pipeline needs a finite amount of time to make a clip available
                # So we will wait 1.5x the keyframe interval to ensure that there is always ample video
                # stored and Protect can return a full clip (which should be at least the length requested,
                # but often longer)
                time_since_event_ended = datetime.utcnow().replace(tzinfo=timezone.utc) - event.end
                sleep_time = (timedelta(seconds=5 * 1.5) - time_since_event_ended).total_seconds()
                if sleep_time > 0:
                    logger.debug(f"  Sleeping ({sleep_time}s) to ensure clip is ready to download...")
                    await asyncio.sleep(sleep_time)

                video = await self._download(event)
                if video is None:
                    continue

                # Get the actual length of the downloaded video using ffprobe
                if self._has_ffprobe:
                    await self._check_video_length(video, duration)

                await self.video_queue.put((event, video))
                logger.debug("Added to upload queue")

            except Exception as e:
                logger.warn(f"Unexpected exception occurred, abandoning event {event.id}:")
                logger.exception(e)

    async def _download(self, event: Event) -> bytes:
        """Downloads the video clip for the given event"""
        logger.debug("  Downloading video...")
        for x in range(5):
            try:
                video = await self._protect.get_camera_video(event.camera_id, event.start, event.end)
                assert isinstance(video, bytes)
                break
            except (AssertionError, ClientPayloadError, TimeoutError) as e:
                logger.warn(f"  Failed download attempt {x+1}, retrying in 1s")
                logger.exception(e)
                await asyncio.sleep(1)
        else:
            logger.warn(f"Download failed after 5 attempts, abandoning event {event.id}:")
            return

        logger.debug(f"  Downloaded video size: {human_readable_size(len(video))}s")
        return video

    async def _check_video_length(self, video, duration):
        """Check if the downloaded event is at least the length of the event, warn otherwise

        It is expected for events to regularly be slightly longer than the event specified"""
        try:
            downloaded_duration = await get_video_length(video)
            msg = f"  Downloaded video length: {downloaded_duration:.3f}s" f"({downloaded_duration - duration:+.3f}s)"
            if downloaded_duration < duration:
                logger.warning(msg)
            else:
                logger.debug(msg)
        except SubprocessException as e:
            logger.warn("  `ffprobe` failed")
            logger.exception(e)
```
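The comment block in `start()` explains why the downloader sleeps before fetching a clip: Protect cuts clips on roughly 5 s keyframes and needs a moment to make them available. A small sketch of that calculation in isolation (the 5 s keyframe interval is the value assumed in the code above):

```python
from datetime import datetime, timedelta, timezone

KEYFRAME_INTERVAL = timedelta(seconds=5)

def clip_ready_delay(event_end: datetime, now: datetime) -> float:
    """Seconds to wait so Protect can cut a complete clip (never negative)."""
    elapsed = now - event_end
    return max((KEYFRAME_INTERVAL * 1.5 - elapsed).total_seconds(), 0.0)

end = datetime(2022, 12, 3, 10, 0, 0, tzinfo=timezone.utc)
now = datetime(2022, 12, 3, 10, 0, 6, tzinfo=timezone.utc)
print(clip_ready_delay(end, now))  # 1.5 -> sleep another 1.5s before downloading
```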
unifi_protect_backup/event_listener.py (new file, 120 lines)

```python
import logging
from time import sleep
import asyncio
from typing import List

from pyunifiprotect.data.websocket import WSAction, WSSubscriptionMessage
from pyunifiprotect.data.nvr import Event
from pyunifiprotect.data.types import EventType
from pyunifiprotect.api import ProtectApiClient

logger = logging.getLogger(__name__)


class EventListener:
    """Listens to the unifi protect websocket for new events to backup"""

    def __init__(
        self,
        event_queue: asyncio.Queue,
        protect: ProtectApiClient,
        detection_types: List[str],
        ignore_cameras: List[str],
    ):
        self._event_queue: asyncio.Queue = event_queue
        self._protect: ProtectApiClient = protect
        self._unsub = None
        self.detection_types: List[str] = detection_types
        self.ignore_cameras: List[str] = ignore_cameras

    async def start(self):
        """Main Loop"""
        logger.debug("Subscribed to websocket")
        self._unsub = self._protect.subscribe_websocket(self._websocket_callback)

        while True:
            await asyncio.sleep(60)
            await self._check_websocket_and_reconnect()

    def _websocket_callback(self, msg: WSSubscriptionMessage) -> None:
        """Callback for "EVENT" websocket messages.

        Filters the incoming events, and puts completed events onto the download queue

        Args:
            msg (Event): Incoming event data
        """
        logger.websocket_data(msg)  # type: ignore

        assert isinstance(msg.new_obj, Event)
        if msg.action != WSAction.UPDATE:
            return
        if msg.new_obj.camera_id in self.ignore_cameras:
            return
        if msg.new_obj.end is None:
            return
        if msg.new_obj.type not in [EventType.MOTION, EventType.SMART_DETECT, EventType.RING]:
            return
        if msg.new_obj.type is EventType.MOTION and "motion" not in self.detection_types:
            logger.extra_debug(f"Skipping unwanted motion detection event: {msg.new_obj.id}")  # type: ignore
            return
        if msg.new_obj.type is EventType.RING and "ring" not in self.detection_types:
            logger.extra_debug(f"Skipping unwanted ring event: {msg.new_obj.id}")  # type: ignore
            return
        elif msg.new_obj.type is EventType.SMART_DETECT:
            for event_smart_detection_type in msg.new_obj.smart_detect_types:
                if event_smart_detection_type not in self.detection_types:
                    logger.extra_debug(  # type: ignore
                        f"Skipping unwanted {event_smart_detection_type} detection event: {msg.new_obj.id}"
                    )
                    return

        # TODO: Will this even work? I think it will block the async loop
        while self._event_queue.full():
            logger.extra_debug("Event queue full, waiting 1s...")
            sleep(1)

        self._event_queue.put_nowait(msg.new_obj)

        # Unifi protect has started sending the event id in the websocket as {event_id}-{camera_id}, but when
        # the API is queried events only have {event_id}. Keeping track of both of these would be complicated,
        # so instead we fudge the ID here to match what the API returns
        if '-' in msg.new_obj.id:
            msg.new_obj.id = msg.new_obj.id.split('-')[0]

        logger.debug(f"Adding event {msg.new_obj.id} to queue (Current download queue={self._event_queue.qsize()})")

    async def _check_websocket_and_reconnect(self):
        """Checks for websocket disconnect and triggers a reconnect"""

        logger.extra_debug("Checking the status of the websocket...")
        if self._protect.check_ws():
            logger.extra_debug("Websocket is connected.")
        else:
            logger.warn("Lost connection to Unifi Protect.")

            # Unsubscribe, close the session.
            self._unsub()
            await self._protect.close_session()

            while True:
                logger.warn("Attempting reconnect...")

                try:
                    # Start the pyunifiprotect connection by calling `update`
                    await self._protect.close_session()
                    self._protect._bootstrap = None
                    await self._protect.update(force=True)
                    if self._protect.check_ws():
                        self._unsub = self._protect.subscribe_websocket(self._websocket_callback)
                        break
                    else:
                        logger.warn("Unable to establish connection to Unifi Protect")
                except Exception as e:
                    logger.warn("Unexpected exception occurred while trying to reconnect:")
                    logger.exception(e)

                # Back off for a little while
                await asyncio.sleep(10)

            logger.info("Re-established connection to Unifi Protect and to the websocket.")
```
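The ID-fudging comment near the end of `_websocket_callback` is easy to check in isolation; a tiny sketch with made-up, purely illustrative IDs:

```python
def normalise_event_id(websocket_id: str) -> str:
    # Newer Protect versions send "{event_id}-{camera_id}" over the websocket,
    # while the REST API reports just "{event_id}"; keep only the first part.
    return websocket_id.split('-')[0] if '-' in websocket_id else websocket_id

# Hypothetical IDs, for illustration only.
print(normalise_event_id("63ab1f20c0de-61b2e4a9"))  # 63ab1f20c0de
print(normalise_event_id("63ab1f20c0de"))           # 63ab1f20c0de
```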
unifi_protect_backup/missing_event_checker.py (new file, 83 lines)

```python
import asyncio
import logging
from datetime import datetime
from typing import List

import aiosqlite
from dateutil.relativedelta import relativedelta

from pyunifiprotect import ProtectApiClient
from pyunifiprotect.data.types import EventType

logger = logging.getLogger(__name__)


class MissingEventChecker:
    """Periodically checks if any unifi protect events exist within the retention period that are not backed up"""

    def __init__(
        self,
        protect: ProtectApiClient,
        db: aiosqlite.Connection,
        event_queue: asyncio.Queue,
        retention: relativedelta,
        detection_types: List[str],
        ignore_cameras: List[str],
        interval: int = 60 * 5,
    ) -> None:
        self._protect: ProtectApiClient = protect
        self._db: aiosqlite.Connection = db
        self._event_queue: asyncio.Queue = event_queue
        self.retention: relativedelta = retention
        self.detection_types: List[str] = detection_types
        self.ignore_cameras: List[str] = ignore_cameras
        self.interval: int = interval

    async def start(self):
        """main loop"""
        logger.info("Starting Missing Event Checker")
        while True:
            try:
                logger.extra_debug("Running check for missing events...")
                # Get list of events that need to be backed up from unifi protect
                unifi_events = await self._protect.get_events(
                    start=datetime.now() - self.retention,
                    end=datetime.now(),
                    types=[EventType.MOTION, EventType.SMART_DETECT, EventType.RING],
                )
                unifi_events = {event.id: event for event in unifi_events}

                # Get list of events that have been backed up from the database

                # events(id, type, camera_id, start, end)
                async with self._db.execute("SELECT * FROM events") as cursor:
                    rows = await cursor.fetchall()
                    db_event_ids = {row[0] for row in rows}

                # Prevent re-adding events currently in the download queue
                downloading_event_ids = {event.id for event in self._event_queue._queue}

                missing_event_ids = set(unifi_events.keys()) - (db_event_ids | downloading_event_ids)

                for event_id in missing_event_ids:
                    event = unifi_events[event_id]
                    if event.start is None or event.end is None:
                        continue  # This event is still on-going
                    if event.type is EventType.MOTION and "motion" not in self.detection_types:
                        continue
                    if event.type is EventType.RING and "ring" not in self.detection_types:
                        continue
                    elif event.type is EventType.SMART_DETECT:
                        for event_smart_detection_type in event.smart_detect_types:
                            if event_smart_detection_type not in self.detection_types:
                                continue
                    else:
                        logger.warning(
                            f" Adding missing event to backup queue: {event.id} ({event.type}) ({event.start.strftime('%Y-%m-%dT%H-%M-%S')} - {event.end.strftime('%Y-%m-%dT%H-%M-%S')})"
                        )
                        await self._event_queue.put(event)
            except Exception as e:
                logger.warn(f"Unexpected exception occurred during missing event check:")
                logger.exception(e)

            await asyncio.sleep(self.interval)
```
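The core of the checker is the set arithmetic between what Protect reports, what the database records, and what is already queued; in miniature (with made-up IDs):

```python
unifi_event_ids = {"e1", "e2", "e3", "e4"}   # events Protect knows about
db_event_ids = {"e1", "e2"}                  # events already backed up
downloading_event_ids = {"e3"}               # events already queued for download

missing = unifi_event_ids - (db_event_ids | downloading_event_ids)
print(missing)  # {'e4'} -> only this event gets re-queued
```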
unifi_protect_backup/purge.py (new file, 75 lines)

```python
import asyncio
import logging
import time
from datetime import datetime

import aiosqlite
from dateutil.relativedelta import relativedelta

from unifi_protect_backup.utils import parse_rclone_retention, run_command

logger = logging.getLogger(__name__)


async def wait_until(dt):
    # sleep until the specified datetime
    now = datetime.now()
    await asyncio.sleep((dt - now).total_seconds())


async def delete_file(file_path):
    returncode, stdout, stderr = await run_command(f'rclone delete -vv "{file_path}"')
    if returncode != 0:
        logger.warn(f" Failed to delete file: '{file_path}'")


async def tidy_empty_dirs(base_dir_path):
    returncode, stdout, stderr = await run_command(f'rclone rmdirs -vv --ignore-errors --leave-root "{base_dir_path}"')
    if returncode != 0:
        logger.warn(f" Failed to tidy empty dirs")


class Purge:
    """Deletes old files from rclone remotes"""

    def __init__(self, db: aiosqlite.Connection, retention: relativedelta, rclone_destination: str, interval: int = 60):
        self._db: aiosqlite.Connection = db
        self.retention: relativedelta = retention
        self.rclone_destination: str = rclone_destination
        self.interval: int = interval

    async def start(self):
        """Main loop - runs forever"""
        while True:
            try:
                deleted_a_file = False

                # For every event older than the retention time
                retention_oldest_time = time.mktime((datetime.now() - self.retention).timetuple())
                async with self._db.execute(
                    f"SELECT * FROM events WHERE end < {retention_oldest_time}"
                ) as event_cursor:
                    async for event_id, event_type, camera_id, event_start, event_end in event_cursor:

                        logger.info(f"Purging event: {event_id}.")

                        # For every backup for this event
                        async with self._db.execute(f"SELECT * FROM backups WHERE id = '{event_id}'") as backup_cursor:
                            async for _, remote, file_path in backup_cursor:
                                logger.debug(f"  Deleted: {remote}:{file_path}")
                                await delete_file(f"{remote}:{file_path}")
                                deleted_a_file = True

                        # delete event from database
                        # entries in the `backups` table are automatically deleted by sqlite triggers
                        await self._db.execute(f"DELETE FROM events WHERE id = '{event_id}'")
                        await self._db.commit()

                if deleted_a_file:
                    await tidy_empty_dirs(self.rclone_destination)

            except Exception as e:
                logger.warn(f"Unexpected exception occurred during purge:")
                logger.exception(e)

            await asyncio.sleep(self.interval)
```
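The purge loop's cutoff is computed from the retention `relativedelta`; here is a small sketch of just that step (it requires `python-dateutil`, which is a declared dependency of the project):

```python
import time
from datetime import datetime
from dateutil.relativedelta import relativedelta

# Events whose `end` timestamp (a Unix-time REAL column) is older than
# now - retention are candidates for deletion.
retention = relativedelta(days=7)
cutoff = time.mktime((datetime.now() - retention).timetuple())
print(f"SELECT * FROM events WHERE end < {cutoff}")
```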
@@ -1,39 +1,23 @@
|
||||
"""Main module."""
|
||||
import asyncio
|
||||
import logging
|
||||
import pathlib
|
||||
import os
|
||||
import shutil
|
||||
from cmath import log
|
||||
from pprint import pprint
|
||||
from time import sleep
|
||||
from typing import Callable, List, Optional
|
||||
|
||||
import aiocron
|
||||
import aiohttp
|
||||
import aiosqlite
|
||||
from pyunifiprotect import ProtectApiClient
|
||||
from pyunifiprotect.data.nvr import Event
|
||||
from pyunifiprotect.data.types import EventType, ModelType
|
||||
from pyunifiprotect.data.websocket import WSAction, WSSubscriptionMessage
|
||||
from pyunifiprotect.data.types import ModelType
|
||||
|
||||
from unifi_protect_backup import EventListener, MissingEventChecker, Purge, VideoDownloader, VideoUploader
|
||||
from unifi_protect_backup.utils import SubprocessException, parse_rclone_retention, run_command
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RcloneException(Exception):
|
||||
"""Exception class for when rclone does not exit with `0`."""
|
||||
|
||||
def __init__(self, stdout, stderr, returncode):
|
||||
"""Exception class for when rclone does not exit with `0`.
|
||||
|
||||
Args:
|
||||
stdout (str): What rclone output to stdout
|
||||
stderr (str): What rclone output to stderr
|
||||
returncode (str): The return code of the rclone process
|
||||
"""
|
||||
super().__init__()
|
||||
self.stdout: str = stdout
|
||||
self.stderr: str = stderr
|
||||
self.returncode: int = returncode
|
||||
|
||||
def __str__(self):
|
||||
"""Turns excpetion into a human readable form."""
|
||||
return f"Return Code: {self.returncode}\nStdout:\n{self.stdout}\nStderr:\n{self.stderr}"
|
||||
# TODO: https://github.com/cjrh/aiorun#id6 (smart shield)
|
||||
|
||||
|
||||
def add_logging_level(levelName: str, levelNum: int, methodName: Optional[str] = None) -> None:
|
||||
@@ -119,10 +103,46 @@ def setup_logging(verbosity: int) -> None:
        logging.DEBUG - 2,
    )

    format = "{asctime} [{levelname}]:{name: <20}:\t{message}"
    format = "{asctime} [{levelname:^11s}] {name:<42} :\t{message}"
    date_format = "%Y-%m-%d %H:%M:%S"
    style = '{'

    logger = logging.getLogger("unifi_protect_backup")

    sh = logging.StreamHandler()
    formatter = logging.Formatter(format, date_format, style)
    sh.setFormatter(formatter)

    def decorate_emit(fn):
        # add methods we need to the class
        def new(*args):
            levelno = args[0].levelno
            if levelno >= logging.CRITICAL:
                color = '\x1b[31;1m'  # RED
            elif levelno >= logging.ERROR:
                color = '\x1b[31;1m'  # RED
            elif levelno >= logging.WARNING:
                color = '\x1b[33;1m'  # YELLOW
            elif levelno >= logging.INFO:
                color = '\x1b[32;1m'  # GREEN
            elif levelno >= logging.DEBUG:
                color = '\x1b[36;1m'  # CYAN
            elif levelno >= logging.EXTRA_DEBUG:
                color = '\x1b[35;1m'  # MAGENTA
            else:
                color = '\x1b[0m'

            # Colorize the levelname field of the log record
            args[0].levelname = f"{color}{args[0].levelname:^11s}\x1b[0m"

            return fn(*args)

        return new

    sh.emit = decorate_emit(sh.emit)
    logger.addHandler(sh)
    logger.propagate = False

    if verbosity == 0:
        logging.basicConfig(level=logging.WARN, format=format, style=style, datefmt=date_format)
        logger.setLevel(logging.INFO)
@@ -138,24 +158,20 @@ def setup_logging(verbosity: int) -> None:
    elif verbosity == 4:
        logging.basicConfig(level=logging.INFO, format=format, style=style, datefmt=date_format)
        logger.setLevel(logging.WEBSOCKET_DATA)  # type: ignore
    elif verbosity == 5:
    elif verbosity >= 5:
        logging.basicConfig(level=logging.DEBUG, format=format, style=style, datefmt=date_format)
        logger.setLevel(logging.WEBSOCKET_DATA)  # type: ignore


def human_readable_size(num):
    """Turns a number into a human readable number with ISO/IEC 80000 binary prefixes.

    Based on: https://stackoverflow.com/a/1094933

    Args:
        num (int): The number to be converted into human readable format
    """
    for unit in ["B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"]:
        if abs(num) < 1024.0:
            return f"{num:3.1f}{unit}"
        num /= 1024.0
    raise ValueError("`num` too large, ran out of prefixes")


async def create_database(path: str):
    """Creates the sqlite database and the events and backups tables."""
    db = await aiosqlite.connect(path)
    await db.execute("CREATE TABLE events(id PRIMARY KEY, type, camera_id, start REAL, end REAL)")
    await db.execute(
        "CREATE TABLE backups(id REFERENCES events(id) ON DELETE CASCADE, remote, path, PRIMARY KEY (id, remote))"
    )
    await db.commit()
    return db
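
Editor's note: a minimal sketch of the cascade behaviour implied by the schema above, assuming an in-memory database and hypothetical event/remote values. With `PRAGMA foreign_keys = ON` (which `start()` sets below), deleting an event row also removes its `backups` rows via `ON DELETE CASCADE`:

    db = await create_database(":memory:")  # hypothetical path for illustration
    await db.execute("PRAGMA foreign_keys = ON;")
    await db.execute("INSERT INTO events VALUES ('evt1', 'motion', 'cam1', 0.0, 10.0)")
    await db.execute("INSERT INTO backups VALUES ('evt1', 'my_remote', 'clips/evt1.mp4')")
    await db.execute("DELETE FROM events WHERE id = 'evt1'")  # cascades: the backups row is removed too
    await db.commit()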


class UnifiProtectBackup:
@@ -163,15 +179,6 @@ class UnifiProtectBackup:

    Listens to the Unifi Protect websocket for events. When a completed motion or smart detection
    event is detected, it will download the clip and back it up using rclone.

    Attributes:
        retention (str): How long event clips should be backed up for. Format as per the
            `--max-age` argument of `rclone`
            (https://rclone.org/filtering/#max-age-don-t-transfer-any-file-older-than-this)
        ignore_cameras (List[str]): List of camera IDs for which to not backup events
        verbose (int): How verbose to setup logging, see :func:`setup_logging` for details.
        _download_queue (asyncio.Queue): Queue of events that need to be backed up
        _unsub (Callable): Unsubscribe from the websocket callback
    """

    def __init__(
@@ -182,8 +189,12 @@ class UnifiProtectBackup:
        verify_ssl: bool,
        rclone_destination: str,
        retention: str,
        rclone_args: str,
        detection_types: List[str],
        ignore_cameras: List[str],
        file_structure_format: str,
        verbose: int,
        sqlite_path: str = "events.sqlite",
        port: int = 443,
    ):
        """Will configure logging settings and the Unifi Protect API (but not actually connect).

@@ -200,8 +211,13 @@ class UnifiProtectBackup:
            retention (str): How long event clips should be backed up for. Format as per the
                `--max-age` argument of `rclone`
                (https://rclone.org/filtering/#max-age-don-t-transfer-any-file-older-than-this)
            ignore_cameras (List[str]): List of camera IDs for which to not backup events
            rclone_args (str): Optional extra arguments to pass directly to `rclone rcat`
                (https://rclone.org/docs/)
            detection_types (List[str]): List of which detection types to backup.
            ignore_cameras (List[str]): List of camera IDs for which to not backup events.
            file_structure_format (str): A Python format string for the output file path.
            verbose (int): How verbose to setup logging, see :func:`setup_logging` for details.
            sqlite_path (str): Path where to find/create the sqlite database
        """
        setup_logging(verbose)

@@ -217,23 +233,39 @@ class UnifiProtectBackup:
        logger.debug(f"  {verify_ssl=}")
        logger.debug(f"  {rclone_destination=}")
        logger.debug(f"  {retention=}")
        logger.debug(f"  {rclone_args=}")
        logger.debug(f"  {ignore_cameras=}")
        logger.debug(f"  {verbose=}")
        logger.debug(f"  {detection_types=}")
        logger.debug(f"  {file_structure_format=}")
        logger.debug(f"  {sqlite_path=}")

        self.rclone_destination = rclone_destination
        self.retention = retention
        self.retention = parse_rclone_retention(retention)
        self.rclone_args = rclone_args
        self.file_structure_format = file_structure_format

        self.address = address
        self.port = port
        self.username = username
        self.password = password
        self.verify_ssl = verify_ssl

        self._protect = ProtectApiClient(
            address,
            port,
            username,
            password,
            verify_ssl=verify_ssl,
            self.address,
            self.port,
            self.username,
            self.password,
            verify_ssl=self.verify_ssl,
            subscribed_models={ModelType.EVENT},
        )
        self.ignore_cameras = ignore_cameras
        self._download_queue: asyncio.Queue = asyncio.Queue()
        self._unsub: Callable[[], None]
        self.detection_types = detection_types
        self._has_ffprobe = False
        self._sqlite_path = sqlite_path
        self._db = None

    async def start(self):
        """Bootstrap the backup process and kick off the main loop.
@@ -241,224 +273,104 @@ class UnifiProtectBackup:

        You should run this to start the realtime backup of Unifi Protect clips as they are created

        """
        logger.info("Starting...")
        try:
            logger.info("Starting...")

        # Ensure rclone is installed and properly configured
        logger.info("Checking rclone configuration...")
        await self._check_rclone()
            # Ensure `rclone` is installed and properly configured
            logger.info("Checking rclone configuration...")
            await self._check_rclone()

        # Start the pyunifiprotect connection by calling `update`
        logger.info("Connecting to Unifi Protect...")
        await self._protect.update()
        logger.info("Found cameras:")
        for camera in self._protect.bootstrap.cameras.values():
            logger.info(f" - {camera.id}: {camera.name}")
            # Start the pyunifiprotect connection by calling `update`
            logger.info("Connecting to Unifi Protect...")
            await self._protect.update()

        # Subscribe to the websocket
        self._unsub = self._protect.subscribe_websocket(self._websocket_callback)
            # Get a mapping of camera ids -> names
            logger.info("Found cameras:")
            for camera in self._protect.bootstrap.cameras.values():
                logger.info(f" - {camera.id}: {camera.name}")

        # Set up a "purge" task to run at midnight each day to delete old recordings and empty directories
        logger.info("Setting up purge task...")
            tasks = []

        @aiocron.crontab("0 0 * * *")
        async def rclone_purge_old():
            logger.info("Deleting old files...")
            cmd = f"rclone delete -vv --min-age {self.retention} '{self.rclone_destination}'"
            cmd += f" && rclone rmdirs -vv --leave-root '{self.rclone_destination}'"
            proc = await asyncio.create_subprocess_shell(
                cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            stdout, stderr = await proc.communicate()
            if proc.returncode == 0:
                logger.extra_debug(f"stdout:\n{stdout.decode()}")  # type: ignore
                logger.extra_debug(f"stderr:\n{stderr.decode()}")  # type: ignore
                logger.info("Successfully deleted old files")
            if not os.path.exists(self._sqlite_path):
                logger.info("Database doesn't exist, creating a new one")
                self._db = await create_database(self._sqlite_path)
            else:
                logger.warn("Failed to purge old files")
                logger.warn(f"stdout:\n{stdout.decode()}")
                logger.warn(f"stderr:\n{stderr.decode()}")
                self._db = await aiosqlite.connect(self._sqlite_path)

        # Launches the main loop
        logger.info("Listening for events...")
        await self._backup_events()
            event_queue = asyncio.Queue()

        logger.info("Stopping...")
            # Enable foreign keys in the database
            await self._db.execute("PRAGMA foreign_keys = ON;")

        # Unsubscribes from the websocket
        self._unsub()
            # Create downloader task
            # This will download video files to its buffer
            downloader = VideoDownloader(self._protect, event_queue)  # TODO: Make buffer size configurable
            tasks.append(asyncio.create_task(downloader.start()))

            # Create upload task
            # This will upload the videos in the downloader's buffer to the rclone remotes and log them in the database
            uploader = VideoUploader(
                self._protect,
                downloader.video_queue,
                self.rclone_destination,
                self.rclone_args,
                self.file_structure_format,
                self._db,
            )
            tasks.append(asyncio.create_task(uploader.start()))

            # Create event listener task
            # This will connect to the unifi protect websocket and listen for events. When one is detected, it will
            # be added to the queue of events to download
            event_listener = EventListener(event_queue, self._protect, self.detection_types, self.ignore_cameras)
            tasks.append(asyncio.create_task(event_listener.start()))

            # Create purge task
            # This will, every midnight, purge old backups from the rclone remotes and the database
            purge = Purge(self._db, self.retention, self.rclone_destination)
            tasks.append(asyncio.create_task(purge.start()))

            # Create missing event task
            # This will check all the events within the retention period; any that have been missed and not
            # backed up will be added to the event queue
            missing = MissingEventChecker(
                self._protect, self._db, event_queue, self.retention, self.detection_types, self.ignore_cameras
            )
            tasks.append(asyncio.create_task(missing.start()))

            logger.info("Starting...")
            await asyncio.gather(*tasks)

        except asyncio.CancelledError:
            if self._protect is not None:
                await self._protect.close_session()
            if self._db is not None:
                await self._db.close()
    async def _check_rclone(self) -> None:
        """Check if rclone is installed and the specified remote is configured.

        Raises:
            RcloneException: If rclone is not installed or it failed to list remotes
            SubprocessException: If rclone is not installed or it failed to list remotes
            ValueError: The given rclone destination is for a remote that is not configured

        """
        rclone = shutil.which('rclone')
        logger.debug(f"rclone found: {rclone}")
        if not rclone:
            raise RuntimeError("`rclone` is not installed on this system")
        logger.debug(f"rclone found: {rclone}")

        cmd = "rclone listremotes -vv"
        proc = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        logger.extra_debug(f"stdout:\n{stdout.decode()}")  # type: ignore
        logger.extra_debug(f"stderr:\n{stderr.decode()}")  # type: ignore
        if proc.returncode != 0:
            raise RcloneException(stdout.decode(), stderr.decode(), proc.returncode)
        returncode, stdout, stderr = await run_command("rclone listremotes -vv")
        if returncode != 0:
            raise SubprocessException(stdout, stderr, returncode)

        # Check if the destination is for a configured remote
        for line in stdout.splitlines():
            if self.rclone_destination.startswith(line.decode()):
            if self.rclone_destination.startswith(line):
                break
        else:
            remote = self.rclone_destination.split(":")[0]
            raise ValueError(f"rclone does not have a remote called `{remote}`")

    def _websocket_callback(self, msg: WSSubscriptionMessage) -> None:
        """Callback for "EVENT" websocket messages.

        Filters the incoming events, and puts completed events onto the download queue.

        Args:
            msg (Event): Incoming event data
        """
        logger.websocket_data(msg)  # type: ignore

        # We are only interested in updates that end a motion/smart detection event
        assert isinstance(msg.new_obj, Event)
        if msg.action != WSAction.UPDATE:
            return
        if msg.new_obj.camera_id in self.ignore_cameras:
            return
        if msg.new_obj.end is None:
            return
        if msg.new_obj.type not in {EventType.MOTION, EventType.SMART_DETECT}:
            return
        self._download_queue.put_nowait(msg.new_obj)
        logger.debug(f"Adding event {msg.new_obj.id} to queue (Current queue={self._download_queue.qsize()})")

    async def _backup_events(self) -> None:
        """Main loop for backing up events.

        Waits for an event in the queue, then downloads the corresponding clip and uploads it using rclone.
        If errors occur it will simply log the errors and wait for the next event. In a future release,
        retries will be added.

        """
        while True:
            event = await self._download_queue.get()
            destination = self.generate_file_path(event)

            logger.info(f"Backing up event: {event.id}")
            logger.debug(f"Remaining Queue: {self._download_queue.qsize()}")
            logger.debug(f"  Camera: {self._protect.bootstrap.cameras[event.camera_id].name}")
            logger.debug(f"  Type: {event.type}")
            logger.debug(f"  Start: {event.start.strftime('%Y-%m-%dT%H-%M-%S')}")
            logger.debug(f"  End: {event.end.strftime('%Y-%m-%dT%H-%M-%S')}")
            logger.debug(f"  Duration: {event.end - event.start}")

            try:
                # Download video
                for x in range(5):
                    try:
                        logger.debug("  Downloading video...")
                        video = await self._protect.get_camera_video(event.camera_id, event.start, event.end)
                        assert isinstance(video, bytes)
                        break
                    except (AssertionError, aiohttp.client_exceptions.ClientPayloadError) as e:
                        logger.warn(f"  Failed download attempt {x+1}, retrying in 1s")
                        logger.exception(e)
                        await asyncio.sleep(1)
                else:
                    logger.warn(f"Download failed after 5 attempts, abandoning event {event.id}:")
                    continue

                for x in range(5):
                    try:
                        await self._upload_video(video, destination)
                        break
                    except RcloneException as e:
                        logger.warn(f"  Failed upload attempt {x+1}, retrying in 1s")
                        logger.exception(e)
                        await asyncio.sleep(1)
                else:
                    logger.warn(f"Upload failed after 5 attempts, abandoning event {event.id}:")
                    continue

            except Exception as e:
                logger.warn(f"Unexpected exception occurred, abandoning event {event.id}:")
                logger.exception(e)

    async def _upload_video(self, video: bytes, destination: pathlib.Path):
        """Upload video using rclone.

        In order to avoid writing to disk, the video file data is piped directly
        to the rclone process and uploaded using the `rcat` function of rclone.

        Args:
            video (bytes): The data to be written to the file
            destination (pathlib.Path): Where rclone should write the file

        Raises:
            RcloneException: If rclone returns a non-zero exit code
        """
        logger.debug("  Uploading video via rclone...")
        logger.debug(f"    To: {destination}")
        logger.debug(f"    Size: {human_readable_size(len(video))}")

        cmd = f"rclone rcat -vv '{destination}'"
        proc = await asyncio.create_subprocess_shell(
            cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate(video)
        if proc.returncode == 0:
            logger.extra_debug(f"stdout:\n{stdout.decode()}")  # type: ignore
            logger.extra_debug(f"stderr:\n{stderr.decode()}")  # type: ignore
        else:
            raise RcloneException(stdout.decode(), stderr.decode(), proc.returncode)

        logger.info("Backed up successfully!")

    def generate_file_path(self, event: Event) -> pathlib.Path:
        """Generates the rclone destination path for the provided event.

        Generates paths in the following structure:
        ::
            rclone_destination
            |- Camera Name
                |- {Date}
                    |- {start timestamp} {event type} ({detections}).mp4

        Args:
            event: The event for which to create an output path

        Returns:
            pathlib.Path: The rclone path the event should be backed up to

        """
        path = pathlib.Path(self.rclone_destination)
        assert isinstance(event.camera_id, str)
        path /= self._protect.bootstrap.cameras[event.camera_id].name  # directory per camera
        path /= event.start.strftime("%Y-%m-%d")  # Directory per day

        file_name = f"{event.start.strftime('%Y-%m-%dT%H-%M-%S')} {event.type}"

        if event.smart_detect_types:
            detections = " ".join(event.smart_detect_types)
            file_name += f" ({detections})"
        file_name += ".mp4"

        path /= file_name

        return path
        # Ensure the base directory exists
        await run_command(f"rclone mkdir -vv {self.rclone_destination}")

146 unifi_protect_backup/uploader.py Normal file
@@ -0,0 +1,146 @@
import asyncio
import logging
import pathlib
import re
from datetime import datetime

import aiosqlite
from pyunifiprotect.data.nvr import Event
from pyunifiprotect import ProtectApiClient

from unifi_protect_backup.utils import get_camera_name, SubprocessException, VideoQueue

logger = logging.getLogger(__name__)


class VideoUploader:
    """Uploads videos from the video_queue to the provided rclone destination.

    Keeps a log of what it has uploaded in `db`.
    """

    def __init__(
        self,
        protect: ProtectApiClient,
        video_queue: VideoQueue,
        rclone_destination: str,
        rclone_args: str,
        file_structure_format: str,
        db: aiosqlite.Connection,
    ):
        self._protect: ProtectApiClient = protect
        self._video_queue: VideoQueue = video_queue
        self._rclone_destination: str = rclone_destination
        self._rclone_args: str = rclone_args
        self._file_structure_format: str = file_structure_format
        self._db: aiosqlite.Connection = db

    async def start(self):
        """Main loop.

        Runs forever, looking for video data in the video queue, uploading it using
        rclone, and finally updating the database.
        """
        logger.info("Starting Uploader")
        while True:
            try:
                event, video = await self._video_queue.get()
                logger.info(f"Uploading event: {event.id}")
                logger.debug(f" Remaining Upload Queue: {self._video_queue.qsize_files()}")

                destination = await self._generate_file_path(event)
                logger.debug(f" Destination: {destination}")

                await self._upload_video(video, destination, self._rclone_args)
                await self._update_database(event, destination)

                logger.debug("Uploaded")

            except Exception as e:
                logger.warn(f"Unexpected exception occurred, abandoning event {event.id}:")
                logger.exception(e)

    async def _upload_video(self, video: bytes, destination: pathlib.Path, rclone_args: str):
        """Upload video using rclone.

        In order to avoid writing to disk, the video file data is piped directly
        to the rclone process and uploaded using the `rcat` function of rclone.

        Args:
            video (bytes): The data to be written to the file
            destination (pathlib.Path): Where rclone should write the file
            rclone_args (str): Optional extra arguments to pass to `rclone`

        Raises:
            SubprocessException: If rclone returns a non-zero exit code
        """
        cmd = f'rclone rcat -vv {rclone_args} "{destination}"'
        proc = await asyncio.create_subprocess_shell(
            cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate(video)
        if proc.returncode == 0:
            logger.extra_debug(f"stdout:\n{stdout.decode()}")  # type: ignore
            logger.extra_debug(f"stderr:\n{stderr.decode()}")  # type: ignore
        else:
            raise SubprocessException(stdout.decode(), stderr.decode(), proc.returncode)

    async def _update_database(self, event: Event, destination: str):
        """Adds the backed up event to the database, along with where it was backed up to."""
        await self._db.execute(
            f"""INSERT INTO events VALUES
                ('{event.id}', '{event.type}', '{event.camera_id}', '{event.start.timestamp()}', '{event.end.timestamp()}')
            """
        )

        remote, file_path = str(destination).split(":")
        await self._db.execute(
            f"""INSERT INTO backups VALUES
                ('{event.id}', '{remote}', '{file_path}')
            """
        )

        await self._db.commit()
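
A side note on the f-string SQL above: sqlite also accepts parameterized queries, which avoid quoting problems if an ID or path ever contains a `'`. A sketch of the same insert using placeholders (not what this commit does):

    await self._db.execute(
        "INSERT INTO events VALUES (?, ?, ?, ?, ?)",  # placeholders instead of f-string interpolation
        (event.id, str(event.type), event.camera_id, event.start.timestamp(), event.end.timestamp()),
    )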

    async def _generate_file_path(self, event: Event) -> pathlib.Path:
        """Generates the rclone destination path for the provided event.

        Generates the rclone destination path for the given event based upon the format
        string in `self._file_structure_format`.

        Provides the following fields to the format string:
            event: The `Event` object as per
                https://github.com/briis/pyunifiprotect/blob/master/pyunifiprotect/data/nvr.py
            duration_seconds: The duration of the event in seconds
            detection_type: A nicely formatted list of the event detection type and the smart detection types (if any)
            camera_name: The name of the camera that generated this event

        Args:
            event: The event for which to create an output path

        Returns:
            pathlib.Path: The rclone path the event should be backed up to

        """
        assert isinstance(event.camera_id, str)
        assert isinstance(event.start, datetime)
        assert isinstance(event.end, datetime)

        format_context = {
            "event": event,
            "duration_seconds": (event.end - event.start).total_seconds(),
            "detection_type": f"{event.type} ({' '.join(event.smart_detect_types)})"
            if event.smart_detect_types
            else f"{event.type}",
            "camera_name": await get_camera_name(self._protect, event.camera_id),
        }

        file_path = self._file_structure_format.format(**format_context)
        file_path = re.sub(r'[^\w\-_\.\(\)/ ]', '', file_path)  # Sanitize any invalid chars

        return pathlib.Path(f"{self._rclone_destination}/{file_path}")
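
As an illustration (the format string, camera name, and remote below are hypothetical, and assume `event.type` renders as e.g. `motion`): a `file_structure_format` of `{camera_name}/{event.start:%Y-%m-%d}/{event.start:%Y-%m-%dT%H-%M-%S} {detection_type}.mp4` with an `rclone_destination` of `my_remote:unifi_backups` would produce paths like:

    my_remote:unifi_backups/Front Door/2022-03-01/2022-03-01T12-00-00 motion (person).mp4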
191 unifi_protect_backup/utils.py Normal file
@@ -0,0 +1,191 @@
import logging
import re
import asyncio

from dateutil.relativedelta import relativedelta

from pyunifiprotect import ProtectApiClient

logger = logging.getLogger(__name__)


def human_readable_size(num: float):
    """Turns a number into a human readable number with ISO/IEC 80000 binary prefixes.

    Based on: https://stackoverflow.com/a/1094933

    Args:
        num (float): The number to be converted into human readable format
    """
    for unit in ["B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"]:
        if abs(num) < 1024.0:
            return f"{num:3.1f}{unit}"
        num /= 1024.0
    raise ValueError("`num` too large, ran out of prefixes")
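
A quick sanity check of the behaviour (values chosen for illustration):

    >>> human_readable_size(1536)
    '1.5KiB'
    >>> human_readable_size(3 * 1024**3)
    '3.0GiB'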


async def get_camera_name(protect: ProtectApiClient, id: str):
    """Returns the name of the camera with the given ID.

    If the camera ID is not known, it tries refreshing the cached data.
    """
    try:
        return protect.bootstrap.cameras[id].name
    except KeyError:
        # Refresh cameras
        logger.debug(f"Unknown camera id: '{id}', checking API")

        await protect.update(force=True)

        try:
            name = protect.bootstrap.cameras[id].name
        except KeyError:
            logger.debug(f"Unknown camera id: '{id}'")
            raise

        logger.debug(f"Found camera - {id}: {name}")
        return name


class SubprocessException(Exception):
    def __init__(self, stdout, stderr, returncode):
        """Exception class for when rclone does not exit with `0`.

        Args:
            stdout (str): What rclone output to stdout
            stderr (str): What rclone output to stderr
            returncode (int): The return code of the rclone process
        """
        super().__init__()
        self.stdout: str = stdout
        self.stderr: str = stderr
        self.returncode: int = returncode

    def __str__(self):
        """Turns exception into a human readable form."""
        return f"Return Code: {self.returncode}\nStdout:\n{self.stdout}\nStderr:\n{self.stderr}"


def parse_rclone_retention(retention: str) -> relativedelta:
    """Parses the rclone `retention` parameter into a `relativedelta`, which can then
    be used to calculate datetimes.
    """
    matches = {k: int(v) for v, k in re.findall(r"([\d]+)(ms|s|m|h|d|w|M|y)", retention)}
    return relativedelta(
        microseconds=matches.get("ms", 0) * 1000,
        seconds=matches.get("s", 0),
        minutes=matches.get("m", 0),
        hours=matches.get("h", 0),
        days=matches.get("d", 0),
        weeks=matches.get("w", 0),
        months=matches.get("M", 0),
        years=matches.get("y", 0),  # the regex above captures a lowercase "y" for years
    )
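
For example (illustrative values), rclone-style suffixes combine into a single `relativedelta`, which the purge task subtracts from `datetime.now()` to compute its cut-off:

    >>> parse_rclone_retention("7d")
    relativedelta(days=+7)
    >>> parse_rclone_retention("1w2d")
    relativedelta(days=+9)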


async def run_command(cmd: str):
    """Runs the given command, returning the exit code, stdout, and stderr."""
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    stdout = stdout.decode().replace('\n', '\n\t').strip()
    stderr = stderr.decode().replace('\n', '\n\t').strip()

    if proc.returncode != 0:
        logger.warn(f"Failed to run: '{cmd}'")
        logger.warn(f"stdout:\n{stdout}")
        logger.warn(f"stderr:\n{stderr}")
    else:
        logger.extra_debug(f"stdout:\n{stdout}")
        logger.extra_debug(f"stderr:\n{stderr}")

    return proc.returncode, stdout, stderr
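
A usage sketch, mirroring how `_check_rclone` above calls it:

    returncode, stdout, stderr = await run_command("rclone listremotes -vv")
    if returncode != 0:
        raise SubprocessException(stdout, stderr, returncode)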


class VideoQueue(asyncio.Queue):
    """A queue that limits the number of bytes it can store rather than discrete entries."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._bytes_sum = 0

    def qsize(self):
        """Number of bytes in the queue."""
        return self._bytes_sum

    def qsize_files(self):
        """Number of discrete items in the queue."""
        return super().qsize()

    def _get(self):
        # Items are `(event, video_bytes)` tuples, so the byte count comes from index 1
        data = self._queue.popleft()
        self._bytes_sum -= len(data[1])
        return data

    def _put(self, item):
        self._queue.append(item)
        self._bytes_sum += len(item[1])

    def full(self, item=None):
        """Return True if there are maxsize bytes in the queue.

        Optionally, if `item` is provided, it will return False if there is enough space to
        fit it, otherwise it will return True.

        Note: if the Queue was initialized with maxsize=0 (the default),
        then full() is never True.
        """
        if self._maxsize <= 0:
            return False
        else:
            if item is None:
                return self.qsize() >= self._maxsize
            else:
                return self.qsize() + len(item[1]) >= self._maxsize

    async def put(self, item):
        """Put an item into the queue.

        If the queue is full, wait until a free slot is available before adding the item.
        """
        while self.full(item):
            putter = self._loop.create_future()
            self._putters.append(putter)
            try:
                await putter
            except:
                putter.cancel()  # Just in case putter is not done yet.
                try:
                    # Clean self._putters from canceled putters.
                    self._putters.remove(putter)
                except ValueError:
                    # The putter could be removed from self._putters by a
                    # previous get_nowait call.
                    pass
                if not self.full(item) and not putter.cancelled():
                    # We were woken up by get_nowait(), but can't take
                    # the call. Wake up the next in line.
                    self._wakeup_next(self._putters)
                raise
        return self.put_nowait(item)

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        If no free slot is immediately available, raise QueueFull.
        """
        if self.full(item):
            raise asyncio.QueueFull
        self._put(item)
        self._unfinished_tasks += 1
        self._finished.clear()
        self._wakeup_next(self._getters)
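
A usage sketch (the size and names are illustrative): items are `(event, video_bytes)` tuples, and `maxsize` is a byte budget rather than an entry count:

    queue = VideoQueue(maxsize=512 * 1024 * 1024)  # hold at most ~512 MiB of buffered video
    await queue.put((event, video))  # blocks once the byte budget is exhausted
    event, video = await queue.get()
    print(f"{queue.qsize()} bytes across {queue.qsize_files()} files queued")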