This repository has been archived on 2026-04-16. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
ansible-vault-tools/ansible-vault-tools.py

276 lines
8.0 KiB
Python
Raw Normal View History

2023-11-23 10:28:11 +01:00
#!/usr/bin/env python3
"""Encrypt or decrypt using Ansible-vault and Ansible"""
# SPDX-FileCopyrightText: 2023 Max Mehl <https://mehl.mx>
#
# SPDX-License-Identifier: Apache-2.0
2023-12-06 12:51:34 +01:00
# pylint: disable=invalid-name
2023-11-23 10:28:11 +01:00
import argparse
2023-12-06 12:51:34 +01:00
import json
import os
import re
2023-12-06 12:51:34 +01:00
import subprocess
import sys
2023-11-23 10:28:11 +01:00
# Command line interface: one sub-command per action (encrypt, decrypt, allvars)
parser = argparse.ArgumentParser(description=__doc__)
subparsers = parser.add_subparsers(title="commands", dest="command", required=True)

# --- encrypt: exactly one of --string / --file has to be given ---------------
parser_encrypt = subparsers.add_parser(
    "encrypt", help="Encrypt a string or file using ansible-vault"
)
encrypt_flags = parser_encrypt.add_mutually_exclusive_group(required=True)
# A bare "-s" (no value) stores the empty string
encrypt_flags.add_argument(
    "-s",
    "--string",
    dest="encrypt_string",
    nargs="?",
    const="",
    help="String that shall be encrypted",
)
encrypt_flags.add_argument(
    "-f",
    "--file",
    dest="encrypt_file",
    help="File that shall be encrypted",
)

# --- decrypt: exactly one of --host / --file has to be given -----------------
parser_decrypt = subparsers.add_parser(
    "decrypt", help="Decrypt a string or file using ansible-vault"
)
decrypt_flags = parser_decrypt.add_mutually_exclusive_group(required=True)
# A bare "-H" (no value) stores the empty string
decrypt_flags.add_argument(
    "-H",
    "--host",
    dest="decrypt_host",
    nargs="?",
    const="",
    help=(
        "Host name from Ansible inventory for which you want to get a specific variable. "
        "Also supports 'all'"
    ),
)
decrypt_flags.add_argument(
    "-f",
    "--file",
    dest="decrypt_file",
    help="File that shall be decrypted",
)
# Optional, lives outside of the mutually exclusive group
parser_decrypt.add_argument(
    "-v",
    "--var",
    dest="decrypt_var",
    help="Variable you want to print",
)

# --- allvars: the host is optional on the command line -----------------------
parser_allvars = subparsers.add_parser("allvars", help="Print all variables of a host")
parser_allvars.add_argument(
    "-H",
    "--host",
    dest="allvars_host",
    help=(
        "Host name from Ansible inventory for which you want to get all variables. "
        "Also supports 'all'"
    ),
)
2023-11-23 10:28:11 +01:00
2023-12-06 12:11:28 +01:00
def convert_ansible_errors(error: object) -> object:
    """Convert typical Ansible errors to more user-friendly messages.

    Args:
        error: A value taken from Ansible's JSON output. Usually a string, but
            it may be any JSON type (number, bool, list, dict or None).

    Returns:
        "(undefined variable)" if the value is a string containing Ansible's
        "VARIABLE IS NOT DEFINED" marker, otherwise the value unchanged.
    """
    # Only strings can carry an Ansible error message. Running the substring
    # test on other JSON types would raise TypeError (int, bool, None) or
    # silently become a membership test (list, dict).
    if isinstance(error, str) and "VARIABLE IS NOT DEFINED" in error:
        return "(undefined variable)"

    # If no conversion was possible, return the original value
    return error
2023-12-06 12:51:34 +01:00
def ask_for_confirm(question: str) -> bool:
    """Prompt the user with a yes/no question until a valid answer is given.

    The answer is lower-cased and stripped of surrounding whitespace before it
    is compared, so " Y " counts as "y".

    Args:
        question (str): The question to ask the user.

    Returns:
        bool: True if the user answered 'y', False if the user answered 'n'.
    """
    prompt = f"{question} [y/n]: "

    reply = input(prompt).lower().strip()
    while reply not in ("y", "n"):
        print("Invalid input. Please enter 'y' or 'n'.")
        reply = input(prompt).lower().strip()

    return reply == "y"
