"""Helper functions for data conversion for SimAuto COM interface."""
import json
import logging
import os
import re
import tempfile
import uuid
from pathlib import Path, PureWindowsPath
from typing import List, Optional, Tuple, Union, Sequence
import pandas as pd
import pythoncom
import win32com
from win32com.client import VARIANT
# =============================================================================
# PowerWorld Command String Formatting Helpers
# =============================================================================
def load_ts_csv_results(base_path: Path, delete_files: bool = False) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
Reads and parses transient stability results from CSV files generated by PowerWorld.
Args:
base_path: The base file path used in the TSGetResults command.
delete_files: Whether to delete the found files after reading.
Returns:
Tuple[pd.DataFrame, pd.DataFrame]: (Metadata, Time-Series Data)
"""
logger = logging.getLogger(__name__)
# PowerWorld appends suffixes, so we search for the base name pattern.
search_pattern = f"{base_path.stem}*.csv"
found_files = list(base_path.parent.glob(search_pattern))
meta = pd.DataFrame()
data_frames = []
try:
if not found_files:
return meta, pd.DataFrame()
header_files = [f for f in found_files if "_header" in f.name.lower()]
data_files = [f for f in found_files if f not in header_files]
# --- Process Header ---
if header_files:
# Use the first header file found
header_path = header_files[0]
try:
# Check if first line is just a title (e.g. "ObjectFields")
with header_path.open('r') as f:
first_line = f.readline()
# PowerWorld sometimes puts "ObjectFields" on the first line alone
skip_rows = 1 if "ObjectFields" in first_line and "," not in first_line else 0
meta = pd.read_csv(header_path, sep=',', skiprows=skip_rows)
meta.columns = meta.columns.str.strip()
# Standardize column names
rename_map = {
'Column': 'ColHeader',
'Object': 'ObjectType',
'Variable': 'VariableName',
'Key 1': 'PrimaryKey',
'Key 2': 'SecondaryKey'
}
meta.rename(columns=rename_map, inplace=True)
# Force ColHeader to be 0-based index strings to match data columns
meta['ColHeader'] = [str(i) for i in range(len(meta))]
except Exception as e:
logger.warning(f"Failed to read header file {header_path}: {e}")
# --- Process Data ---
for dpath in data_files:
try:
df = pd.read_csv(dpath, sep=',', header=None)
if not df.empty:
# Rename time column (index 0) and data columns (1..N)
# Map 0 -> "time", 1 -> "0", 2 -> "1", ...
col_map = {0: "time"}
col_map.update({i: str(i-1) for i in range(1, len(df.columns))})
df.rename(columns=col_map, inplace=True)
data_frames.append(df)
except Exception as e:
logger.warning(f"Failed to read data file {dpath}: {e}")
data = pd.concat(data_frames, ignore_index=True) if data_frames else pd.DataFrame()
# Ensure time is float and sorted
if 'time' in data.columns:
data['time'] = pd.to_numeric(data['time'], errors='coerce')
data.sort_values('time', inplace=True)
return meta, data
finally:
if delete_files:
for f in found_files:
try:
f.unlink(missing_ok=True)
except OSError as e:
logger.warning(f"Failed to unlink temp file {f}: {e}")
[docs]
def get_temp_filepath(suffix: str = ".csv") -> str:
"""Generates a unique temporary filepath."""
temp_dir = tempfile.gettempdir()
unique_name = f"esa_{uuid.uuid4()}{suffix}"
return os.path.join(temp_dir, unique_name)
[docs]
def df_to_aux(fp, df, object_name: str):
"""Convert a dataframe to PW aux/axd data section.
Parameters
----------
fp : file
File handler.
df : pandas.DataFrame
DataFrame to convert.
object_name : str
PowerWorld object type.
"""
# write the header
fields = ",".join(df.columns.tolist())
header = f"DATA ({object_name}, [{fields}])"
header_chunks = header.split(",")
i = 0
line_width = 0
max_width = 86
working_line = []
container = []
while True:
if line_width + len(header_chunks[i]) <= max_width:
working_line.append(header_chunks[i])
line_width += len(header_chunks[i])
i += 1
else:
container.append(",".join(working_line))
working_line = []
line_width = 0
if i == len(header_chunks):
if len(working_line):
container.append(",".join(working_line))
break
container = [ls + "," for ls in container[:-1]] + [container[-1]]
container = [container[0]] + [" " + ls for ls in container[1:]] # add tab to each line
# write the remaining part
container.append("{")
container.extend(json.dumps(row, separators=(" ", ": "))[1:-1] for row in df.values.tolist())
container.append("}\r\n")
fp.write("\n".join(container))
[docs]
def convert_to_windows_path(p):
"""Given a path, p, convert it to a Windows path."""
return str(PureWindowsPath(p))
[docs]
def convert_list_to_variant(list_in: list) -> VARIANT:
"""Given a list, convert to a variant array."""
# noinspection PyUnresolvedReferences
return VARIANT(pythoncom.VT_VARIANT | pythoncom.VT_ARRAY, list_in)
[docs]
def convert_df_to_variant(df):
"""Given a DataFrame, convert to a variant array for Rect functions."""
data_as_list = df.values.tolist()
return win32com.client.VARIANT(pythoncom.VT_ARRAY | pythoncom.VT_VARIANT, data_as_list)
[docs]
def convert_nested_list_to_variant(list_in: list) -> List[VARIANT]:
"""Given a list of lists, convert to a variant array."""
return [convert_list_to_variant(sub_array) for sub_array in list_in]
[docs]
def create_object_string(object_type: str, *keys) -> str:
"""
Helper to format a PowerWorld object string identifier.
This function creates strings formatted like ``[BUS 1]`` or ``[BRANCH 1 2 1]``
which are used to identify objects in SimAuto script commands.
Parameters
----------
object_type : str
The type of object (e.g. "Bus", "Gen", "Branch").
*keys : Any
The key values identifying the object.
Returns
-------
str
Formatted string like ``[ObjectType key1 key2 ...]``.
"""
parts = [object_type.upper()]
for key in keys:
parts.append(str(key))
return f"[{' '.join(parts)}]"
def pack_args(*args) -> str:
"""
Formats arguments for a PowerWorld script command.
Filters out trailing None values. Converts remaining None values to empty strings.
Joins arguments with commas.
"""
args_list = list(args)
while args_list and args_list[-1] is None:
args_list.pop()
return ", ".join("" if a is None else str(a) for a in args_list)
# =============================================================================
# AUX Parsing / Building Helpers
# =============================================================================
_SPLITTER = re.compile(r'(?:[^\s"]|"(?:\\.|[^"])*")+')
def parse_aux_line(line: str) -> List[str]:
"""Parse one AUX data line, detecting bracket or space-delimited format.
Bracket format example: ``[1, 100.0], [2, 200.0]``
Space-delimited example: ``101 "Gen 1" 50.0``
Only treats the line as bracket format when it consists entirely of
``[...]`` groups (ignoring whitespace/commas between them).
Parameters
----------
line : str
A single data line from inside a DATA or SUBDATA block.
Returns
-------
List[str]
Parsed field values.
"""
line = line.strip()
if not line:
return []
# Bracket format: entire line is bracket groups separated by commas/spaces
stripped = re.sub(r'\[.*?\]', '', line).replace(',', '').strip()
if '[' in line and not stripped:
return [m.group(1).strip() for m in re.finditer(r'\[(.*?)\]', line)]
# Space-delimited (respecting quoted strings)
return [x.replace('"', '') for x in _SPLITTER.findall(line)]
def parse_aux_content(content: str, fieldlist: List[str],
subdatalist: Optional[List[str]] = None) -> List[dict]:
"""Parse AUX file content into a list of record dicts.
Supports both header formats produced by PowerWorld:
* **Legacy**: ``DATA (ObjectType, [field1, field2]) { ... }``
* **Concise**: ``ObjectType (field1, field2) { ... }``
Parameters
----------
content : str
Full text of the AUX file.
fieldlist : List[str]
Expected field names for the parent object.
subdatalist : List[str], optional
SubData section names to look for. Defaults to ``[]``.
Returns
-------
List[dict]
Each dict has keys from *fieldlist* (scalar strings) and from
*subdatalist* (lists of lists).
Raises
------
ValueError
If a ``<SUBDATA>`` tag is malformed (missing name).
"""
subdatalist = subdatalist or []
# Try Legacy format first, then Concise
match = re.search(
r'DATA\s*\(\s*\w+\s*,\s*\[.*?\]\s*\)\s*\{(.*)\}',
content, re.DOTALL | re.IGNORECASE,
)
if not match:
match = re.search(
r'\w+\s*\(.*?\)\s*\{(.*)\}',
content, re.DOTALL,
)
if not match:
return []
records: List[dict] = []
curr: dict = {}
sub_key: Optional[str] = None
for line in match.group(1).strip().split('\n'):
line = line.strip()
if not line or line.startswith('//'):
continue
if line.upper().startswith('<SUBDATA'):
m = re.search(r'<SUBDATA\s+(\w+)>', line, re.IGNORECASE)
if not m:
raise ValueError(f"Malformed SUBDATA tag: {line!r}")
sub_key = m.group(1)
elif line.upper().startswith('</SUBDATA>'):
sub_key = None
elif sub_key:
curr.setdefault(sub_key, []).append(parse_aux_line(line))
else:
if curr:
records.append(curr)
tokens = _SPLITTER.findall(line)
curr = {
k: v.replace('"', '')
for k, v in zip(fieldlist, tokens)
}
for s in subdatalist:
curr[s] = []
if curr:
records.append(curr)
return records
def _fmt_aux_value(val) -> str:
"""Format a single value for AUX output (quote strings, stringify numbers)."""
if isinstance(val, str):
return f'"{val}"'
return str(val)
def build_aux_string(objecttype: str, fieldlist: List[str],
records: List[dict],
subdatatypes: Optional[Union[str, List[str]]] = None) -> str:
"""Build an AUX DATA block string from records.
Parameters
----------
objecttype : str
PowerWorld object type (e.g. ``"Gen"``, ``"Contingency"``).
fieldlist : List[str]
Field names for the parent object.
records : List[dict]
Each dict must have keys from *fieldlist*. If *subdatatypes* is
provided, the dict may also have a key for each subdata type
whose value is a list of lists.
subdatatypes : str or List[str] or None
SubData section names to write. A single string is normalised
to a one-element list.
Returns
-------
str
Complete AUX DATA block ready for ``exec_aux``.
"""
if subdatatypes is None:
subdatatypes = []
elif isinstance(subdatatypes, str):
subdatatypes = [subdatatypes]
header = f'DATA ({objecttype}, [{", ".join(fieldlist)}])\n{{\n'
body_lines: List[str] = []
for rec in records:
vals = [_fmt_aux_value(rec[f]) for f in fieldlist]
body_lines.append(" ".join(vals))
for sdt in subdatatypes:
if sdt in rec and rec[sdt]:
body_lines.append(f" <SUBDATA {sdt}>")
for row in rec[sdt]:
body_lines.append(" " + " ".join(_fmt_aux_value(v) for v in row))
body_lines.append(" </SUBDATA>")
return header + "\n".join(body_lines) + "\n}\n"