Compare commits

..

6 Commits

Author SHA1 Message Date
mxmehl 381a3bdcf7 fix: find correct executable 2025-07-30 10:32:36 +02:00
mxmehl 7b184700c4 rewrap vault output two two whitespaces 2025-05-13 21:06:30 +02:00
mxmehl 1a27b50fb2 catch GPG error 2024-02-29 09:54:38 +01:00
mxmehl b524b2f15d handle list values for single hosts 2023-12-29 11:25:05 +01:00
mxmehl e08aaaaa4e allow string/hosts to be written as input, not directly as argument 2023-12-07 21:42:20 +01:00
mxmehl f351949166 sort functions 2023-12-06 16:38:29 +01:00
+90 -64
View File
@@ -9,6 +9,8 @@
import argparse import argparse
import json import json
import os import os
import re
import shutil
import subprocess import subprocess
import sys import sys
@@ -25,6 +27,8 @@ encrypt_flags.add_argument(
"--string", "--string",
help="String that shall be encrypted", help="String that shall be encrypted",
dest="encrypt_string", dest="encrypt_string",
nargs="?",
const="",
) )
encrypt_flags.add_argument( encrypt_flags.add_argument(
"-f", "-f",
@@ -46,6 +50,8 @@ decrypt_flags.add_argument(
"Also supports 'all'" "Also supports 'all'"
), ),
dest="decrypt_host", dest="decrypt_host",
nargs="?",
const="",
) )
decrypt_flags.add_argument( decrypt_flags.add_argument(
"-f", "-f",
@@ -102,18 +108,6 @@ def ask_for_confirm(question: str) -> bool:
return answer == "y" return answer == "y"
def encrypt_string(password: str) -> str:
"""Encrypt string with ansible-vault"""
result = subprocess.run(
["ansible-vault", "encrypt_string"],
input=password,
text=True,
capture_output=True,
check=False,
)
return result.stdout.strip()
def format_data(data: dict) -> str: def format_data(data: dict) -> str:
"""Format data nicely in columns""" """Format data nicely in columns"""
if len(data) > 1: if len(data) > 1:
@@ -122,11 +116,87 @@ def format_data(data: dict) -> str:
formatted_strings = [f"{key.ljust(max_key_length)}: {value}" for key, value in data.items()] formatted_strings = [f"{key.ljust(max_key_length)}: {value}" for key, value in data.items()]
else: else:
# If only one host, return the single value # If only one host, return the single value
formatted_strings = list(data.values()) formatted_strings = [f"{value}" for _, value in data.items()]
return "\n".join(formatted_strings) return "\n".join(formatted_strings)
def rewrap_text(text: str) -> str:
"""Replace lines starting with exactly 8 spaces with 2 spaces"""
return re.sub(r"(?m)^ {8}", "", text)
def executable(command: str) -> str:
"""Return the path to an executable command"""
path = shutil.which(command)
if not path:
sys.exit(f"ERROR: {command} is not installed or not found in PATH.")
return path
def encrypt_string(password: str) -> str:
"""Encrypt string with ansible-vault"""
result = subprocess.run(
[executable("ansible-vault"), "encrypt_string"],
input=password,
text=True,
capture_output=True,
check=False,
)
return rewrap_text(result.stdout.strip())
def encrypt_file(filename: str) -> str:
"""Encrypt a file with ansible-vault"""
if not os.path.exists(filename):
sys.exit(f"ERROR: File '{filename}' does not exist")
encrypted_return = subprocess.run(
[executable("ansible-vault"), "encrypt", filename], check=False, capture_output=True
)
if encrypted_return.returncode != 0:
sys.exit(
f"ERROR: Could not encrypt file '{filename}'. This is the error:"
f"\n{encrypted_return.stderr.decode()}"
)
return f"Encrypted '{filename}' successfully"
def decrypt_string(host, var) -> str:
"""Decrypt/print a variable from one or multiple hosts"""
# Run ansible msg for variable
# Send return as JSON
ansible_command = [executable("ansible"), host, "-m", "debug", "-a", f"var={var}"]
ansible_env = {
"ANSIBLE_LOAD_CALLBACK_PLUGINS": "1",
"ANSIBLE_STDOUT_CALLBACK": "json",
}
try:
result = subprocess.run(
ansible_command, env=ansible_env, capture_output=True, text=True, check=True
)
except subprocess.CalledProcessError as e:
sys.exit(f"Decrypting the variable failed: {e.stderr}")
except FileNotFoundError:
sys.exit(f"ERROR: {executable} is not installed or not found in PATH.")
# Parse JSON
try:
ansible_output = json.loads(result.stdout)["plays"][0]["tasks"][0]["hosts"]
except IndexError:
sys.exit(f"ERROR: Host '{host}' not found.")
# Attempt to create a :-separated list of host/values
output = {}
for hostname, values in ansible_output.items():
output[hostname] = convert_ansible_errors(values[var])
return format_data(output)
def decrypt_file(filename: str) -> str: def decrypt_file(filename: str) -> str:
"""Decrypt file with ansible-vault""" """Decrypt file with ansible-vault"""
@@ -134,7 +204,9 @@ def decrypt_file(filename: str) -> str:
sys.exit(f"ERROR: File '{filename}' does not exist") sys.exit(f"ERROR: File '{filename}' does not exist")
decrypted_content = subprocess.run( decrypted_content = subprocess.run(
["ansible-vault", "decrypt", "--output", "-", filename], check=False, capture_output=True [executable("ansible-vault"), "decrypt", "--output", "-", filename],
check=False,
capture_output=True,
) )
if decrypted_content.returncode != 0: if decrypted_content.returncode != 0:
@@ -146,7 +218,7 @@ def decrypt_file(filename: str) -> str:
print(decrypted_content.stdout.decode().strip()) print(decrypted_content.stdout.decode().strip())
if ask_for_confirm("Shall I write the encrypted content as seen above to the file?"): if ask_for_confirm("Shall I write the encrypted content as seen above to the file?"):
decrypted_content = subprocess.run( decrypted_content = subprocess.run(
["ansible-vault", "decrypt", filename], check=True, capture_output=True [executable("ansible-vault"), "decrypt", filename], check=True, capture_output=True
) )
return f"Decrypted '{filename}' successfully" return f"Decrypted '{filename}' successfully"
@@ -157,7 +229,7 @@ def allvars(host: str) -> str:
"""Decrypt/print all variables from one or multiple hosts""" """Decrypt/print all variables from one or multiple hosts"""
# Run ansible var for all host vars as seen from localhost # Run ansible var for all host vars as seen from localhost
# Send return as JSON # Send return as JSON
ansible_command = ["ansible", "localhost", "-m", "debug", "-a", "var=hostvars"] ansible_command = [executable("ansible"), "localhost", "-m", "debug", "-a", "var=hostvars"]
ansible_env = { ansible_env = {
"ANSIBLE_LOAD_CALLBACK_PLUGINS": "1", "ANSIBLE_LOAD_CALLBACK_PLUGINS": "1",
"ANSIBLE_STDOUT_CALLBACK": "json", "ANSIBLE_STDOUT_CALLBACK": "json",
@@ -181,52 +253,6 @@ def allvars(host: str) -> str:
return json.dumps(ansible_output, indent=2) return json.dumps(ansible_output, indent=2)
def encrypt_file(filename: str) -> str:
"""Encrypt a file with ansible-vault"""
if not os.path.exists(filename):
sys.exit(f"ERROR: File '{filename}' does not exist")
encrypted_return = subprocess.run(
["ansible-vault", "encrypt", filename], check=False, capture_output=True
)
if encrypted_return.returncode != 0:
sys.exit(
f"ERROR: Could not encrypt file '{filename}'. This is the error:"
f"\n{encrypted_return.stderr.decode()}"
)
return f"Encrypted '{filename}' successfully"
def decrypt_string(host, var) -> str:
"""Decrypt/print a variable from one or multiple hosts"""
# Run ansible msg for variable
# Send return as JSON
ansible_command = ["ansible", host, "-m", "debug", "-a", f"var={var}"]
ansible_env = {
"ANSIBLE_LOAD_CALLBACK_PLUGINS": "1",
"ANSIBLE_STDOUT_CALLBACK": "json",
}
result = subprocess.run(
ansible_command, env=ansible_env, capture_output=True, text=True, check=False
)
# Parse JSON
try:
ansible_output = json.loads(result.stdout)["plays"][0]["tasks"][0]["hosts"]
except IndexError:
sys.exit(f"ERROR: Host '{host}' not found.")
# Attempt to create a :-separated list of host/values
output = {}
for hostname, values in ansible_output.items():
output[hostname] = convert_ansible_errors(values[var])
return format_data(output)
def main(): def main():
"""Main function""" """Main function"""
args = parser.parse_args() args = parser.parse_args()
@@ -234,7 +260,7 @@ def main():
# ENCRYPTION # ENCRYPTION
if args.command == "encrypt": if args.command == "encrypt":
if args.encrypt_string: if args.encrypt_string is not None:
password = input("Enter string: ") if not args.encrypt_string else args.encrypt_string password = input("Enter string: ") if not args.encrypt_string else args.encrypt_string
output = encrypt_string(password) output = encrypt_string(password)
elif args.encrypt_file: elif args.encrypt_file:
@@ -242,7 +268,7 @@ def main():
output = encrypt_file(filename) output = encrypt_file(filename)
# DECRYPTION # DECRYPTION
elif args.command == "decrypt": elif args.command == "decrypt":
if args.decrypt_host: if args.decrypt_host is not None:
host = input("Enter host: ") if not args.decrypt_host else args.decrypt_host host = input("Enter host: ") if not args.decrypt_host else args.decrypt_host
var = input("Enter variable: ") if not args.decrypt_var else args.decrypt_var var = input("Enter variable: ") if not args.decrypt_var else args.decrypt_var
output = decrypt_string(host, var) output = decrypt_string(host, var)