Source code for file_manager.work_file_manager

import re
from datetime import datetime
from typing import Any

import arcpy

import env_setup.global_config


class WorkFileManager:
    """
    What:
        This class handles the creation and deletion of work files used in
        other classes or processes. It is designed to make it easy to switch
        between writing to disk and writing to memory, and to toggle deletion
        of work files on and off to simplify troubleshooting. This class is
        not intended to create the final outputs of logics or processes.

    How:
        The same instance of WorkFileManager can create and manage different
        structures containing files; each call to setup_work_file_paths is
        designed to take a single structure. Every file path generated by the
        WorkFileManager is tracked in the created_paths attribute, so if you
        do not need to stage the deletion of files you can simply call the
        delete_created_files method without any parameters.

    Args:
        unique_id (int): Used to generate unique file names; can be a self
            value or an iterated number.
        root_file (str): The core file name used to generate unique file names.
        write_to_memory (bool): Defaults to True; write to memory if True,
            write to disk if False.
        keep_files (bool): Defaults to False; keep work files if True,
            delete them if False.
    """

    general_files_directory_name = env_setup.global_config.general_files_name
    lyrx_directory_name = env_setup.global_config.lyrx_directory_name

    _global_counter = 0
    # mmdd_HHMM makes the session prefix sortable: newer instances appear
    # lower when sorted.
    _session_prefix = datetime.now().strftime("%m%d_%H%M")

    def __init__(
        self,
        unique_id: int,
        root_file: str = None,
        write_to_memory: bool = True,
        keep_files: bool = False,
    ):
        """
        Initializes the WorkFileManager with the desired parameters.

        Args:
            See class docstring.
        """
        WorkFileManager._global_counter += 1
        self.unique_id = (
            f"id{self._session_prefix}_{WorkFileManager._global_counter}"
        )
        self.root_file = root_file
        self.write_to_memory = write_to_memory
        self.keep_files = keep_files
        self.created_paths = []

        if not self.write_to_memory and not self.root_file:
            raise ValueError(
                "Need to specify root_file path to write to disk for work files."
            )

        if self.keep_files and not self.root_file:
            raise ValueError(
                "Need to specify root_file path and write to disk to keep work files."
            )

        self.file_location = (
            "memory/" if self.write_to_memory else f"{self.root_file}_"
        )

    def _modify_path(self) -> tuple[str, str]:
        """
        What:
            Splits root_file at the scale directory, removing the portion up
            to and including it.

        Returns:
            tuple[str, str]: The path up to and including the scale directory,
            and the origin file name that follows it.
        """
        # Regex pattern for the scale directory: word characters ending in a
        # digit followed by 0 (e.g. 50, 100), enclosed by backslashes.
        match = re.search(r"\\\w+\d0\\", self.root_file)
        if not match:
            raise ValueError("Scale directory pattern not found in the path.")

        if self.write_to_memory:
            raise ValueError(
                "Other file types than gdb are not supported in memory mode."
            )

        # Extract the root up to the scale directory
        scale_path = self.root_file[: match.end()]
        remaining_path = self.root_file[match.end() :]
        origin_file_name = remaining_path.split("\\", 1)[-1]

        return scale_path, origin_file_name

    def _build_file_path(
        self,
        file_name: str,
        file_type: str = "gdb",
        index: int = None,
    ) -> str:
        """
        Generates a file path based on the file name, type, and an optional index.

        Args:
            file_name (str): The name of the file.
            file_type (str): The type of file to generate the path for.
            index (int, optional): An optional index to append for uniqueness.

        Returns:
            str: A string representing the file path.
        """
""" suffix = f"___{index}" if index is not None else "" if file_type == "gdb": path = f"{self.file_location}{file_name}_{self.unique_id}{suffix}" else: scale_path, origin_file_name = self._modify_path() if file_type == "lyrx": print("lyrx file path detected:") path = rf"{scale_path}{self.lyrx_directory_name}\{origin_file_name}_{file_name}_{self.unique_id}{suffix}.lyrx" print(f"Path: {path}") print( f"Scale path: {scale_path}\nOrigin file name: {origin_file_name}\n" ) else: path = rf"{scale_path}{self.general_files_directory_name}\{origin_file_name}_{file_name}_{self.unique_id}{suffix}.{file_type}" if path in self.created_paths: raise ValueError( f"Duplicate path detected: {path}. " "This may lead to unexpected behavior. Ensure unique file names or indices." ) self.created_paths.append(path) return path
    def generate_output(
        self,
        instance: object,
        name: str,
        iteration_index: int,
    ) -> str:
        """
        What:
            Generates a unique file path for a given base name and iteration
            index. Designed to allow users of WorkFileManager to generate
            indexed outputs in a loop.

        Args:
            instance (object): The caller instance to update attributes on if needed.
            name (str): The base name of the work file.
            iteration_index (int): The current iteration index for uniqueness.

        Returns:
            str: The generated file path.
        """
        return self.setup_work_file_paths(
            instance=instance,
            file_structure=name,
            index=iteration_index,
        )
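    # Illustrative sketch (hypothetical names): generating indexed outputs in
    # a loop, as the docstring above describes.
    #
    #   for i, feature in enumerate(features):
    #       out_path = wfm.generate_output(self, "buffered", iteration_index=i)
    #       arcpy.analysis.Buffer(feature, out_path, "10 Meters")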
    def setup_work_file_paths(
        self,
        instance: object,
        file_structure: Any,
        keys_to_update: str | list[str] = None,
        add_key: str = None,
        file_type: str = "gdb",
        index: int = None,
    ) -> Any:
        """
        What:
            Generates file paths for supported structures and sets them as
            attributes on the instance. Currently tested and supported
            structures include:

            - str
            - list[str]
            - dict[str, str]
            - list[dict[str, str]]

        Args:
            instance (object): The instance to set the file paths as attributes on.
            file_structure (Any): The input structure to process and return.
            keys_to_update (str | list[str], optional): Keys to update if
                file_structure is a dictionary; "ALL" updates every key.
            add_key (str, optional): An additional key to add to the dictionary.
            file_type (str, optional): The type of file path to generate. Defaults to "gdb".
            index (int, optional): Index to ensure uniqueness in file names
                when processing lists of dicts.

        Returns:
            Any: The same structure as file_structure, updated with generated file paths.
        """

        def process_item(item, idx=None):
            """Processes a single item, determining its type and handling it accordingly."""
            if isinstance(item, str):
                return process_string(item, idx)
            elif isinstance(item, dict):
                return process_dict(item, idx)
            elif isinstance(item, list):
                return process_list(item)
            else:
                raise TypeError(f"Unsupported file structure type: {type(item)}")

        def process_string(item, idx=None):
            """Processes a string item."""
            # Update the matching instance attribute if one exists
            for attr_name, attr_value in instance.__dict__.items():
                if attr_value == item:
                    updated_path = self._build_file_path(item, file_type, index=idx)
                    setattr(instance, attr_name, updated_path)
                    return updated_path
            return self._build_file_path(item, file_type, index=idx)

        def process_list(items):
            """Processes a list structure."""
            if all(isinstance(item, dict) for item in items):
                # List of dictionaries
                return [process_dict(item, idx) for idx, item in enumerate(items)]
            elif all(isinstance(item, str) for item in items):
                # List of unique strings
                return [process_string(item) for item in items]
            else:
                # Mixed list
                return [process_item(item, idx) for idx, item in enumerate(items)]

        def process_dict(dictionary, idx=None):
            """Processes a dictionary structure."""
            updated_dict = {}
            for key, value in dictionary.items():
                if keys_to_update == "ALL" or (
                    keys_to_update and key in keys_to_update
                ):
                    updated_dict[key] = process_item(value, idx)
                else:
                    updated_dict[key] = value
            if add_key:
                updated_dict[add_key] = self._build_file_path(
                    add_key, file_type, index=idx
                )
            return updated_dict

        # Determine the type of the top-level structure and process accordingly
        if isinstance(file_structure, str):
            return process_string(file_structure, idx=index)
        else:
            return process_item(file_structure)
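    # Illustrative sketch (hypothetical attribute and key names): the same
    # call handles a plain string, a list, or a list of dicts. With dicts,
    # only the keys named in keys_to_update are replaced with generated paths,
    # and add_key appends one extra generated path per dict.
    #
    #   self.temp_roads = "temp_roads"
    #   self.temp_roads = wfm.setup_work_file_paths(self, self.temp_roads)
    #
    #   configs = [
    #       {"unique_alias": "roads", "input": "roads_work"},
    #       {"unique_alias": "rivers", "input": "rivers_work"},
    #   ]
    #   configs = wfm.setup_work_file_paths(
    #       self, configs, keys_to_update=["input"], add_key="output"
    #   )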
    def setup_dynamic_file_paths(
        self,
        base_name: str,
        count: int,
        file_type: str = "gdb",
    ) -> list[str]:
        """
        Generates a list of file paths for a dynamic number of files based on a base name.

        Args:
            base_name (str): The base name to use for generating file paths.
            count (int): The number of file paths to generate.
            file_type (str, optional): The file type for the generated paths. Defaults to "gdb".

        Returns:
            list[str]: A list of generated file paths.
        """
        dynamic_paths = []
        for idx in range(count):
            path = self._build_file_path(base_name, file_type, index=idx)
            dynamic_paths.append(path)

        return dynamic_paths
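    # Illustrative sketch: pre-generating a known number of indexed paths,
    # for example one per partition (the base name and count are hypothetical).
    #
    #   partition_paths = wfm.setup_dynamic_file_paths("partition", count=4)
    #   # -> four paths ending in "___0", "___1", "___2", "___3"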
    def delete_created_files(
        self,
        delete_targets: list[str] = None,
        exceptions: list[str] = None,
        delete_files: bool = None,
    ):
        """
        What:
            Deletes the created paths. Defaults to deleting all created paths,
            but can target or exclude specific paths.

        Args:
            delete_targets (list[str], optional): List of paths to delete. Defaults to None.
            exceptions (list[str], optional): List of paths to exclude from deletion. Defaults to None.
            delete_files (bool, optional): Whether to delete files. Defaults to None,
                which uses the inverse of `self.keep_files`.
        """
        # Fall back to the inverse of `self.keep_files` if `delete_files`
        # is not explicitly provided
        if delete_files is None:
            delete_files = not self.keep_files

        if not delete_files:
            print("Deletion is disabled. No files deleted.")
            return

        # Use all tracked paths if delete_targets is not provided
        targets = delete_targets or self.created_paths

        # Apply exceptions, if provided
        if exceptions:
            targets = [path for path in targets if path not in exceptions]

        for path in targets:
            self._delete_file(path)
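    # Illustrative sketch: cleaning up everything the manager created, while
    # keeping one path (hypothetical name) for inspection.
    #
    #   wfm.delete_created_files(exceptions=[debug_path])
    #   wfm.delete_created_files()  # delete all tracked paths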
    @staticmethod
    def list_contents(data: Any, title: str = "Contents"):
        """
        Pretty prints the contents of a data structure (list, dict, or other
        serializable objects).

        Args:
            data (Any): The data structure to print (list, dict, or other serializable objects).
            title (str, optional): A title to display before printing. Defaults to "Contents".
        """
        print(f"\n{f' Start of: {title} ':=^120}")
        if isinstance(data, (list, dict)):
            import pprint

            pprint.pprint(data, indent=4)
        else:
            print(data)
        print(f"{f' End of: {title} ':=^120}\n")
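    # Illustrative sketch, using the hypothetical configs list from above:
    #
    #   WorkFileManager.list_contents(configs, title="Work file configs")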
    @staticmethod
    def _delete_file(file_path: str):
        """
        Deletes the dataset at the given path, if it exists, using arcpy.
        """
        try:
            if arcpy.Exists(file_path):
                arcpy.management.Delete(file_path)
                print(f"Deleted: {file_path}")
            else:
                print(f"File did not exist: {file_path}")
        except arcpy.ExecuteError as e:
            print(f"Error deleting file {file_path}: {e}")
    @staticmethod
    def apply_to_structure(data, func, **key_map):
        """
        What:
            Applies a function to elements within a supported data structure.
            Designed to work with dictionaries and lists of dictionaries, and
            extensible for other structures.

        How:
            Maps specified keys in the data structure to the function's
            parameters and applies the function to each valid element.

        Args:
            data (Union[dict, list[dict]]): The data structure to process.
            func (callable): The function to apply. The keys in `key_map`
                should match the function parameters.
            **key_map (str): Mapping of function parameter names to keys in the data structure.

        Raises:
            TypeError: If the data type is unsupported.
            KeyError: If a required key is missing in a dictionary.
        """

        def process_item(item):
            """Helper function to process a single dictionary."""
            try:
                func(**{param: item[key] for param, key in key_map.items()})
            except KeyError as e:
                raise KeyError(f"Missing key {e} in dictionary: {item}")

        if isinstance(data, dict):
            process_item(data)
        elif isinstance(data, list):
            if all(isinstance(item, dict) for item in data):
                for item in data:
                    process_item(item)
            else:
                raise TypeError(
                    "List must contain only dictionaries. "
                    f"Found invalid item in list: {data}"
                )
        else:
            raise TypeError(
                f"Unsupported data type: {type(data)}. "
                "Expected a dictionary or a list of dictionaries."
            )
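    # Illustrative sketch (hypothetical keys): key_map maps the parameters of
    # func to the dictionary keys holding their values, so each dict in the
    # list becomes one call to func.
    #
    #   WorkFileManager.apply_to_structure(
    #       data=configs,
    #       func=lambda input, output: arcpy.management.CopyFeatures(input, output),
    #       input="input",
    #       output="output",
    #   )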
    @staticmethod
    def extract_key_by_alias(data: list[dict], unique_alias: str, key: str) -> str:
        """
        Extracts the value of a specific key from the dictionary with the specified alias.

        Args:
            data (list[dict]): The input list of dictionaries.
            unique_alias (str): The alias identifying the target dictionary.
            key (str): The key to extract the value from.

        Returns:
            str: The value associated with the key in the specified dictionary.

        Raises:
            ValueError: If no dictionary with the specified alias is found.
            KeyError: If the key does not exist in the identified dictionary.
        """
        for item in data:
            if item.get("unique_alias") == unique_alias:
                if key in item:
                    return item[key]
                raise KeyError(
                    f"Key '{key}' not found in dictionary with alias '{unique_alias}'."
                )
        raise ValueError(f"No dictionary with alias '{unique_alias}' found.")
    @staticmethod
    def extract_key_all(data: list[dict], key: str) -> list[str]:
        """
        Extracts all values for a specific key across all dictionaries.
        """
        return [item[key] for item in data if key in item]
    @staticmethod
    def set_key_by_alias(
        data: list[dict], unique_alias: str, key: str, new_value: str
    ) -> None:
        """
        Sets the value of a key in the dictionary with the specified alias.
        Adds the key if it does not exist.

        Args:
            data (list[dict]): The input list of dictionaries.
            unique_alias (str): The alias identifying the target dictionary.
            key (str): The key to set or update.
            new_value (str): The value to set for the key.

        Raises:
            ValueError: If no dictionary with the specified alias is found.
        """
        for item in data:
            if item.get("unique_alias") == unique_alias:
                item[key] = new_value
                return
        raise ValueError(f"No dictionary with alias '{unique_alias}' found.")
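

if __name__ == "__main__":
    # Illustrative demo (not part of the original module) of the static alias
    # helpers; the config list below is hypothetical. Running this file still
    # requires arcpy and env_setup to be importable.
    configs = [{"unique_alias": "roads", "input": "roads_work"}]
    WorkFileManager.set_key_by_alias(configs, "roads", "output", "roads_out")
    print(WorkFileManager.extract_key_by_alias(configs, "roads", "output"))  # roads_out
    print(WorkFileManager.extract_key_all(configs, "input"))  # ['roads_work']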