Compare commits
6 Commits
23c6f4e7bf
..
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
381a3bdcf7
|
|||
|
7b184700c4
|
|||
| 1a27b50fb2 | |||
| b524b2f15d | |||
| e08aaaaa4e | |||
|
f351949166
|
+92
-66
@@ -9,6 +9,8 @@
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
@@ -25,6 +27,8 @@ encrypt_flags.add_argument(
|
||||
"--string",
|
||||
help="String that shall be encrypted",
|
||||
dest="encrypt_string",
|
||||
nargs="?",
|
||||
const="",
|
||||
)
|
||||
encrypt_flags.add_argument(
|
||||
"-f",
|
||||
@@ -42,10 +46,12 @@ decrypt_flags.add_argument(
|
||||
"-H",
|
||||
"--host",
|
||||
help=(
|
||||
"Host name from Ansible inventory for which you want to get a specific variable."
|
||||
"Host name from Ansible inventory for which you want to get a specific variable. "
|
||||
"Also supports 'all'"
|
||||
),
|
||||
dest="decrypt_host",
|
||||
nargs="?",
|
||||
const="",
|
||||
)
|
||||
decrypt_flags.add_argument(
|
||||
"-f",
|
||||
@@ -68,7 +74,7 @@ parser_allvars.add_argument(
|
||||
"-H",
|
||||
"--host",
|
||||
help=(
|
||||
"Host name from Ansible inventory for which you want to get all variables."
|
||||
"Host name from Ansible inventory for which you want to get all variables. "
|
||||
"Also supports 'all'"
|
||||
),
|
||||
dest="allvars_host",
|
||||
@@ -102,18 +108,6 @@ def ask_for_confirm(question: str) -> bool:
|
||||
return answer == "y"
|
||||
|
||||
|
||||
def encrypt_string(password: str) -> str:
|
||||
"""Encrypt string with ansible-vault"""
|
||||
result = subprocess.run(
|
||||
["ansible-vault", "encrypt_string"],
|
||||
input=password,
|
||||
text=True,
|
||||
capture_output=True,
|
||||
check=False,
|
||||
)
|
||||
return result.stdout.strip()
|
||||
|
||||
|
||||
def format_data(data: dict) -> str:
|
||||
"""Format data nicely in columns"""
|
||||
if len(data) > 1:
|
||||
@@ -122,11 +116,87 @@ def format_data(data: dict) -> str:
|
||||
formatted_strings = [f"{key.ljust(max_key_length)}: {value}" for key, value in data.items()]
|
||||
else:
|
||||
# If only one host, return the single value
|
||||
formatted_strings = list(data.values())
|
||||
formatted_strings = [f"{value}" for _, value in data.items()]
|
||||
|
||||
return "\n".join(formatted_strings)
|
||||
|
||||
|
||||
def rewrap_text(text: str) -> str:
|
||||
"""Replace lines starting with exactly 8 spaces with 2 spaces"""
|
||||
return re.sub(r"(?m)^ {8}", "", text)
|
||||
|
||||
|
||||
def executable(command: str) -> str:
|
||||
"""Return the path to an executable command"""
|
||||
path = shutil.which(command)
|
||||
if not path:
|
||||
sys.exit(f"ERROR: {command} is not installed or not found in PATH.")
|
||||
return path
|
||||
|
||||
|
||||
def encrypt_string(password: str) -> str:
|
||||
"""Encrypt string with ansible-vault"""
|
||||
result = subprocess.run(
|
||||
[executable("ansible-vault"), "encrypt_string"],
|
||||
input=password,
|
||||
text=True,
|
||||
capture_output=True,
|
||||
check=False,
|
||||
)
|
||||
return rewrap_text(result.stdout.strip())
|
||||
|
||||
|
||||
def encrypt_file(filename: str) -> str:
|
||||
"""Encrypt a file with ansible-vault"""
|
||||
|
||||
if not os.path.exists(filename):
|
||||
sys.exit(f"ERROR: File '{filename}' does not exist")
|
||||
|
||||
encrypted_return = subprocess.run(
|
||||
[executable("ansible-vault"), "encrypt", filename], check=False, capture_output=True
|
||||
)
|
||||
|
||||
if encrypted_return.returncode != 0:
|
||||
sys.exit(
|
||||
f"ERROR: Could not encrypt file '{filename}'. This is the error:"
|
||||
f"\n{encrypted_return.stderr.decode()}"
|
||||
)
|
||||
|
||||
return f"Encrypted '{filename}' successfully"
|
||||
|
||||
|
||||
def decrypt_string(host, var) -> str:
|
||||
"""Decrypt/print a variable from one or multiple hosts"""
|
||||
# Run ansible msg for variable
|
||||
# Send return as JSON
|
||||
ansible_command = [executable("ansible"), host, "-m", "debug", "-a", f"var={var}"]
|
||||
ansible_env = {
|
||||
"ANSIBLE_LOAD_CALLBACK_PLUGINS": "1",
|
||||
"ANSIBLE_STDOUT_CALLBACK": "json",
|
||||
}
|
||||
try:
|
||||
result = subprocess.run(
|
||||
ansible_command, env=ansible_env, capture_output=True, text=True, check=True
|
||||
)
|
||||
except subprocess.CalledProcessError as e:
|
||||
sys.exit(f"Decrypting the variable failed: {e.stderr}")
|
||||
except FileNotFoundError:
|
||||
sys.exit(f"ERROR: {executable} is not installed or not found in PATH.")
|
||||
|
||||
# Parse JSON
|
||||
try:
|
||||
ansible_output = json.loads(result.stdout)["plays"][0]["tasks"][0]["hosts"]
|
||||
except IndexError:
|
||||
sys.exit(f"ERROR: Host '{host}' not found.")
|
||||
|
||||
# Attempt to create a :-separated list of host/values
|
||||
output = {}
|
||||
for hostname, values in ansible_output.items():
|
||||
output[hostname] = convert_ansible_errors(values[var])
|
||||
|
||||
return format_data(output)
|
||||
|
||||
|
||||
def decrypt_file(filename: str) -> str:
|
||||
"""Decrypt file with ansible-vault"""
|
||||
|
||||
@@ -134,7 +204,9 @@ def decrypt_file(filename: str) -> str:
|
||||
sys.exit(f"ERROR: File '{filename}' does not exist")
|
||||
|
||||
decrypted_content = subprocess.run(
|
||||
["ansible-vault", "decrypt", "--output", "-", filename], check=False, capture_output=True
|
||||
[executable("ansible-vault"), "decrypt", "--output", "-", filename],
|
||||
check=False,
|
||||
capture_output=True,
|
||||
)
|
||||
|
||||
if decrypted_content.returncode != 0:
|
||||
@@ -146,7 +218,7 @@ def decrypt_file(filename: str) -> str:
|
||||
print(decrypted_content.stdout.decode().strip())
|
||||
if ask_for_confirm("Shall I write the encrypted content as seen above to the file?"):
|
||||
decrypted_content = subprocess.run(
|
||||
["ansible-vault", "decrypt", filename], check=True, capture_output=True
|
||||
[executable("ansible-vault"), "decrypt", filename], check=True, capture_output=True
|
||||
)
|
||||
return f"Decrypted '{filename}' successfully"
|
||||
|
||||
@@ -157,7 +229,7 @@ def allvars(host: str) -> str:
|
||||
"""Decrypt/print all variables from one or multiple hosts"""
|
||||
# Run ansible var for all host vars as seen from localhost
|
||||
# Send return as JSON
|
||||
ansible_command = ["ansible", "localhost", "-m", "debug", "-a", "var=hostvars"]
|
||||
ansible_command = [executable("ansible"), "localhost", "-m", "debug", "-a", "var=hostvars"]
|
||||
ansible_env = {
|
||||
"ANSIBLE_LOAD_CALLBACK_PLUGINS": "1",
|
||||
"ANSIBLE_STDOUT_CALLBACK": "json",
|
||||
@@ -181,52 +253,6 @@ def allvars(host: str) -> str:
|
||||
return json.dumps(ansible_output, indent=2)
|
||||
|
||||
|
||||
def encrypt_file(filename: str) -> str:
|
||||
"""Encrypt a file with ansible-vault"""
|
||||
|
||||
if not os.path.exists(filename):
|
||||
sys.exit(f"ERROR: File '{filename}' does not exist")
|
||||
|
||||
encrypted_return = subprocess.run(
|
||||
["ansible-vault", "encrypt", filename], check=False, capture_output=True
|
||||
)
|
||||
|
||||
if encrypted_return.returncode != 0:
|
||||
sys.exit(
|
||||
f"ERROR: Could not encrypt file '{filename}'. This is the error:"
|
||||
f"\n{encrypted_return.stderr.decode()}"
|
||||
)
|
||||
|
||||
return f"Encrypted '{filename}' successfully"
|
||||
|
||||
|
||||
def decrypt_string(host, var) -> str:
|
||||
"""Decrypt/print a variable from one or multiple hosts"""
|
||||
# Run ansible msg for variable
|
||||
# Send return as JSON
|
||||
ansible_command = ["ansible", host, "-m", "debug", "-a", f"var={var}"]
|
||||
ansible_env = {
|
||||
"ANSIBLE_LOAD_CALLBACK_PLUGINS": "1",
|
||||
"ANSIBLE_STDOUT_CALLBACK": "json",
|
||||
}
|
||||
result = subprocess.run(
|
||||
ansible_command, env=ansible_env, capture_output=True, text=True, check=False
|
||||
)
|
||||
|
||||
# Parse JSON
|
||||
try:
|
||||
ansible_output = json.loads(result.stdout)["plays"][0]["tasks"][0]["hosts"]
|
||||
except IndexError:
|
||||
sys.exit(f"ERROR: Host '{host}' not found.")
|
||||
|
||||
# Attempt to create a :-separated list of host/values
|
||||
output = {}
|
||||
for hostname, values in ansible_output.items():
|
||||
output[hostname] = convert_ansible_errors(values[var])
|
||||
|
||||
return format_data(output)
|
||||
|
||||
|
||||
def main():
|
||||
"""Main function"""
|
||||
args = parser.parse_args()
|
||||
@@ -234,7 +260,7 @@ def main():
|
||||
|
||||
# ENCRYPTION
|
||||
if args.command == "encrypt":
|
||||
if args.encrypt_string:
|
||||
if args.encrypt_string is not None:
|
||||
password = input("Enter string: ") if not args.encrypt_string else args.encrypt_string
|
||||
output = encrypt_string(password)
|
||||
elif args.encrypt_file:
|
||||
@@ -242,7 +268,7 @@ def main():
|
||||
output = encrypt_file(filename)
|
||||
# DECRYPTION
|
||||
elif args.command == "decrypt":
|
||||
if args.decrypt_host:
|
||||
if args.decrypt_host is not None:
|
||||
host = input("Enter host: ") if not args.decrypt_host else args.decrypt_host
|
||||
var = input("Enter variable: ") if not args.decrypt_var else args.decrypt_var
|
||||
output = decrypt_string(host, var)
|
||||
|
||||
Reference in New Issue
Block a user