Source code for esapp.saw._helpers

"""Helper functions for data conversion for SimAuto COM interface."""

import glob
import json
import logging
import os
import re
import tempfile
import uuid
from pathlib import Path, PureWindowsPath
from typing import List, Optional, Tuple, Union, Sequence

import pandas as pd
import pythoncom
import win32com
from win32com.client import VARIANT


# =============================================================================
# PowerWorld Command String Formatting Helpers
# =============================================================================


def load_ts_csv_results(base_path: Path, delete_files: bool = False) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Reads and parses transient stability results from CSV files generated by PowerWorld.

    Every ``<stem>*.csv`` file next to ``base_path`` is collected. Files whose
    name contains ``_header`` (case-insensitive) are treated as header files;
    all remaining matches are treated as data files and concatenated.

    Args:
        base_path: The base file path used in the TSGetResults command.
        delete_files: Whether to delete the found files after reading.

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame]: (Metadata, Time-Series Data).
        Both frames are empty when no matching file exists. The data frame
        has a ``time`` column followed by columns named ``"0"``, ``"1"``, ...
        and is sorted by ``time``; the metadata ``ColHeader`` column is
        overwritten with the same ``"0"``, ``"1"``, ... strings.
    """
    logger = logging.getLogger(__name__)

    # PowerWorld appends suffixes, so we search for the base name pattern.
    # The stem is escaped so characters such as "[", "*" or "?" in the file
    # name are matched literally instead of being read as glob syntax.
    search_pattern = f"{glob.escape(base_path.stem)}*.csv"
    # glob() yields files in arbitrary order; sort so that "the first header
    # file" and the concatenation order are reproducible.
    found_files = sorted(base_path.parent.glob(search_pattern))

    meta = pd.DataFrame()
    data_frames = []

    try:
        if not found_files:
            return meta, pd.DataFrame()

        header_files = [f for f in found_files if "_header" in f.name.lower()]
        data_files = [f for f in found_files if f not in header_files]

        # --- Process Header ---
        if header_files:
            # Use the first header file found
            header_path = header_files[0]
            try:
                # Check if first line is just a title (e.g. "ObjectFields")
                with header_path.open('r') as f:
                    first_line = f.readline()

                # PowerWorld sometimes puts "ObjectFields" on the first line alone
                skip_rows = 1 if "ObjectFields" in first_line and "," not in first_line else 0

                meta = pd.read_csv(header_path, sep=',', skiprows=skip_rows)
                meta.columns = meta.columns.str.strip()

                # Standardize column names
                rename_map = {
                    'Column': 'ColHeader',
                    'Object': 'ObjectType',
                    'Variable': 'VariableName',
                    'Key 1': 'PrimaryKey',
                    'Key 2': 'SecondaryKey'
                }
                meta.rename(columns=rename_map, inplace=True)

                # Force ColHeader to be 0-based index strings to match data columns
                meta['ColHeader'] = [str(i) for i in range(len(meta))]

            except Exception as e:
                # Best effort: a broken header must not prevent reading the data.
                logger.warning("Failed to read header file %s: %s", header_path, e)

        # --- Process Data ---
        for dpath in data_files:
            try:
                df = pd.read_csv(dpath, sep=',', header=None)

                if not df.empty:
                    # Rename time column (index 0) and data columns (1..N)
                    # Map 0 -> "time", 1 -> "0", 2 -> "1", ...
                    col_map = {0: "time"}
                    col_map.update({i: str(i - 1) for i in range(1, len(df.columns))})
                    df.rename(columns=col_map, inplace=True)
                    data_frames.append(df)
            except Exception as e:
                # Best effort: skip unreadable data files and keep the rest.
                logger.warning("Failed to read data file %s: %s", dpath, e)

        data = pd.concat(data_frames, ignore_index=True) if data_frames else pd.DataFrame()

        # Ensure time is float and sorted
        if 'time' in data.columns:
            data['time'] = pd.to_numeric(data['time'], errors='coerce')
            data.sort_values('time', inplace=True)

        return meta, data

    finally:
        # Runs on every exit path (including the early return and errors) so
        # temporary result files are cleaned up whenever deletion is requested.
        if delete_files:
            for f in found_files:
                try:
                    f.unlink(missing_ok=True)
                except OSError as e:
                    logger.warning("Failed to unlink temp file %s: %s", f, e)


def get_temp_filepath(suffix: str = ".csv") -> str:
    """Build a unique path inside the system temporary directory.

    Only the path string is produced; no file is created. The file name has
    the form ``esa_<uuid4><suffix>``.

    Parameters
    ----------
    suffix : str, optional
        Text appended after the UUID, normally a file extension.
        Defaults to ".csv".

    Returns
    -------
    str
        The temporary directory joined with the generated file name.
    """
    return os.path.join(tempfile.gettempdir(), f"esa_{uuid.uuid4()}{suffix}")
def format_list(
    items: Optional[Sequence],
    quote_items: bool = False,
    stringify: bool = False,
) -> str:
    """Render a Python sequence as a PowerWorld bracketed list.

    This is the standard way to pass array parameters to PowerWorld script
    commands.

    Parameters
    ----------
    items : Optional[Sequence]
        The items to render. ``None`` or an empty sequence yields "[]".
    quote_items : bool, optional
        If True, each item is wrapped in double quotes (takes precedence over
        ``stringify``). Use for string fields like names or labels.
        Defaults to False.
    stringify : bool, optional
        If True, each item is converted with ``str()``. Use for numeric values
        or mixed types. Defaults to False.

    Returns
    -------
    str
        A bracketed list string like "[item1, item2, ...]" or "[]".

    Notes
    -----
    With both flags False the items are joined as-is, so they must already
    be strings.

    Examples
    --------
    >>> format_list(["BusNum", "BusName"])
    '[BusNum, BusName]'
    >>> format_list(["Gen1", "Gen2"], quote_items=True)
    '["Gen1", "Gen2"]'
    >>> format_list([1.5, 2.0, 3.5], stringify=True)
    '[1.5, 2.0, 3.5]'
    >>> format_list(None)
    '[]'
    """
    if not items:
        return "[]"

    if quote_items:
        pieces = ['"{}"'.format(entry) for entry in items]
    elif stringify:
        pieces = list(map(str, items))
    else:
        pieces = list(items)

    return f"[{', '.join(pieces)}]"
def format_optional(
    value: Optional[str],
    quote: bool = True,
    empty_quoted: bool = False,
) -> str:
    """Render an optional string parameter for PowerWorld commands.

    Parameters
    ----------
    value : Optional[str]
        The value to render. ``None`` and the empty string are both treated
        as "no value".
    quote : bool, optional
        If True, non-empty values are wrapped in double quotes.
        Defaults to True.
    empty_quoted : bool, optional
        If True, a missing value is rendered as '""' rather than as an empty
        string. Defaults to False.

    Returns
    -------
    str
        The formatted parameter string.

    Examples
    --------
    >>> format_optional("MyFilter")
    '"MyFilter"'
    >>> format_optional("")
    ''
    >>> format_optional("", empty_quoted=True)
    '""'
    >>> format_optional("SomeValue", quote=False)
    'SomeValue'
    """
    if value:
        return f'"{value}"' if quote else value
    if empty_quoted:
        return '""'
    return ""
def format_optional_numeric(value: Optional[Union[int, float]]) -> str:
    """Render an optional numeric parameter for PowerWorld commands.

    Parameters
    ----------
    value : Optional[Union[int, float]]
        The number to render. Only ``None`` counts as missing; zero is a
        regular value.

    Returns
    -------
    str
        ``str(value)``, or "" when ``value`` is None.

    Examples
    --------
    >>> format_optional_numeric(3.14)
    '3.14'
    >>> format_optional_numeric(None)
    ''
    >>> format_optional_numeric(0)
    '0'
    """
    if value is None:
        return ""
    return str(value)
[docs] def df_to_aux(fp, df, object_name: str): """Convert a dataframe to PW aux/axd data section. Parameters ---------- fp : file File handler. df : pandas.DataFrame DataFrame to convert. object_name : str PowerWorld object type. """ # write the header fields = ",".join(df.columns.tolist()) header = f"DATA ({object_name}, [{fields}])" header_chunks = header.split(",") i = 0 line_width = 0 max_width = 86 working_line = [] container = [] while True: if line_width + len(header_chunks[i]) <= max_width: working_line.append(header_chunks[i]) line_width += len(header_chunks[i]) i += 1 else: container.append(",".join(working_line)) working_line = [] line_width = 0 if i == len(header_chunks): if len(working_line): container.append(",".join(working_line)) break container = [ls + "," for ls in container[:-1]] + [container[-1]] container = [container[0]] + [" " + ls for ls in container[1:]] # add tab to each line # write the remaining part container.append("{") container.extend(json.dumps(row, separators=(" ", ": "))[1:-1] for row in df.values.tolist()) container.append("}\r\n") fp.write("\n".join(container))
def convert_to_windows_path(p):
    """Return ``p`` rendered as a Windows-style path string.

    The conversion is purely lexical (``PureWindowsPath``); the file system
    is not accessed.
    """
    windows_path = PureWindowsPath(p)
    return str(windows_path)
def convert_list_to_variant(list_in: list) -> VARIANT:
    """Wrap a list in a COM VARIANT typed as an array of variants."""
    # noinspection PyUnresolvedReferences
    array_of_variants = pythoncom.VT_VARIANT | pythoncom.VT_ARRAY
    return VARIANT(array_of_variants, list_in)
def convert_df_to_variant(df):
    """Wrap a DataFrame's values in a COM VARIANT array for Rect functions.

    The frame is flattened to a list of row lists (``df.values.tolist()``)
    before being wrapped; column names and the index are not included.
    """
    rows = df.values.tolist()
    array_of_variants = pythoncom.VT_ARRAY | pythoncom.VT_VARIANT
    return win32com.client.VARIANT(array_of_variants, rows)
def convert_nested_list_to_variant(list_in: list) -> List[VARIANT]:
    """Convert each inner list of ``list_in`` to its own variant array.

    Returns a plain Python list holding one VARIANT per inner list, in the
    original order.
    """
    return list(map(convert_list_to_variant, list_in))
def create_object_string(object_type: str, *keys) -> str:
    """
    Build a PowerWorld object string identifier.

    The result looks like ``[BUS 1]`` or ``[BRANCH 1 2 1]`` and is used to
    identify objects in SimAuto script commands.

    Parameters
    ----------
    object_type : str
        The type of object (e.g. "Bus", "Gen", "Branch"). It is upper-cased
        in the output.
    *keys : Any
        The key values identifying the object; each is converted with
        ``str()``.

    Returns
    -------
    str
        Formatted string like ``[OBJECTTYPE key1 key2 ...]``.
    """
    tokens = [object_type.upper(), *map(str, keys)]
    return "[" + " ".join(tokens) + "]"
def pack_args(*args) -> str:
    """
    Format arguments for a PowerWorld script command.

    Trailing ``None`` values are dropped, any ``None`` left in the middle
    becomes an empty string, and the results are joined with ", ".
    """
    remaining = list(args)
    while remaining and remaining[-1] is None:
        del remaining[-1]
    return ", ".join(str(arg) if arg is not None else "" for arg in remaining)


# =============================================================================
# AUX Parsing / Building Helpers
# =============================================================================

# One token = a run of non-space characters and/or double-quoted segments
# (a quoted segment may contain spaces and backslash-escaped characters).
_SPLITTER = re.compile(r'(?:[^\s"]|"(?:\\.|[^"])*")+')


def parse_aux_line(line: str) -> List[str]:
    """Parse one AUX data line, detecting bracket or space-delimited format.

    Bracket format example: ``[1, 100.0], [2, 200.0]``
    Space-delimited example: ``101 "Gen 1" 50.0``

    The line is treated as bracket format only when it consists entirely of
    ``[...]`` groups (ignoring whitespace/commas between them).

    Parameters
    ----------
    line : str
        A single data line from inside a DATA or SUBDATA block.

    Returns
    -------
    List[str]
        Parsed field values. For bracket format, one string per bracket
        group (its inner text, stripped). For space-delimited format, one
        string per token with all double quotes removed. Empty list for a
        blank line.
    """
    text = line.strip()
    if not text:
        return []

    # Bracket format: nothing is left once the groups and commas are removed.
    leftover = re.sub(r'\[.*?\]', '', text).replace(',', '').strip()
    if '[' in text and not leftover:
        return [group.strip() for group in re.findall(r'\[(.*?)\]', text)]

    # Space-delimited (respecting quoted strings)
    return [token.replace('"', '') for token in _SPLITTER.findall(text)]


def parse_aux_content(content: str, fieldlist: List[str],
                      subdatalist: Optional[List[str]] = None) -> List[dict]:
    """Parse AUX file content into a list of record dicts.

    Supports both header formats produced by PowerWorld:

    * **Legacy**: ``DATA (ObjectType, [field1, field2]) { ... }``
    * **Concise**: ``ObjectType (field1, field2) { ... }``

    Parameters
    ----------
    content : str
        Full text of the AUX file.
    fieldlist : List[str]
        Expected field names for the parent object. Tokens of each record
        line are paired with these names positionally.
    subdatalist : List[str], optional
        SubData section names to look for. Defaults to ``[]``. Every record
        gets an (initially empty) list for each of these names.

    Returns
    -------
    List[dict]
        Each dict has keys from *fieldlist* (scalar strings) and from
        *subdatalist* (lists of lists). Empty list when no data block is
        found.

    Raises
    ------
    ValueError
        If a ``<SUBDATA>`` tag is malformed (missing name).
    """
    subdatalist = subdatalist or []

    # Try Legacy format first, then Concise
    block = re.search(
        r'DATA\s*\(\s*\w+\s*,\s*\[.*?\]\s*\)\s*\{(.*)\}',
        content,
        re.DOTALL | re.IGNORECASE,
    ) or re.search(
        r'\w+\s*\(.*?\)\s*\{(.*)\}',
        content,
        re.DOTALL,
    )
    if not block:
        return []

    records: List[dict] = []
    curr: dict = {}
    sub_key: Optional[str] = None

    for raw_line in block.group(1).strip().split('\n'):
        line = raw_line.strip()
        # Skip blank lines and // comments.
        if not line or line.startswith('//'):
            continue

        upper = line.upper()
        if upper.startswith('<SUBDATA'):
            tag = re.search(r'<SUBDATA\s+(\w+)>', line, re.IGNORECASE)
            if not tag:
                raise ValueError(f"Malformed SUBDATA tag: {line!r}")
            sub_key = tag.group(1)
            continue

        if upper.startswith('</SUBDATA>'):
            sub_key = None
            continue

        if sub_key:
            # Inside a SUBDATA section: the row belongs to the current record.
            curr.setdefault(sub_key, []).append(parse_aux_line(line))
            continue

        # A new parent record starts here; flush the previous one first.
        if curr:
            records.append(curr)
        curr = {
            name: token.replace('"', '')
            for name, token in zip(fieldlist, _SPLITTER.findall(line))
        }
        curr.update((section, []) for section in subdatalist)

    if curr:
        records.append(curr)
    return records


def _fmt_aux_value(val) -> str:
    """Format a single value for AUX output (quote strings, stringify numbers)."""
    return f'"{val}"' if isinstance(val, str) else str(val)


def build_aux_string(objecttype: str, fieldlist: List[str],
                     records: List[dict],
                     subdatatypes: Optional[Union[str, List[str]]] = None) -> str:
    """Build an AUX DATA block string from records.

    Parameters
    ----------
    objecttype : str
        PowerWorld object type (e.g. ``"Gen"``, ``"Contingency"``).
    fieldlist : List[str]
        Field names for the parent object.
    records : List[dict]
        Each dict must have keys from *fieldlist*. If *subdatatypes* is
        provided, the dict may also have a key for each subdata type whose
        value is a list of lists.
    subdatatypes : str or List[str] or None
        SubData section names to write. A single string is normalised to a
        one-element list. A section is written only for records that hold a
        non-empty value under that name.

    Returns
    -------
    str
        Complete AUX DATA block ready for ``exec_aux``.

    Raises
    ------
    KeyError
        If a record lacks one of the names in *fieldlist*.
    """
    if subdatatypes is None:
        subdatatypes = []
    elif isinstance(subdatatypes, str):
        subdatatypes = [subdatatypes]

    header = 'DATA ({}, [{}])\n{{\n'.format(objecttype, ", ".join(fieldlist))

    body_lines: List[str] = []
    for rec in records:
        body_lines.append(" ".join(_fmt_aux_value(rec[name]) for name in fieldlist))
        for sdt in subdatatypes:
            if sdt not in rec or not rec[sdt]:
                continue
            body_lines.append(f" <SUBDATA {sdt}>")
            body_lines.extend(
                " " + " ".join(_fmt_aux_value(cell) for cell in row)
                for row in rec[sdt]
            )
            body_lines.append(" </SUBDATA>")

    return header + "\n".join(body_lines) + "\n}\n"