Convert to Poetry project, fix mypy issues

This commit is contained in:
2023-09-21 17:20:55 +02:00
parent 8c54fd318a
commit cfed5c4807
8 changed files with 636 additions and 26 deletions

3
seafile_mirror/__init__.py Executable file
View File

@@ -0,0 +1,3 @@
# SPDX-FileCopyrightText: 2023 Max Mehl <https://mehl.mx>
#
# SPDX-License-Identifier: Apache-2.0

59
seafile_mirror/_cachedb.py Executable file
View File

@@ -0,0 +1,59 @@
# SPDX-FileCopyrightText: 2023 Max Mehl <https://mehl.mx>
#
# SPDX-License-Identifier: Apache-2.0
"""Functions for cache DB for seafile mirror"""
import json
import logging
from pathlib import Path
def db_read(cachefile) -> dict:
    """Load the JSON cache database from disk and return it as a dict.

    A missing cache file is not an error: an empty dict is returned in that
    case. The path of the cache file itself is always stored under the
    "_cachefile" key so that later writes know where to persist the data.
    """
    cachedb: dict = {}
    if Path(cachefile).is_file():
        logging.debug("Reading cache file '%s' from disk", cachefile)
        with open(cachefile, "r", encoding="UTF-8") as dbread:
            cachedb = json.load(dbread)
    else:
        logging.debug("Cache file '%s' does not exist on disk", cachefile)
    # Remember where this cache lives so db_write() can find it again
    cachedb["_cachefile"] = cachefile
    return cachedb
def db_write(dbdict):
    """Persist the cache dictionary as JSON to its own "_cachefile" path."""
    target = dbdict["_cachefile"]
    logging.debug("Writing cache file '%s' to disk", target)
    with open(target, "w", encoding="UTF-8") as dbwrite:
        json.dump(dbdict, dbwrite, indent=2)
        # Terminate the file with a trailing newline
        dbwrite.write("\n")
def db_update(dbdict, libid, **kwargs):
    """Set key/value pairs for one library in the cache, then persist it."""
    # Make sure the library has its own sub-dict before writing keys into it
    entry = dbdict.setdefault(libid, {})
    for key, value in kwargs.items():
        logging.debug(
            "Updating '%s' of library '%s' in in-memory cache dictionary", key, libid
        )
        entry[key] = value
    db_write(dbdict)
def db_get_library_key(dbdict, libid, key):
    """Return the cached value of `key` for library `libid`, or None."""
    # Chained .get() falls back to None when the library or key is unknown
    return dbdict.get(libid, {}).get(key)

52
seafile_mirror/_helpers.py Executable file
View File

@@ -0,0 +1,52 @@
# SPDX-FileCopyrightText: 2023 Max Mehl <https://mehl.mx>
#
# SPDX-License-Identifier: Apache-2.0
"""Misc helper functions for seafile mirror"""
import logging
import socket
import sys
def get_lock(process_name):
    """Abort the program if another instance already holds the lock.

    An abstract-namespace Unix socket acts as a process-wide mutex: binding
    succeeds for the first instance and fails for any further one.
    """
    lock_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    # Attach the socket to the function object itself; without a lasting
    # reference it would be garbage collected when this function returns
    # pylint: disable=protected-access
    get_lock._lock_socket = lock_socket
    try:
        # The leading null byte puts the socket into the abstract namespace,
        # so nothing is created on the file system. Works only on Linux.
        lock_socket.bind("\0" + process_name)
    except socket.error:
        logging.critical("This script is already executed in another instance. Abort.")
        sys.exit(1)
def findstring(text, string):
    """Check if a certain string exists in an output.

    Returns True if `string` occurs anywhere in `text`, False otherwise.
    """
    # `in` is the idiomatic (and faster) form of `text.find(string) >= 0`
    return string in text
def countlines(string: str) -> int:
    """Count number of lines in a variable"""
    return sum(1 for _ in string.splitlines())