2023-12-06 16:38:23 +01:00
def format_data(data: dict) -> str:
    """Render a mapping as text, one entry per line.

    With two or more entries each line reads "key: value", the keys padded on
    the right to the length of the longest key so that the colons line up.
    With fewer than two entries only the values are emitted, without keys.
    """
    # Zero or one entry: no key column needed
    if len(data) <= 1:
        return "\n".join(f"{entry}" for entry in data.values())

    width = max(map(len, data))
    lines = []
    for name, entry in data.items():
        lines.append(f"{name.ljust(width)}: {entry}")
    return "\n".join(lines)
def rewrap_text(text: str) -> str:
    """Strip eight leading spaces from every line that starts with at least eight.

    Lines indented by fewer than eight spaces are left untouched. A line
    indented by ten spaces therefore ends up indented by two.
    """
    leading_indent = re.compile(r"(?m)^ {8}")
    return leading_indent.sub("", text)
2023-12-06 13:39:52 +01:00
def encrypt_string(password: str) -> str:
    """Encrypt a string with ansible-vault.

    Args:
        password: The plain-text string. It is handed to
            `ansible-vault encrypt_string` on stdin.

    Returns:
        The stdout of ansible-vault, stripped and passed through rewrap_text().

    Exits the program with an error message if ansible-vault returns a
    non-zero exit code.
    """
    result = subprocess.run(
        ["ansible-vault", "encrypt_string"],
        input=password,
        text=True,
        capture_output=True,
        check=False,
    )

    # Without this check a failed run would produce empty output and no error
    if result.returncode != 0:
        sys.exit(f"ERROR: Could not encrypt string. This is the error:\n{result.stderr}")

    return rewrap_text(result.stdout.strip())
2023-11-23 10:28:11 +01:00
2023-12-06 16:38:23 +01:00
def encrypt_file(filename: str) -> str:
    """Run `ansible-vault encrypt` on a file.

    Args:
        filename: Path of the file handed to ansible-vault.

    Returns:
        A success message naming the file.

    Exits the program with an error message if the path does not exist or if
    ansible-vault returns a non-zero exit code.
    """
    file_missing = not os.path.exists(filename)
    if file_missing:
        sys.exit(f"ERROR: File '{filename}' does not exist")

    command = ["ansible-vault", "encrypt", filename]
    completed = subprocess.run(command, capture_output=True, check=False)

    if completed.returncode != 0:
        # Output was captured as bytes, so decode it for the message
        stderr_text = completed.stderr.decode()
        sys.exit(
            f"ERROR: Could not encrypt file '{filename}'. This is the error:"
            f"\n{stderr_text}"
        )

    return f"Encrypted '{filename}' successfully"
def decrypt_string(host: str, var: str) -> str:
    """Decrypt/print a variable from one or multiple hosts.

    Runs Ansible's debug module for the variable against the given host
    pattern and reads the result from Ansible's JSON stdout callback.

    Args:
        host: Host pattern passed to the `ansible` command.
        var: Name of the variable to print.

    Returns:
        The output of format_data() for the per-host values.

    Exits the program with an error message if the `ansible` command fails,
    its output is not valid JSON, or the output contains no play or task.
    """
    # Run ansible msg for variable
    # Send return as JSON
    ansible_command = ["ansible", host, "-m", "debug", "-a", f"var={var}"]
    # Extend the current environment instead of replacing it: `env=` overrides
    # the whole environment, which would drop PATH, HOME, ANSIBLE_CONFIG etc.
    ansible_env = {
        **os.environ,
        "ANSIBLE_LOAD_CALLBACK_PLUGINS": "1",
        "ANSIBLE_STDOUT_CALLBACK": "json",
    }
    try:
        result = subprocess.run(
            ansible_command, env=ansible_env, capture_output=True, text=True, check=True
        )
    except subprocess.CalledProcessError as e:
        sys.exit(f"Decrypting the variable failed: {e.stderr}")

    # Parse JSON
    try:
        parsed = json.loads(result.stdout)
    except json.JSONDecodeError as e:
        sys.exit(f"ERROR: Could not parse the Ansible output as JSON: {e}")

    try:
        ansible_output = parsed["plays"][0]["tasks"][0]["hosts"]
    except IndexError:
        # No play or task in the output; reported as an unknown host
        sys.exit(f"ERROR: Host '{host}' not found.")

    # Attempt to create a :-separated list of host/values
    output = {
        hostname: convert_ansible_errors(values[var])
        for hostname, values in ansible_output.items()
    }

    return format_data(output)
