release version 1.4 (#21)

* chore: init working on history functions
rename classes

* refac: remove raw-result option. I cannot find a use ( I have found it later on )

* chore: add basic history directory checking

* chore: further expansion on the code. Now its christmas

* chore: rework some code and make a logging/history feature workable

---------

Co-authored-by: DaanSelen <dselen@systemec.nl>
This commit is contained in:
DaanSelen
2025-12-29 10:55:57 +01:00
committed by GitHub
parent 3a0fc215d7
commit 9716a2376c
6 changed files with 414 additions and 298 deletions
+45 -17
View File
@@ -1,26 +1,54 @@
# Public Python libraries
import argparse
from datetime import datetime
class console:
class Console:
class text_color:
black = "\033[30m"
red = "\033[31m"
green = "\033[32m"
yellow = "\033[33m"
blue = "\033[34m"
magenta = "\033[35m"
cyan = "\033[36m"
white = "\033[37m"
italic = "\x1B[3m"
reset = "\x1B[0m"
black = "\033[30m"
red = "\033[31m"
green = "\033[32m"
yellow = "\033[33m"
blue = "\033[34m"
magenta = "\033[35m"
cyan = "\033[36m"
white = "\033[37m"
italic = "\x1B[3m"
reset = "\x1B[0m"
@staticmethod
def nice_print(args: argparse.Namespace, message: str, final: bool=False):
def print_text(silent: bool, message: str, prefix_select: int = 0) -> None:
'''
Helper function for terminal output, with a couple variables for the silent flag. Also clears terminal color each time.
'''
if final:
print(message) # Assuming final message, there is no need for clearing.
elif not args.silent:
print(message + console.text_color.reset)
int tag_select legend:
0 / default = timestamp
1 = info
2 = warn
3 = err
4 = fatal
9 = nothing
'''
match prefix_select:
case 1:
tag_prefix = "[INFO] "
case 2:
tag_prefix = "[WARN] "
case 3:
tag_prefix = "[ERROR] "
case 4:
tag_prefix = "[FATAL] "
case 9:
tag_prefix = ""
case _:
tag_prefix = f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} "
if not silent:
print(tag_prefix + message + Console.text_color.reset)
@staticmethod
def print_line(silent: bool, special: bool = False) -> None:
if not silent:
if special:
print("-=-" * 40)
else:
print(("-" * 40))
+13 -24
View File
@@ -5,26 +5,26 @@ import meshctrl
from time import sleep
# Local Python libraries/modules
from modules.console import console
from modules.utilities import transform
from modules.console import Console
from modules.utilities import Transform
intertask_delay = 1
class executor:
class Executor:
@staticmethod
async def execute_meshbook(args: argparse.Namespace, session: meshctrl.Session, compiled_device_list: dict, meshbook: dict, group_list: dict) -> None:
async def execute_meshbook(silent: bool, enable_shlex: bool, session: meshctrl.Session, compiled_device_list: dict, meshbook: dict, group_list: dict) -> dict:
'''
Actual function that handles meshbook execution, also responsible for formatting the resulting JSON.
'''
responses_list = {}
complete_log = {}
targets = compiled_device_list["target_list"]
offline = compiled_device_list["offline_list"]
round = 1
for task in meshbook["tasks"]:
console.nice_print(args,
console.text_color.green + str(round) + ". Running: " + task["name"])
Console.print_text(silent,
Console.text_color.green + str(round) + ". Running: " + task["name"])
if "powershell" in meshbook and meshbook["powershell"]:
response = await session.run_command(nodeids=targets, command=task["command"],powershell=True,ignore_output=False,timeout=1800)
@@ -36,10 +36,10 @@ class executor:
device_result = response[device]["result"]
response[device]["result"] = device_result.replace("Run commands completed.", "")
response[device]["device_id"] = device
response[device]["device_name"] = await transform.translate_nodeid_to_name(device, group_list)
response[device]["device_name"] = await Transform.translate_nodeid_to_name(device, group_list)
task_batch.append(response[device])
responses_list["task_" + str(round)] = {
complete_log["task_" + str(round)] = {
"task_name": task["name"],
"data": task_batch
}
@@ -47,20 +47,9 @@ class executor:
sleep(intertask_delay) # Sleep for x amount of time.
for index, device in enumerate(offline): # Replace Device_id with actual human readable name
device_name = await transform.translate_nodeid_to_name(device, group_list)
device_name = await Transform.translate_nodeid_to_name(device, group_list)
offline[index] = device_name
responses_list["Offline"] = offline
complete_log["Offline"] = offline
console.nice_print(args,
console.text_color.reset + ("-" * 40))
if args.indent:
if not args.raw_result:
responses_list = transform.process_shell_response(args.shlex, responses_list)
console.nice_print(args,
json.dumps(responses_list,indent=4), True)
else:
console.nice_print(args,
json.dumps(responses_list), True)
# Return the result
return Transform.process_shell_response(enable_shlex, complete_log)
+48
View File
@@ -0,0 +1,48 @@
import os
from datetime import datetime
from modules.console import Console
class History():
def __init__(self, silent: bool, history_directory: str, flush_history: bool) -> None:
'''
Init function to declare some stuff and make sure we are good to go, mostly the directory.
'''
self.silent = silent
self.history_directory = history_directory
if not os.path.exists(history_directory):
Console.print_text(silent, "Directory absent, trying to create it now...")
try:
os.mkdir(history_directory)
except PermissionError:
Console.print_text(silent, Console.text_color.red + f"Failed to create directory, permission error.")
return
history_items = os.listdir(history_directory)
if len(history_items) == 1:
Console.print_text(silent, f"There is {len(history_items)} history item.")
else:
Console.print_text(silent, f"There are {len(history_items)} history items.")
if flush_history:
self.remove_history(history_items)
def remove_history(self, history_items: list[str]) -> None:
if not os.access(self.history_directory, os.W_OK):
Console.print_text(self.silent, Console.text_color.red + "Unable to flush history logs, no write access.")
return
for item in history_items:
stitched_path = f"{self.history_directory}/{item}"
Console.print_text(self.silent, f"Removing: {item}.")
os.remove(stitched_path)
def write_history(self, history: dict) -> bool:
stitched_file = f"{self.history_directory}/meshbook_run_{datetime.now().strftime('%Y_%m_%d_%H_%M_%S')}.log"
with open(stitched_file, "x") as f:
f.write(history)
+144 -12
View File
@@ -3,13 +3,14 @@ import argparse
from configparser import ConfigParser
import meshctrl
import os
import shlex
import yaml
'''
Creation and compilation of the MeshCentral nodes list (list of all nodes available to the user in the configuration) is handled in the following section.
'''
class utilities:
class Utilities:
@staticmethod
async def load_config(args: argparse.Namespace,
segment: str = 'meshcentral-account') -> dict:
@@ -43,9 +44,127 @@ class utilities:
with open(meshbook_file, 'r') as f:
meshbook = f.read()
meshbook = await transform.replace_placeholders(yaml.safe_load(meshbook))
meshbook = await Transform.replace_placeholders(yaml.safe_load(meshbook))
return meshbook
@staticmethod
async def gather_targets(args: argparse.Namespace,
meshbook: dict,
group_list: dict[str, list[dict]],
os_categories: dict) -> dict:
"""
Finds target devices based on meshbook criteria (device, devices, group or groups).
"""
group_list = {k.lower(): v for k, v in group_list.items()} # Normalize keys
target_list = []
offline_list = []
target_os = meshbook.get("target_os")
target_tag = meshbook.get("target_tag")
ignore_categorisation = meshbook.get("ignore_categorisation", False)
async def add_processed_devices(processed):
"""Helper to update target and offline lists."""
if processed:
target_list.extend(processed.get("valid_devices", []))
offline_list.extend(processed.get("offline_devices", []))
async def process_device_helper(device):
processed = await Utilities.process_device(
device,
group_list,
os_categories,
target_os,
ignore_categorisation,
target_tag
)
await add_processed_devices(processed)
async def process_group_helper(group):
processed = await Utilities.filter_targets(
group, os_categories, target_os, ignore_categorisation, target_tag
)
await add_processed_devices(processed)
'''
Groups receive the first priority, then device targets.
'''
match meshbook:
case {"group": pseudo_target}:
if isinstance(pseudo_target, str):
pseudo_target = pseudo_target.lower()
if pseudo_target in group_list:
await process_group_helper(group_list[pseudo_target])
elif pseudo_target not in group_list:
console.nice_print(
args,
console.text_color.yellow + "Targeted group not found on the MeshCentral server."
)
elif isinstance(pseudo_target, list):
console.nice_print(
args,
console.text_color.yellow + "Please use groups (Notice the plural with 'S') for multiple groups."
)
else:
console.nice_print(
args,
console.text_color.yellow + "The 'group' key is being used, but an unknown data type was found, please check your values."
)
case {"groups": pseudo_target}:
if isinstance(pseudo_target, list):
for sub_group in pseudo_target:
sub_group = sub_group.lower()
if sub_group in group_list:
await process_group_helper(group_list[sub_group])
elif isinstance(pseudo_target, str) and pseudo_target.lower() == "all":
for group in group_list.values():
await process_group_helper(group)
elif isinstance(pseudo_target, str):
console.nice_print(
args,
console.text_color.yellow + "The 'groups' key is being used, but only one string is given. Did you mean 'group'?"
)
else:
console.nice_print(
args,
console.text_color.yellow + "The 'groups' key is being used, but an unknown data type was found, please check your values."
)
case {"device": pseudo_target}:
if isinstance(pseudo_target, str):
await process_device_helper(pseudo_target)
elif isinstance(pseudo_target, list):
console.nice_print(
args,
console.text_color.yellow + "Please use devices (Notice the plural with 'S') for multiple devices."
)
else:
console.nice_print(
args,
console.text_color.yellow + "The 'device' key is being used, but an unknown data type was found, please check your values."
)
case {"devices": pseudo_target}:
if isinstance(pseudo_target, list):
for sub_device in pseudo_target:
await process_device_helper(sub_device)
elif isinstance(pseudo_target, str):
console.nice_print(
args,
console.text_color.yellow + "The 'devices' key is being used, but only one string is given. Did you mean 'device'?"
)
else:
console.nice_print(
args,
console.text_color.yellow + "The 'devices' key is being used, but an unknown data type was found, please check your values."
)
return {"target_list": target_list, "offline_list": offline_list}
@staticmethod
def get_os_variants(target_category: str,
os_map: dict) -> set:
@@ -60,7 +179,7 @@ class utilities:
os_set = set()
for sub_target_cat in value:
os_set.update(utilities.get_os_variants(sub_target_cat, value))
os_set.update(Utilities.get_os_variants(sub_target_cat, value))
return os_set
@@ -86,11 +205,11 @@ class utilities:
# Identify correct OS filtering scope
for key in os_categories:
if key == target_os:
allowed_os = utilities.get_os_variants(target_os, os_categories)
allowed_os = Utilities.get_os_variants(target_os, os_categories)
break # Stop searching once a match is found
if isinstance(os_categories[key], dict) and target_os in os_categories[key]:
allowed_os = utilities.get_os_variants(target_os, os_categories[key])
allowed_os = Utilities.get_os_variants(target_os, os_categories[key])
break # Stop searching once a match is found
for device in devices: # Filter out unwanted or unreachable devices.
@@ -137,7 +256,7 @@ class utilities:
# If matches found, filter them and add processed devices
if matched_devices:
processed = await utilities.filter_targets(
processed = await Utilities.filter_targets(
matched_devices, os_categories, target_os, ignore_categorisation, target_tag
)
return processed
@@ -145,18 +264,31 @@ class utilities:
# No matches found
return {"valid_devices": [], "offline_devices": []}
import shlex
class transform:
@staticmethod
def process_shell_response(shlex_enable: bool, meshbook_result: dict) -> dict:
def path_exist(path: str) -> bool:
return os.path.exists(path)
@staticmethod
def path_type(path: str) -> str:
if os.path.isfile(path):
return "File"
if os.path.isdir(path):
return "Dir"
if os.path.islink(path):
return "Link"
return "Undefined"
class Transform:
@staticmethod
def process_shell_response(enable_shlex: bool, meshbook_result: dict) -> dict:
for task_name, task_data in meshbook_result.items():
if task_name == "Offline": # Failsafe
if task_name == "Offline": # Failsafe do not parse Offline section, its simple
continue
for node_responses in task_data["data"]:
task_result = node_responses["result"].splitlines()
if shlex_enable:
if enable_shlex:
for index, line in enumerate(task_result):
line = shlex.split(line)
task_result[index] = line