def convert_bytes(size):
    """Convert bytes to KB, MB etc depending on size.

    Returns a human-readable string such as "1.5 MB". Sizes beyond the
    largest known unit are clamped to that unit instead of raising a
    KeyError (which the previous version did for inputs above ~1024 TB).
    """
    power = 1024
    level = 0
    labels = {0: "B", 1: "KB", 2: "MB", 3: "GB", 4: "TB", 5: "PB"}
    # Stop dividing once the size fits the current unit, or when we have
    # reached the largest label we know about
    while size > power and level < max(labels):
        size /= power
        level += 1
    return f"{round(size, 2)} {labels[level]}"

266
seafile_mirror/_seafile.py Executable file
View File

@@ -0,0 +1,266 @@
# SPDX-FileCopyrightText: 2023 Max Mehl <https://mehl.mx>
#
# SPDX-License-Identifier: Apache-2.0
"""Functions specific to Seafile for seafile mirror"""
import datetime
import logging
import subprocess
import sys
from time import sleep
from typing import Optional

from ._cachedb import db_get_library_key, db_update
# Constants
# Seafile CLI command
CMD = "seaf-cli"


def sf_runcmd(auth: Optional[list], *arguments: str) -> str:
    """Run a seaf-cli command and return the output (stdout).

    auth: optional [server, user, password] list appended as -s/-u/-p flags.
          Callers may pass None or [] for commands needing no authentication;
          the annotation now reflects that (previously `list`, a mypy error).
    arguments: positional arguments for seaf-cli (e.g. "status", "list").

    Returns the decoded stdout of the command, or "" on failure (the error
    is logged, not raised).
    """
    # Base command plus the requested subcommand/arguments
    cmdargs = [CMD, *arguments]
    # Optional authentication as list: [server, user, password]
    if auth:
        cmdargs.extend(["-s", auth[0], "-u", auth[1], "-p", auth[2]])
    # Run command without raising on non-zero exit; we handle that ourselves
    ret = subprocess.run(cmdargs, capture_output=True, check=False)
    if ret.returncode != 0:
        logging.error("The command '%s' returned an error: %s", ret.args, ret.stderr)
        return ""
    return ret.stdout.decode("UTF-8")
def sf_parse(output: str, fromcommand: str) -> list:
    """Parse the output of `list` and `status`, return a list of dicts.

    `list` rows are space-separated (name, id, dir); `status` rows are
    tab-separated (name, status, optional progress). The first line of the
    output is a header and is skipped.
    """
    libs = []
    for line in output.splitlines()[1:]:
        if fromcommand == "list":
            fields = [field.strip() for field in line.split(" ")]
            libs.append({"name": fields[0], "id": fields[1], "dir": fields[2]})
        elif fromcommand == "status":
            fields = [field.strip() for field in line.split("\t")]
            entry = {"name": fields[0], "status": fields[1]}
            # A third column (progress) is only present during a transfer
            entry["progress"] = fields[2] if len(fields) > 2 else ""
            libs.append(entry)
    return libs
def sf_getstatus(libname: str) -> dict:
    """Return the current status of a library (name, status, progress)."""
    # Parse the output of `seaf-cli status` into a list of status dicts
    statuses = sf_parse(sf_runcmd([], "status"), "status")
    # Return the entry for the requested library, if present
    for item in statuses:
        if item["name"] == libname:
            return item
    # Freshly started syncs may not show up in `status` yet; report that
    # with a placeholder entry instead of failing
    logging.debug("Status for %s cannot be retrieved", libname)
    return {"name": libname, "status": None, "progress": None}