2023-12-06 12:11:28 +01:00
2023-12-06 12:59:04 +01:00
def decrypt_file(filename: str) -> str:
    """Decrypt a file with ansible-vault after previewing its content.

    The decrypted content is first printed to stdout. Only if the user
    confirms is `ansible-vault decrypt` run on the file itself.

    Args:
        filename: Path of the file handed to ansible-vault.

    Returns:
        A status message saying whether the file was decrypted or left
        unchanged.

    Exits the program with an error message if the path does not exist or if
    one of the ansible-vault calls returns a non-zero exit code.
    """
    if not os.path.exists(filename):
        sys.exit(f"ERROR: File '{filename}' does not exist")

    # Preview: "--output -" sends the decrypted content to stdout
    decrypted_content = subprocess.run(
        ["ansible-vault", "decrypt", "--output", "-", filename], check=False, capture_output=True
    )
    if decrypted_content.returncode != 0:
        sys.exit(
            f"ERROR: Could not decrypt file '{filename}'. This is the error:"
            f"\n{decrypted_content.stderr.decode()}"
        )

    print(decrypted_content.stdout.decode().strip())

    if not ask_for_confirm("Shall I write the decrypted content as seen above to the file?"):
        return f"File '{filename}' was not changed"

    # Same error handling as for the preview, instead of letting a
    # CalledProcessError escape as a traceback
    write_result = subprocess.run(
        ["ansible-vault", "decrypt", filename], check=False, capture_output=True
    )
    if write_result.returncode != 0:
        sys.exit(
            f"ERROR: Could not decrypt file '{filename}'. This is the error:"
            f"\n{write_result.stderr.decode()}"
        )

    return f"Decrypted '{filename}' successfully"
2023-12-06 12:59:04 +01:00
2023-12-06 13:39:52 +01:00
def allvars(host: str) -> str:
    """Decrypt/print all variables from one or multiple hosts.

    Runs Ansible's debug module on localhost for `hostvars` and reads the
    result from Ansible's JSON stdout callback.

    Args:
        host: Host name to print the variables for, or "all" for every host
            contained in `hostvars`.

    Returns:
        The selected variables as a JSON string indented by two spaces.

    Exits the program with an error message if the Ansible output cannot be
    read or if the requested host is not part of `hostvars`.
    """
    # Run ansible var for all host vars as seen from localhost
    # Send return as JSON
    ansible_command = ["ansible", "localhost", "-m", "debug", "-a", "var=hostvars"]
    # Extend the current environment instead of replacing it: `env=` overrides
    # the whole environment, which would drop PATH, HOME, ANSIBLE_CONFIG etc.
    ansible_env = {
        **os.environ,
        "ANSIBLE_LOAD_CALLBACK_PLUGINS": "1",
        "ANSIBLE_STDOUT_CALLBACK": "json",
    }
    result = subprocess.run(
        ansible_command, env=ansible_env, capture_output=True, text=True, check=False
    )

    # Reduce JSON
    try:
        ansible_output = json.loads(result.stdout)["plays"][0]["tasks"][0]["hosts"]["localhost"][
            "hostvars"
        ]
    except (json.JSONDecodeError, LookupError, TypeError):
        # Missing or unexpected output: show whatever Ansible reported
        details = result.stderr.strip() or result.stdout.strip()
        sys.exit(f"ERROR: Could not read the host variables from Ansible: {details}")

    # If only a specific host was requested, reduce the output to that host
    if host != "all":
        try:
            ansible_output = ansible_output[host]
        except KeyError:
            sys.exit(f"ERROR: Host '{host}' not found.")

    return json.dumps(ansible_output, indent=2)
2023-11-23 10:28:11 +01:00
def main():
    """Main function: parse the command line and run the requested command.

    Values that were given empty on the command line are asked for
    interactively. Whatever the selected command returns is printed, unless
    it is empty.
    """
    args = parser.parse_args()
    output = ""

    # ENCRYPTION
    if args.command == "encrypt":
        if args.encrypt_string is not None:
            password = args.encrypt_string or input("Enter string: ")
            output = encrypt_string(password)
        # "is not None" (not truthiness) so that an empty value falls through
        # to the prompt instead of silently doing nothing
        elif args.encrypt_file is not None:
            filename = args.encrypt_file or input("Enter filename: ")
            output = encrypt_file(filename)
    # DECRYPTION
    elif args.command == "decrypt":
        if args.decrypt_host is not None:
            host = args.decrypt_host or input("Enter host: ")
            var = args.decrypt_var or input("Enter variable: ")
            output = decrypt_string(host, var)
        elif args.decrypt_file is not None:
            filename = args.decrypt_file or input("Enter filename: ")
            output = decrypt_file(filename)
    # ALLVARS
    elif args.command == "allvars":
        host = args.allvars_host or input("Enter host: ")
        output = allvars(host)

    if output:
        print(output)


if __name__ == "__main__":
    main()