refac: added some type checking and fixed a duplicate bug (apparently)

This commit is contained in:
Daan Selen
2025-09-25 09:49:19 +02:00
parent 82cc31e0f6
commit 9d2999476d
4 changed files with 31 additions and 11 deletions
+11 -4
View File
@@ -53,8 +53,11 @@ async def gather_targets(args: argparse.Namespace,
offline_list = [] offline_list = []
target_os = meshbook.get("target_os") target_os = meshbook.get("target_os")
ignore_categorisation = meshbook.get("ignore_categorisation", False)
target_tag = meshbook.get("target_tag") target_tag = meshbook.get("target_tag")
ignore_categorisation = meshbook.get("ignore_categorisation", False)
assert target_os is not None, "target_os must be provided"
assert target_tag is not None, "target_tag must be provided"
async def add_processed_devices(processed): async def add_processed_devices(processed):
"""Helper to update target and offline lists.""" """Helper to update target and offline lists."""
@@ -119,7 +122,7 @@ async def gather_targets(args: argparse.Namespace,
elif isinstance(pseudo_target, str) and pseudo_target.lower() == "all": elif isinstance(pseudo_target, str) and pseudo_target.lower() == "all":
for group in group_list.values(): for group in group_list.values():
await process_group_helper(group) await process_group_helper(group)
elif isintance(pseudo_target, str): elif isinstance(pseudo_target, str):
console.nice_print( console.nice_print(
args, args,
console.text_color.yellow + "The 'groups' key is being used, but only one string is given. Did you mean 'group'?", console.text_color.yellow + "The 'groups' key is being used, but only one string is given. Did you mean 'group'?",
@@ -303,7 +306,7 @@ async def main():
group_list = await transform.compile_group_list(session) group_list = await transform.compile_group_list(session)
compiled_device_list = await gather_targets(args, meshbook, group_list, os_categories) compiled_device_list = await gather_targets(args, meshbook, group_list, os_categories)
if len(compiled_device_list["target_list"]) == 0: if "target_list" not in compiled_device_list or len(compiled_device_list["target_list"]) == 0:
console.nice_print(args, console.nice_print(args,
console.text_color.red + "No targets found or targets unreachable, quitting.", True) console.text_color.red + "No targets found or targets unreachable, quitting.", True)
@@ -327,6 +330,10 @@ async def main():
case {"devices": candidate_target_name}: case {"devices": candidate_target_name}:
target_name = str(candidate_target_name) target_name = str(candidate_target_name)
case _:
target_name = ""
console.nice_print(args, console.nice_print(args,
console.text_color.yellow + "Executing meshbook on the target(s): " + console.text_color.green + target_name + console.text_color.yellow + ".") console.text_color.yellow + "Executing meshbook on the target(s): " + console.text_color.green + target_name + console.text_color.yellow + ".")
@@ -350,7 +357,7 @@ async def main():
except OSError as message: except OSError as message:
console.nice_print(args, console.nice_print(args,
console.text_color.red + message, True) console.text_color.red + f'{message}', True)
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(main()) asyncio.run(main())
+1
View File
@@ -14,6 +14,7 @@ class console:
italic = "\x1B[3m" italic = "\x1B[3m"
reset = "\x1B[0m" reset = "\x1B[0m"
@staticmethod
def nice_print(args: argparse.Namespace, message: str, final: bool=False): def nice_print(args: argparse.Namespace, message: str, final: bool=False):
''' '''
Helper function for terminal output, with a couple variables for the silent flag. Also clears terminal color each time. Helper function for terminal output, with a couple variables for the silent flag. Also clears terminal color each time.
+2 -1
View File
@@ -11,6 +11,7 @@ from modules.utilities import transform
intertask_delay = 1 intertask_delay = 1
class executor: class executor:
@staticmethod
async def execute_meshbook(args: argparse.Namespace, session: meshctrl.Session, compiled_device_list: dict, meshbook: dict, group_list: dict) -> None: async def execute_meshbook(args: argparse.Namespace, session: meshctrl.Session, compiled_device_list: dict, meshbook: dict, group_list: dict) -> None:
''' '''
Actual function that handles meshbook execution, also responsible for formatting the resulting JSON. Actual function that handles meshbook execution, also responsible for formatting the resulting JSON.
@@ -28,7 +29,7 @@ class executor:
if "powershell" in meshbook and meshbook["powershell"]: if "powershell" in meshbook and meshbook["powershell"]:
response = await session.run_command(nodeids=targets, command=task["command"],powershell=True,ignore_output=False,timeout=900) response = await session.run_command(nodeids=targets, command=task["command"],powershell=True,ignore_output=False,timeout=900)
else: else:
response = await session.run_command(nodeids=targets, command=task["command"],ignore_output=False,timeout=900) response = await session.run_command(nodeids=targets, command=task["command"],powershell=False,ignore_output=False,timeout=900)
task_batch = [] task_batch = []
for device in response: for device in response:
+17 -6
View File
@@ -10,6 +10,7 @@ Creation and compilation of the MeshCentral nodes list (list of all nodes availa
''' '''
class utilities: class utilities:
@staticmethod
async def load_config(args: argparse.Namespace, async def load_config(args: argparse.Namespace,
segment: str = 'meshcentral-account') -> dict: segment: str = 'meshcentral-account') -> dict:
''' '''
@@ -32,17 +33,20 @@ class utilities:
print(f'Segment "{segment}" not found in config file {conf_file}.') print(f'Segment "{segment}" not found in config file {conf_file}.')
os._exit(1) os._exit(1)
return config[segment] return dict(config[segment])
async def compile_book(meshbook_file: dict) -> dict: @staticmethod
async def compile_book(meshbook_file: str) -> dict:
''' '''
Simple function that opens the file and replaces placeholders through the next function. After that just return it. Simple function that opens the file and replaces placeholders through the next function. After that just return it.
''' '''
meshbook = open(meshbook_file, 'r') with open(meshbook_file, 'r') as f:
meshbook = f.read()
meshbook = await transform.replace_placeholders(yaml.safe_load(meshbook)) meshbook = await transform.replace_placeholders(yaml.safe_load(meshbook))
return meshbook return meshbook
@staticmethod
def get_os_variants(target_category: str, def get_os_variants(target_category: str,
os_map: dict) -> set: os_map: dict) -> set:
''' '''
@@ -65,17 +69,19 @@ class utilities:
return set() return set()
@staticmethod
async def filter_targets(devices: list[dict], async def filter_targets(devices: list[dict],
os_categories: dict, os_categories: dict,
target_os: str = None, target_os: str = "",
ignore_categorisation: bool = False, ignore_categorisation: bool = False,
target_tag: str = None) -> dict: target_tag: str = "") -> dict:
''' '''
Filters devices based on reachability and optional OS criteria, supporting nested OS categories. Filters devices based on reachability and optional OS criteria, supporting nested OS categories.
''' '''
valid_devices = [] valid_devices = []
offline_devices = [] offline_devices = []
allowed_os = set()
# Identify correct OS filtering scope # Identify correct OS filtering scope
for key in os_categories: for key in os_categories:
@@ -109,6 +115,7 @@ class utilities:
"offline_devices": offline_devices "offline_devices": offline_devices
} }
@staticmethod
async def process_device(device: str, async def process_device(device: str,
group_list: dict, group_list: dict,
os_categories: dict, os_categories: dict,
@@ -143,6 +150,7 @@ class utilities:
import shlex import shlex
class transform: class transform:
@staticmethod
def process_shell_response(shlex_enable: bool, meshbook_result: dict) -> dict: def process_shell_response(shlex_enable: bool, meshbook_result: dict) -> dict:
for task_name, task_data in meshbook_result.items(): for task_name, task_data in meshbook_result.items():
if task_name == "Offline": # Failsafe if task_name == "Offline": # Failsafe
@@ -164,6 +172,7 @@ class transform:
node_responses["result"] = clean_output node_responses["result"] = clean_output
return meshbook_result return meshbook_result
@staticmethod
async def translate_nodeid_to_name(target_id: str, group_list: dict) -> str: async def translate_nodeid_to_name(target_id: str, group_list: dict) -> str:
''' '''
Simple function that looks up nodeid to the human-readable name if existent - otherwise return None. Simple function that looks up nodeid to the human-readable name if existent - otherwise return None.
@@ -173,8 +182,9 @@ class transform:
for device in group_list[group]: for device in group_list[group]:
if device["device_id"] == target_id: if device["device_id"] == target_id:
return device["device_name"] return device["device_name"]
return None return ""
@staticmethod
async def replace_placeholders(meshbook: dict) -> dict: async def replace_placeholders(meshbook: dict) -> dict:
''' '''
Replace the placeholders in both name and command fields of the tasks. According to the variables defined in the variables list. Replace the placeholders in both name and command fields of the tasks. According to the variables defined in the variables list.
@@ -208,6 +218,7 @@ class transform:
return meshbook return meshbook
@staticmethod
async def compile_group_list(session: meshctrl.Session) -> dict: async def compile_group_list(session: meshctrl.Session) -> dict:
''' '''
Function that retrieves the devices from MeshCentral and compiles it into a efficient list. Function that retrieves the devices from MeshCentral and compiles it into a efficient list.