def sf_desync_all(cache):
    """Desync all libraries that are in `list` and `status`.

    Libraries reported by `seaf-cli list` are desynced one by one. If any
    library is still downloading (visible in `status` but not in `list`)
    it cannot be desynced, so the program aborts with an error.
    """
    # Firstly, go through libslist. Parse the raw command output directly so
    # each variable keeps a single type (str vs list) for static analysis.
    libslist = sf_parse(sf_runcmd([], "list"), "list")
    # If libraries found in `list`, desync them
    if libslist:
        logging.warning(
            "There are still %s local synced libraries. Desyncing them...",
            len(libslist),
        )
        for lib in libslist:
            # Check if the cache status of the lib is still `started`. If so,
            # update the lastsync date
            if db_get_library_key(cache, lib["id"], "status") == "started":
                logging.debug(
                    "Library %s is synced but the cache file hasn't been updated "
                    "with the last sync date. Setting it to 'now'",
                    lib["name"],
                )
                sf_bump_cache_status(cache, lib["id"], status="synced")
            # Desync library ([] instead of None matches sf_runcmd's signature)
            logging.debug("Desyncing library %s stored in %s", lib["name"], lib["dir"])
            sf_runcmd([], "desync", "-d", lib["dir"])
    # Secondly, go through libstatus
    # We cannot desync libraries that are in `status` but not `list`, so error out
    libsstatus = sf_parse(sf_runcmd([], "status"), "status")
    # If libraries found in `status`, return informative errors and abort
    if libsstatus:
        logging.error(
            "There are still %s local libraries currently downloading but not listed yet:",
            len(libsstatus),
        )
        for lib in libsstatus:
            logging.error(
                "- %s with the current status '%s' and progress '%s'",
                lib["name"],
                lib["status"],
                lib["progress"],
            )
        logging.critical(
            "Exiting application because we cannot resync at least one library, see errors above."
        )
        sys.exit(1)
def sf_waitforsynced(libname: str) -> float:
    """Regularly check status of the library that started to sync.

    Polls `seaf-cli status` with an increasing interval until the library
    reports "synchronized". If the status cannot be retrieved too many times
    in a row, the seaf-cli daemon is restarted as it is probably hung.

    Returns the total waiting time in minutes.
    """
    libsynced = False
    syncwaitmins: float = 0
    nostatus, nostatus_limit = 0, 10
    while not libsynced:
        libstatus = sf_getstatus(libname)
        # If we have some status information, act upon it
        # If not, we already informed about being unable to retrieve it and also wait
        if libstatus["status"]:
            # Reset status fails
            nostatus = 0
            # If synchronised, inform and end loop
            if libstatus["status"] == "synchronized":
                logging.debug(
                    "Library %s has been fully synchronised after %s minutes",
                    libname,
                    round(syncwaitmins),
                )
                libsynced = True
            # If not synchronised yet, report on status
            else:
                logging.debug(
                    "Library %s is not fully synchronised yet. "
                    "Current status: %s with progress: %s",
                    libname,
                    libstatus["status"],
                    libstatus["progress"],
                )
        # Status is None, which is fine a few times. But if it happens too often
        # (`nostatus_limit`), we'll restart seaf-cli as it's probably hung up
        else:
            # Increment number of failed status retrievals
            nostatus += 1
            if nostatus >= nostatus_limit:
                logging.warning(
                    "Library %s didn't appear in status %s times. Restarting seaf-cli daemon...",
                    libname,
                    nostatus_limit,
                )
                sf_runcmd([], "stop")
                sf_runcmd([], "start")
                # BUGFIX: reset the failure counter after the restart;
                # previously every further failed check would immediately
                # trigger yet another daemon restart
                nostatus = 0
        # If library not synced yet or no status available, wait.
        # Back off: poll every 15s for the first minute, every 60s until
        # 10 minutes have passed, then every 2 minutes.
        if not libsynced:
            if syncwaitmins < 1:
                sleep(15)
                syncwaitmins += 0.25
            elif syncwaitmins < 10:
                sleep(60)
                syncwaitmins += 1
            else:
                sleep(120)
                syncwaitmins += 2
    return syncwaitmins
def sf_bump_cache_status(dbdict, libid, status, duration=0) -> None:
    """Update the sync state of a library in the cache database.

    For status "synced", the lastsync timestamp is also stored; it is set to
    now minus (duration + 2) minutes. For status "started", only the status
    itself is recorded.
    """
    logging.debug("Updating cache for library '%s' to status '%s'", libid, status)
    if status == "synced":
        # Timestamp is backdated by the sync duration plus a 2-minute buffer
        lastsync = datetime.datetime.now() - datetime.timedelta(minutes=duration + 2)
        db_update(dbdict, libid, status=status, lastsync=lastsync.isoformat())
    if status == "started":
        db_update(dbdict, libid, status=status)
