From e3cad0c66d792aa27aad3b9cfda9c11fe252b427 Mon Sep 17 00:00:00 2001 From: Max Mehl Date: Wed, 6 Dec 2023 12:51:34 +0100 Subject: [PATCH] support decryption of a file --- ansible-vault-tools.py | 102 +++++++++++++++++++++++++++++++++++------ 1 file changed, 87 insertions(+), 15 deletions(-) diff --git a/ansible-vault-tools.py b/ansible-vault-tools.py index 5acf2ea..6189883 100755 --- a/ansible-vault-tools.py +++ b/ansible-vault-tools.py @@ -1,13 +1,16 @@ #!/usr/bin/env python3 """Encrypt or decrypt using Ansible-vault and Ansible""" - # SPDX-FileCopyrightText: 2023 Max Mehl # # SPDX-License-Identifier: Apache-2.0 -import subprocess -import json +# pylint: disable=invalid-name + import argparse +import json +import os +import subprocess +import sys parser = argparse.ArgumentParser(description=__doc__) subparsers = parser.add_subparsers(title="commands", dest="command", required=True) @@ -16,23 +19,37 @@ parser_encrypt = subparsers.add_parser( "encrypt", help="Encrypt a string using ansible-vault", ) -parser_encrypt.add_argument( +encrypt_flags = parser_encrypt.add_mutually_exclusive_group(required=True) +encrypt_flags.add_argument( "-s", "--string", help="String that shall be encrypted", dest="encrypt_string", ) +encrypt_flags.add_argument( + "-f", + "--file", + help="File that shall be encrypted", + dest="encrypt_file", +) # Decrypt arguments parser_decrypt = subparsers.add_parser( "decrypt", help="Print a variable of one or multiple hosts and decrypt it if necessary", ) -parser_decrypt.add_argument( +decrypt_flags = parser_decrypt.add_mutually_exclusive_group(required=True) +decrypt_flags.add_argument( "-H", "--host", help="Host name from Ansible inventory for which you want to get the variable", dest="decrypt_host", ) +decrypt_flags.add_argument( + "-f", + "--file", + help="File that shall be decrypted", + dest="decrypt_file", +) parser_decrypt.add_argument( "-v", "--var", @@ -50,6 +67,24 @@ def convert_ansible_errors(error: str) -> str: return error +def ask_for_confirm(question: str) -> bool: + """Ask for confirmation. + + Args: + question (str): The question to ask the user. + + Returns: + bool: True if the user confirms with 'y', False otherwise. + """ + while True: + answer = input(f"{question} [y/n]: ").lower().strip() + if answer in ("y", "n"): + break + print("Invalid input. Please enter 'y' or 'n'.") + + return answer == "y" + + def encrypt_string(password): """Encrypt string with ansible-vault""" result = subprocess.run( @@ -57,6 +92,7 @@ def encrypt_string(password): input=password, text=True, capture_output=True, + check=False, ) return result.stdout.strip() @@ -74,7 +110,30 @@ def format_data(data: dict) -> str: return "\n".join(formatted_strings) -def decrypt_string(host, var): +def decrypt_file(filename) -> None: + """Decrypt file with ansible-vault""" + + if not os.path.exists(filename): + sys.exit(f"ERROR: File '{filename}' does not exist") + + decrypted_content = subprocess.run( + ["ansible-vault", "decrypt", "--output", "-", filename], check=False, capture_output=True + ) + + if decrypted_content.returncode != 0: + sys.exit( + f"ERROR: Could not decrypt file '{filename}'. This is the error:" + f"\n{decrypted_content.stderr.decode()}" + ) + + print(decrypted_content.stdout.decode().strip()) + if ask_for_confirm("Shall I write the encrypted content as seen above to the file?"): + decrypted_content = subprocess.run( + ["ansible-vault", "decrypt", filename], check=True, capture_output=True + ) + + +def decrypt_string(host, var) -> str: """Decrypt/print a variable from one or multiple hosts""" # Run ansible msg for variable # Send return as JSON @@ -83,14 +142,15 @@ def decrypt_string(host, var): "ANSIBLE_LOAD_CALLBACK_PLUGINS": "1", "ANSIBLE_STDOUT_CALLBACK": "json", } - result = subprocess.run(ansible_command, env=ansible_env, capture_output=True, text=True) + result = subprocess.run( + ansible_command, env=ansible_env, capture_output=True, text=True, check=False + ) # Parse JSON try: ansible_output = json.loads(result.stdout)["plays"][0]["tasks"][0]["hosts"] except IndexError: - print(f"ERROR: Host '{host}' not found.") - return "" + sys.exit(f"ERROR: Host '{host}' not found.") # Attempt to create a :-separated list of host/values output = {} @@ -103,16 +163,28 @@ def decrypt_string(host, var): def main(): """Main function""" args = parser.parse_args() + output = "" + # ENCRYPTION if args.command == "encrypt": - password = input("Enter string: ") if not args.encrypt_string else args.encrypt_string - vaultpw = encrypt_string(password) + if args.encrypt_string: + password = input("Enter string: ") if not args.encrypt_string else args.encrypt_string + output = encrypt_string(password) + elif args.encrypt_file: + filename = input("Enter filename: ") if not args.encrypt_file else args.encrypt_file + # TODO + # DECRYPTION elif args.command == "decrypt": - host = input("Enter host: ") if not args.decrypt_host else args.decrypt_host - var = input("Enter variable: ") if not args.decrypt_var else args.decrypt_var - vaultpw = decrypt_string(host, var) + if args.decrypt_host: + host = input("Enter host: ") if not args.decrypt_host else args.decrypt_host + var = input("Enter variable: ") if not args.decrypt_var else args.decrypt_var + output = decrypt_string(host, var) + elif args.decrypt_file: + filename = input("Enter filename: ") if not args.decrypt_file else args.decrypt_file + decrypt_file(filename) - print(vaultpw) + if output: + print(output) if __name__ == "__main__":