Commit 22802797 authored by Nigel Kukard's avatar Nigel Kukard
Browse files

Initial commit


Signed-off-by: Nigel Kukard's avatarNigel Kukard <nkukard@lbsd.net>
parents
.mypy_cache/
.pytest_cache/
__pycache__/
*.pyc
\ No newline at end of file
This diff is collapsed.
# AllWorldIT Backstep
AWIT-Backstep is a backup system that allows for multiple methods of backup and multiple datasources along with automated backups.
\ No newline at end of file
"""AWIT Backstep main package."""
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# Copyright (C) 2019-2020, AllWorldIT.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import argparse
import configparser
# import fcntl
import os
import re
import sys
from typing import Dict, List, Optional, Sequence
from .datasources import DatasourcePluginBase
from .notifiers import NotifierList, NotifierPluginBase
from .plugin import PluginCollection
__VERSION__ = '0.2.0'
PID_FILE = '/run/awit-backstep.lock'
class BackstepArgumentParserException(RuntimeError):
"""Exception raised when parsing arguments."""
class BackstepArgumentParser(argparse.ArgumentParser):
"""
Backstep argument parser.
We need to override the error handling so it works for the configuration file options too instead of printing out help.
"""
def error(self, message):
"""Error exception generator."""
raise BackstepArgumentParserException(message)
def error_cmdline(self, message):
"""Invoke the super error handler."""
super().error(message)
class Backstep(): # noqa: R0902
"""Main Backstep application object."""
# Internal plugin collections
_datasource_plugins: PluginCollection
_method_plugins: PluginCollection
_notifier_plugins: PluginCollection
# Mapping of the datasource and notifier name to the package name
_datasource_mapping: Dict[str, str]
_notifier_mapping: Dict[str, str]
# Mapping of actions (first positional argument), to package names
_actions: Dict[str, str]
# This is the datasource and notifier list we're currently using
_datasource: DatasourcePluginBase
_notifiers: NotifierList
# Argument parser
_parser: BackstepArgumentParser
def __init__(self):
"""Initialize object."""
# Grab our plugins
self._datasource_plugins = PluginCollection(["awit_backstep.datasources"])
self._method_plugins = PluginCollection(["awit_backstep.methods"])
self._notifier_plugins = PluginCollection(["awit_backstep.notifiers"])
# Map in datasources and notifiers
self._datasource_mapping = {}
self._notifier_mapping = {}
# We start off with no command actions...
self._actions = {}
# Active datasource and notifier list we're using
self._datasource = None
self._notifiers = NotifierList()
# Start off with a clean argument parser
self._parser = BackstepArgumentParser()
# Add mappings between the name and package name
self._map_datasources()
self._map_notifiers()
# Register argument parsers
self._register_parsers()
def run(self):
"""Run Backstep."""
# Parse args
try:
command_args = self._parse_args()
except BackstepArgumentParserException as err:
# Print out the normal commandline error (and exit)
self._parser.error_cmdline(err.args[0])
# Only enable traceback in DEBUG mode
if command_args['verbose'] != 'DEBUG':
sys.tracebacklimit = 0
# Try lock
# try:
# lockfile_handle = open(PID_FILE, 'w')
# fcntl.lockf(lockfile_handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
# except OSError as e:
# raise RuntimeError(f'Error locking, only one instance of AWIT-Backstep should run at a time: {e}') from None
# Execute plugin responsible for this action
if command_args['action'] == 'auto-backup':
self._parse_config_file(command_args['config'])
elif command_args['action'] == 'show-plugins':
self._show_plugins()
else:
self.execute_plugin(command_args['action'], command_args)
def _register_parsers(self):
"""Register parsers and parse the commandline arguments."""
# Create argument parser
subparsers = self._parser.add_subparsers()
# Create argument parser for datasource
datasource_parser = argparse.ArgumentParser(add_help=False)
datasource_parser.add_argument('--datasource', required=True, choices=self._datasource_mapping.keys(),
help='Datasource plugin to use')
# Create argument parser for notifier
notifier_parser = argparse.ArgumentParser(add_help=False)
# Add automated backup parser
autobackup_parser = subparsers.add_parser('auto-backup', description='Command: auto-backup')
autobackup_parser.add_argument('--action', action='store_const', const='auto-backup',
default='auto-backup', help=argparse.SUPPRESS)
autobackup_parser.add_argument('--config', required=True, metavar='CONFIG_FILE',
help='Automated backup configuration file')
# Add automated backup parser
showplugins_parser = subparsers.add_parser('show-plugins', description='Command: show-plugins')
showplugins_parser.add_argument('--action', action='store_const', const='show-plugins',
default='show-plugins', help=argparse.SUPPRESS)
# Call each notifier plugin to register additonal parser options
self._notifier_plugins.call('register_parser', notifier_parser=notifier_parser)
# Call each method plugin to register parsers
for plugin_name, plugin_actions in self._method_plugins.call('register_parser',
subparsers=subparsers,
datasource_parser=datasource_parser,
notifier_parser=notifier_parser).items():
# Loop with each command returned and add a link to the plugin action
for action in plugin_actions:
self._actions[action] = plugin_name
# Add logging level option
self._parser.add_argument('-v', dest='verbose', default='INFO', type=str,
choices=['DEBUG', 'INFO', 'NOTICE', 'WARNING', 'ERROR', 'CRITICAL'],
help='Logging level')
def _parse_args(self, args: Optional[Sequence[str]] = None, notifiers: Optional[NotifierList] = None):
"""Register parsers and parse the commandline arguments."""
# Parse commandline arguments
command_args = vars(self._parser.parse_args(args))
# If we don't have an action, we should display our usage
if 'action' not in command_args:
self._parser.print_usage()
sys.exit(1)
# Save the datasource if one was specified on the commandline
if 'datasource' in command_args:
self._datasource = self._datasource_from_name(command_args['datasource'])
if notifiers:
_use_notifiers = notifiers
else:
_use_notifiers = self._notifiers
# Check which notifiers we'll be using
self._notifier_plugins.call('use_if_enabled', command_args=command_args, notifiers=_use_notifiers)
# Set notifier verbosity
_use_notifiers.verbosity = command_args['verbose']
return command_args
def _parse_config_file(self, config_file: str): # noqa: C901
"""Parse config file into command_args structure."""
if not os.path.exists(config_file):
raise RuntimeError(f'ERROR: Config file "{config_file}" not found')
self._notifiers.info(f'Using configuration from file "{config_file}')
parser = configparser.ConfigParser(allow_no_value=True)
# Try load config file
try:
parser.read(config_file)
except configparser.Error as err:
raise RuntimeError(f'ERROR: Failed to parse config file: {err}')
# Loop with each section, which are our BACKUP_NAME's below
for section in parser.sections():
# This is the arguments we'll pass to our own parser
auto_args: List[str] = []
# This is the list of backup items added to the end of auto_args
backup_items: List[str] = []
self._notifiers.debug(f' Backup: {section}')
# Loop with each key/value pair and generate arguments
for key, value in parser.items(section):
# Replace variables
value = value.replace('$BACKUP_NAME', section)
# Check for things handled specially
if key == 'method':
auto_args.insert(0, value)
continue
if key == 'backup-items':
backup_items = re.split(r'\n+', value.strip())
continue
# If the value is 'enabled' we remove it and just add the key as an option
if value == 'enabled':
auto_args.append(f'--{key}')
else:
auto_args.extend([f'--{key}', value])
# Log what we found
self._notifiers.debug(f' {key} = {value}')
# Add archive name
auto_args.extend([f'--backup-name', section])
# Add backup items to the end of the arg list
auto_args.extend(backup_items)
self._notifiers.debug(f'Parsed config file args: {auto_args}')
# Work out a list of notifiers for this backup...
notifiers = NotifierList()
# Try and parse the arguments we built
try:
command_args = self._parse_args(args=auto_args, notifiers=notifiers)
except BackstepArgumentParserException as err:
# Clean up message (replace XXX-- with XXX=)
message = re.sub(r'--([0-9a-z\-]+)', r'\1=', err.args[0])
# Change wording slightly
message = re.sub(r'arguments(:| are)', r'options\1', message)
print(f'ERROR: auto-backup failed => {message}')
sys.exit(1)
self._notifiers.notice(f'>>> Running automated backup for: {section}')
self._notifiers.debug(f'ARGS: {command_args}')
# Execute the plugin
self.execute_plugin(action=command_args['action'], command_args=command_args, notifiers=notifiers)
def _map_datasources(self):
"""Map our datasources."""
for plugin_name, datasource in self._datasource_plugins.call('register_datasource').items():
self._datasource_mapping[datasource] = plugin_name
def _map_notifiers(self):
"""Map our notifiers."""
for plugin_name, notifier in self._notifier_plugins.call('register_notifier').items():
self._notifier_mapping[notifier] = plugin_name
def _datasource_from_name(self, name: str) -> DatasourcePluginBase:
"""
Return a datasource from its' name.
:param name: Name of the datasource
:type name: str
"""
# Make sure the datasource exists
if name not in self._datasource_mapping:
raise RuntimeError(f'ERROR: Datasource "{name}" not found')
# Return the datasource based on the mapping to package name and resolution to object
return self._datasource_plugins.get(self._datasource_mapping[name])
def _notifier_from_name(self, name: str) -> NotifierPluginBase:
"""
Return a notifier from its' name.
:param name: Name of the notifier
:type name: str
"""
# Make sure the notifier exists
if name not in self._notifier_mapping:
raise RuntimeError(f'ERROR: Notifier "{name}" not found')
# Return the notifier based on the mapping to package name and resolution to object
return self._notifier_plugins.get(self._notifier_mapping[name])
def _show_plugins(self):
"""Display a plugin summary of what is loaded and what not."""
print(f'PLUGINS STATUS')
print(f'\n Notifiers:')
for plugin, status in self._notifier_plugins.plugin_status.items():
print(f' {plugin}: {status}')
print(f'\n Datasources:')
for plugin, status in self._datasource_plugins.plugin_status.items():
print(f' {plugin}: {status}')
print(f'\n Methods:')
for plugin, status in self._method_plugins.plugin_status.items():
print(f' {plugin}: {status}')
def execute_plugin(self, action: str, command_args: Dict[str, str], notifiers: Optional[NotifierList] = None):
"""
Execute plugin action.
:param action: Action name to call, this is registered on the commandline as a positional argument
:type action: str
:param command_args: Arguments to pass to the plugin
:type command_args: Dict[str, str]
:param notifiers: Optional notifiers to use
:type notifiers: NotifierList
"""
# Grab the action we're going to execute
method = self._method_plugins.get(self._actions[action])
# Check which notifiers we're going to use
if notifiers:
# The ones we got as an override
_use_notifiers = notifiers
else:
# Or internal
_use_notifiers = self._notifiers
# Run the backup method
method.run(command=action, notifiers=_use_notifiers, command_args=command_args, datasource=self._datasource)
# Main entry point from the commandline
def main():
"""Entry point function for the commandline."""
print(f'AWIT-Backstep v{__VERSION__} - Copyright 2019-2020, AllWorldIT')
backstep = Backstep()
backstep.run()
"""AWIT Backstep datasources."""
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# Copyright (C) 2019-2020, AllWorldIT.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import List, Optional
from ..notifiers import NotifierList
from ..plugin import Plugin
class DatasourceError(RuntimeError):
"""Datasource error exception."""
class BackupArchive():
"""
Create a backup archive.
:param name: Name of the backup archive
:type name: str
:param paths: Optional list of filesystem paths to initialize the archive with
:type paths: List[str]
"""
_name: str
_paths: List[str]
_notifiers: Optional[NotifierList]
def __init__(self, name: str, paths: Optional[List[str]] = None, notifiers: Optional[NotifierList] = None):
"""
Create a backup archive.
:param name: Name of the backup archive
:type name: str
:param paths: Optional list of filesystem paths to initialize the archive with
:type paths: List[str]
"""
self._name = name
self._paths = []
self._notifiers = notifiers
if paths:
self.add_paths(paths)
def add_path(self, path: str):
"""
Add a path to the backup archive.
:param path: Path to add
:type path: str
"""
self._paths.append(path)
def add_paths(self, paths: List[str]):
"""
Add a list of paths to the backup archive.
:param paths: Path to add
:type paths: List[str]
"""
self._paths.extend(paths)
def pre_backup(self):
"""Prepare the backup archive for backup."""
def post_backup(self):
"""Cleanup the backup archive after backup."""
@property
def name(self):
"""Return the name of this archive."""
return self._name
@property
def paths(self):
"""Return the paths associated with this archive."""
return self._paths
@property
def notifiers(self):
"""Return our notifiers."""
return self._notifiers
@notifiers.setter
def notifiers(self, notifiers: NotifierList):
"""Set our notifiers."""
self._notifiers = notifiers
class BackupSet():
"""Backup set class."""
_archives: List[BackupArchive]
def __init__(self):
"""Backup set class."""
self._archives = []
def add_archive(self, archive: BackupArchive):
"""
Add an archive to the backup set.
:param archive: Backup archive to add
:type archive: BackupArchive
"""
self._archives.append(archive)
@property
def archives(self):
"""Return the archives associated with this backup set."""
return self._archives
class DatasourcePluginBase(Plugin):
"""Datasource plugin."""
_notifiers: NotifierList
_name: str
def get_backup_set(self, backup_items: List[str], backup_name: Optional[str]) -> BackupSet:
"""Parse backup items into a list of paths."""
raise NotImplementedError
def register_datasource(self, **kwargs) -> str: # noqa: W0613
"""Register the datasource."""
return self.name
@property
def name(self) -> str:
"""
Name property.
:returns: Name of notifier
:rtype: str
"""
return self._name
@property
def notifiers(self):
"""Return our notifiers."""
return self._notifiers
@notifiers.setter
def notifiers(self, notifiers: NotifierList):
"""Set our notifiers."""
self._notifiers = notifiers
"""AWIT Backstep filesystem datasource."""
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# Copyright (C) 2019-2020, AllWorldIT.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import List, Optional
from . import BackupArchive, BackupSet, DatasourcePluginBase
__VERSION__ = '0.0.1'
class FilesystemPlugin(DatasourcePluginBase):
"""Filesystem Plugin."""
_name = 'filesystem'
def get_backup_set(self, backup_items: List[str], backup_name: Optional[str]) -> BackupSet:
"""Parse the backup items into backup archives and paths."""
# Create backup set
backup_set = BackupSet()
# Work out archive name...
if backup_name:
archive_name = f'{backup_name}-filesystem'
else:
archive_name = 'filesystem'
filesystem_archive = BackupArchive(name=archive_name, paths=backup_items)
backup_set.add_archive(filesystem_archive)
return backup_set
"""AWIT Backstep LibvirtDomain datasource."""
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# Copyright (C) 2019-2020, AllWorldIT.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import os
import subprocess # nosec
from datetime import datetime
from typing import Dict, List, Optional
from xml.etree import ElementTree # nosec
import libvirt
from ..notifiers import NotifierList
from ..util import chunks
from . import BackupArchive, BackupSet, DatasourceError, DatasourcePluginBase
__VERSION__ = '0.1.0'
class LibvirtBackupArchive(BackupArchive):
"""Libvirt domain class."""
_snapshot_files: Dict[str,