def sf_lastsync_old_enough(dbdict, libid, force, resyncinterval) -> bool:
    """Find out if lastsync time of library is older than resyncinterval.

    Returns True if the library is due for a re-sync: it has never been
    synced, its last sync is older than `resyncinterval` days, or `force`
    is set. Returns False otherwise.
    """
    # Get lastsync key from cache for this library (ISO date string or None)
    lastsync = db_get_library_key(dbdict, libid, "lastsync")
    # Check if there actually has been an entry for the last sync
    if lastsync:
        # Use a separate name for the datetime object so each variable keeps
        # a single type (str vs datetime) for static analysis
        lastsync_dt = datetime.datetime.fromisoformat(lastsync)
        # Test if time difference (days) is smaller than resyncinterval
        if datetime.datetime.now() - lastsync_dt < datetime.timedelta(days=resyncinterval):
            logging.debug(
                "Last sync of library '%s' is newer than limit (%s days)",
                libid,
                resyncinterval,
            )
            if force:
                logging.info(
                    "Last sync of library '%s' is newer than limit (%s days), "
                    "but sync is enforced.",
                    libid,
                    resyncinterval,
                )
                return True
            return False
        # time difference is larger than resyncinterval
        logging.debug(
            "Last sync of library '%s' is older than limit (%s days)",
            libid,
            resyncinterval,
        )
        return True
    # The library has never been synced before (lastsync = None)
    logging.debug(
        "Library '%s' seems to not have been synced before",
        libid,
    )
    return True

224
seafile_mirror/seafile_mirror.py Executable file
View File

@@ -0,0 +1,224 @@
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2023 Max Mehl <https://mehl.mx>
#
# SPDX-License-Identifier: Apache-2.0
"""Handle clean read-only (re-)syncs of Seafile libraries to mirror them"""
import argparse
import logging
import shutil
from pathlib import Path
from time import sleep
import yaml
from ._cachedb import db_read
from ._helpers import convert_bytes, findstring, get_lock
from ._seafile import (
sf_bump_cache_status,
sf_desync_all,
sf_lastsync_old_enough,
sf_runcmd,
sf_waitforsynced,
)
# Command-line interface definition
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("-c", "--configdir", required=True, help="The config directory")
# The three boolean flags share the same shape, so register them in one place
for _flags, _helptext in (
    (
        ("-d", "--dry"),
        "Do not modify anything. Useful for being informed about which "
        "libraries are due to be synced",
    ),
    (
        ("-f", "--force"),
        "Force re-sync of libraries even if they are newer than the configured limit",
    ),
    (
        ("-v", "--verbose"),
        "Print and log DEBUG messages",
    ),
):
    parser.add_argument(*_flags, action="store_true", default=False, help=_helptext)
def main():
    """Main function: re-sync all configured Seafile libraries that are due.

    Reads the YAML config and JSON cache from the config directory, desyncs
    leftover libraries, then for each configured library that is older than
    its resync interval: deletes the local mirror, re-syncs it read-only,
    waits for completion and desyncs it again, updating the cache.
    """
    args = parser.parse_args()
    # Set files depending on configdir
    configdir = args.configdir.rstrip("/") + "/"
    configfile = configdir + "seafile_mirror.conf.yaml"
    cachefile = configdir + ".seafile_mirror.db.json"
    logfile = configdir + "seafile_mirror.log"
    # Logging
    log = logging.getLogger()
    logging.basicConfig(
        encoding="utf-8",
        format="[%(asctime)s] %(levelname)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        # Log to file and stdout
        handlers=[
            logging.FileHandler(logfile),
            logging.StreamHandler(),
        ],
    )
    # Set loglevel based on --verbose flag
    if args.verbose:
        log.setLevel(logging.DEBUG)
    else:
        log.setLevel(logging.INFO)
    # Get lock for this process
    get_lock("seafile_backup")
    # Read configfile
    with open(configfile, "r", encoding="UTF-8") as yamlfile:
        config = yaml.safe_load(yamlfile)
    # Populate cache dictionary
    cache = db_read(cachefile)
    # Check if there are still libraries in `list` or `status`. Desync them if
    # possible. Do not run in dry-run
    if not args.dry:
        sf_desync_all(cache)
    # Track handled libraries for the final summary. Three plain variables
    # instead of one dict with mixed value types (list/int/float) keep
    # static type checking simple.
    libs_done: list = []
    bytes_done: int = 0
    time_done: float = 0
    # Go through users in config
    for access in config:
        # Setting variables for this server/user/pass combination
        server = access["server"]
        user = access["user"]
        password = access["password"]
        resyncinterval = access["resync_interval_days"]
        authlist = [server, user, password]
        logging.info(
            "Checking all libraries for user %s on server %s for "
            "whether they are due for a re-sync",
            user,
            server,
        )
        # Get remotely available libraries
        remotelibs = sf_runcmd(authlist, "list-remote")
        for lib in access["libs"]:
            # Setting variables for this library
            libdir = Path(lib["dir"])
            libname = lib["name"]
            libid = lib["id"]
            # Set resync interval if there is a lib-specific setting. Otherwise default
            libresyncinterval = lib.get("resync_interval_days", resyncinterval)
            # Check if last sync of library is older than resync_interval_days
            if sf_lastsync_old_enough(cache, libid, args.force, libresyncinterval):
                logging.info(
                    "Starting to re-sync library %s (%s) to %s", libname, libid, libdir
                )
            else:
                logging.info(
                    "Local mirror of library %s (%s) at %s is still recent enough. Skipping it.",
                    libname,
                    libid,
                    libdir,
                )
                continue
            # Check if desired library exists remotely
            if findstring(remotelibs, libid):
                logging.debug("The library %s exists remotely. Continuing...", libname)
            else:
                # If the library does not exist remotely, we don't continue
                # Otherwise, we would delete data which cannot be retrieved again!
                logging.warning(
                    "The library %s does not exist remotely. Aborting resyncing this library.",
                    libname,
                )
                # Start next iteration of loop (next library)
                continue
            if args.dry:
                logging.info(
                    "Running in dry run mode. Aborting resync of library %s which would happen now",
                    libname,
                )
                continue
            # Delete libdir if it exists
            if libdir.exists() and libdir.is_dir():
                logging.debug("Deleting library directory %s", libdir)
                shutil.rmtree(libdir)
            else:
                logging.debug("Library directory did not exist before: %s", libdir)
            # Re-create directory
            logging.debug("Creating library directory %s", libdir)
            libdir.mkdir(parents=True, exist_ok=True)
            # Trigger sync of library. sf_runcmd expects string arguments,
            # so convert the Path object explicitly.
            logging.debug("Starting to sync library %s to %s", libname, libdir)
            sf_runcmd(authlist, "sync", "-l", libid, "-d", str(libdir))
            sf_bump_cache_status(cache, libid, status="started")
            # Sleep a second to populate `status`
            sleep(1)
            # Check regularly how the syncing progress is and wait for it to finish
            syncduration = sf_waitforsynced(libname)
            # Library is synchronised, now we desync it again
            logging.debug(
                "Desyncing library %s stored at %s after it has been synced",
                libname,
                libdir,
            )
            sf_runcmd([], "desync", "-d", str(libdir))
            # Get size of directory (libdir) in bytes
            # Note: this is not fully equivalent with what `du` would show. It's
            # caused by the fact that `du` considers filesystem block sizes
            libdirsize = sum(
                f.stat().st_size for f in libdir.glob("**/*") if f.is_file()
            )
            # Update summary counters and cache
            libs_done.append(libname)
            bytes_done += libdirsize
            time_done += syncduration
            sf_bump_cache_status(cache, libid, status="synced", duration=syncduration)
            logging.info(
                "Library %s (%s) has been re-synced to %s. Duration: %s minutes. Size: %s",
                libname,
                libid,
                libdir,
                round(syncduration),
                convert_bytes(libdirsize),
            )
    logging.info(
        "Fully re-synced the following libraries: %s. Total duration: %s minutes. Total size: %s",
        ", ".join(libs_done),
        round(time_done),
        convert_bytes(bytes_done),
    )
# Allow running this module directly as a script
if __name__ == "__main__":
    main()