Source code for pyiron_base.utils.parser

# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.
"""
General purpose output parser
"""

import ast
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np

__author__ = "Joerg Neugebauer"
__copyright__ = (
    "Copyright 2020, Max-Planck-Institut für Eisenforschung GmbH - "
    "Computational Materials Design (CM) Department"
)
__version__ = "1.0"
__maintainer__ = "Jan Janssen"
__email__ = "janssen@mpie.de"
__status__ = "production"
__date__ = "Sep 1, 2017"


def extract_data_from_str_lst(
    str_lst: List[str], tag: str, num_args: int = 1
) -> List[Union[str, List[str]]]:
    """
    Extract the values that follow a tag from the lines of a log (text) file.

    Each line that starts with ``tag`` is cut into tokens at blanks and commas
    (empty tokens are dropped) and the tokens located directly behind the tag
    are picked. The result is rebuilt for every matching line, so the returned
    list holds the values of the last matching line only.

    Args:
        str_lst (List[str]): list of strings representing the lines in the file
        tag (str): string at the beginning of the line
        num_args (int): number of arguments separated by ' ' or ',' to extract
            after the tag

    Returns:
        List[Union[str, List[str]]]: empty list if no line starts with the tag,
            otherwise a one-element list containing a single string
            (``num_args == 1``) or a list of strings (any other ``num_args``)
    """
    # The tag itself occupies as many tokens as it has blank-separated words.
    first_value = len(tag.split())
    found = []
    for text in str_lst:
        if not text.startswith(tag):
            continue
        # Treat ',' exactly like ' ' and discard the empty fragments produced
        # by consecutive delimiters.
        tokens = [tok for tok in text.replace(",", " ").split(" ") if tok]
        if num_args == 1:
            found = [tokens[first_value]]
        else:
            found = [tokens[first_value : num_args + first_value]]
    return found
def extract_data_from_file(
    file_name: str, tag: str, num_args: int = 1
) -> List[Union[str, List[str]]]:
    """
    Extract the values that follow a tag from a log (text) file.

    The file is read completely and its lines are handed over to
    extract_data_from_str_lst(), which performs the actual extraction.

    Args:
        file_name (str): file name or path to the file, can either be absolute
            or relative
        tag (str): string at the beginning of the line
        num_args (int): number of arguments separated by ' ' or ',' to extract
            after the tag

    Returns:
        List[Union[str, List[str]]]: List of arguments extracted as strings
    """
    with open(file_name) as log_file:
        lines = list(log_file)
    return extract_data_from_str_lst(str_lst=lines, tag=tag, num_args=num_args)
class Logstatus(object):
    """
    Generic Parser for parsing output files by searching for a specific pattern structure and extracting the data
    that follows the pattern into the status_dict dictionary.

    Every entry of status_dict is a list of ``[iteration, data]`` pairs, where ``iteration`` is a copy of the
    iteration counter at the time the data was appended.

    Args:
        h5 (Optional[Any]): HDF5 object - if provided a group "generic" is added to it. Defaults to None.
        iter_levels (int): Levels of iteration - default = 1
    """

    def __init__(self, h5: Optional[Any] = None, iter_levels: int = 1) -> None:
        """
        Initialize the Logstatus object.

        Args:
            h5 (Optional[Any]): HDF5 object to store the dictionary in. Defaults to None.
            iter_levels (int): Levels of iteration. Defaults to 1.
        """
        if h5 is not None:
            h5.add_group("generic")
            h5.move_up()
        self.h5 = h5
        # The handle is only dereferenced when one was supplied, so the default
        # h5=None yields a fully initialised object with h5_group_data = None.
        self.h5_group_data = h5.getGroup().logStatus if h5 is not None else None
        self.status_dict = {}
        self.iter_levels = iter_levels
        self.iter = iter_levels * [0]
        # Keys listed here are written by to_hdf() as a single vector.
        self.store_as_vector = []
        self.h5_open = False

    def reset_iter(self, dim: int = 0) -> None:
        """
        Reset iteration level

        Args:
            dim (int): first iteration level to reset; this level and all following levels are set to 0 - default = 0
        """
        for i in range(dim, self.iter_levels):
            self.iter[i] = 0

    def raise_iter(self, dim: int = 0) -> None:
        """
        Increase the iteration level

        Args:
            dim (int): position of the iteration counter to increase by one - default = 0
        """
        self.iter[dim] += 1

    def append(
        self, title: str, data_to_append: Union[list, dict], vec: bool = False
    ) -> None:
        """
        Append data to the LogStatus object status_dict dictionary

        Args:
            title (str): Title of the data to append
            data_to_append (Union[list, dict]): the data can be of various types
            vec (bool): [True/False] if the data is a single vector instead of a matrix or a tensor

        Raises:
            ValueError: if vec is True and data is already stored under the same title
        """
        entry = [list(self.iter), data_to_append]
        if title in self.status_dict:
            if vec:
                raise ValueError(
                    "For appending matrix rather than vector option needed!"
                )
            self.status_dict[title].append(entry)
        else:
            self.status_dict[title] = [entry]

    def to_hdf(self, hdf: "ProjectHDFio") -> None:
        """
        Store the LogStatus object status_dict dictionary in an HDF5 file

        Args:
            hdf (ProjectHDFio): HDF5 object to store the dictionary in.

        Raises:
            ValueError: if a key listed in store_as_vector holds more than one entry
        """
        for key, value in self.status_dict.items():
            if key in self.store_as_vector:
                if len(value) > 1:
                    raise ValueError(
                        "Multi-dimensional array cannot be saved as vector"
                    )
                hdf[key] = np.array(value[0][1])
            else:
                # Drop the iteration counters, only the data is stored.
                hdf[key] = np.array([val for _, val in value])

    def combine_xyz(
        self,
        x_key: str,
        y_key: str,
        z_key: str,
        combined_key: str,
        as_vector: bool = False,
    ) -> None:
        """
        Combine three lists representing the x,y,z coordinates, by accessing them from the status_dict dictionary,
        combining them, store them under the combined_key and remove the other three keys. Nothing happens if one
        of the three keys is missing.

        Args:
            x_key (str): key of the x coordinates
            y_key (str): key of the y coordinates
            z_key (str): key of the z coordinates
            combined_key (str): name of the combined coordinates
            as_vector (bool): [True/False] if True only the first entry of each key is used and every combined
                              [x, y, z] triple becomes an entry of its own. Defaults to False.
        """
        keys = (x_key, y_key, z_key)
        if not all(key in self.status_dict for key in keys):
            return
        if as_vector:
            time_x, val_x = self.status_dict[x_key][0]
            _, val_y = self.status_dict[y_key][0]
            _, val_z = self.status_dict[z_key][0]
            combined_lst = [
                [time_x, [val_t_x, val_t_y, val_t_z]]
                for val_t_x, val_t_y, val_t_z in zip(val_x, val_y, val_z)
            ]
        else:
            combined_lst = []
            for (time_x, val_x), (_, val_y), (_, val_z) in zip(
                self.status_dict[x_key],
                self.status_dict[y_key],
                self.status_dict[z_key],
            ):
                combined_lst.append(
                    [
                        time_x,
                        [
                            [val_t_x, val_t_y, val_t_z]
                            for val_t_x, val_t_y, val_t_z in zip(val_x, val_y, val_z)
                        ],
                    ]
                )
        for key in keys:
            del self.status_dict[key]
        self.status_dict[combined_key] = combined_lst

    def combine_mat(
        self,
        x_key: str,
        xy_key: str,
        xz_key: str,
        y_key: str,
        yz_key: str,
        z_key: str,
        combined_key: str,
    ) -> None:
        """
        Combine six lists representing the xx, xy, xz, yy, yz and zz components of symmetric 3x3 matrices, by
        accessing them from the status_dict dictionary, building the full matrices
        [[xx, xy, xz], [xy, yy, yz], [xz, yz, zz]], storing them under the combined_key and removing the six
        component keys. Nothing happens if one of the six keys is missing.

        Args:
            x_key (str): key of the xx components
            xy_key (str): key of the xy components
            xz_key (str): key of the xz components
            y_key (str): key of the yy components
            yz_key (str): key of the yz components
            z_key (str): key of the zz components
            combined_key (str): name of the combined matrices
        """
        keys = (x_key, xy_key, xz_key, y_key, yz_key, z_key)
        # All six components are read below, so all six have to be present.
        if not all(key in self.status_dict for key in keys):
            return
        combined_lst = []
        for var_xx, var_xy, var_xz, var_yy, var_yz, var_zz in zip(
            *(self.status_dict[key] for key in keys)
        ):
            time_xx, val_xx = var_xx
            _, val_xy = var_xy
            _, val_xz = var_xz
            _, val_yy = var_yy
            _, val_yz = var_yz
            _, val_zz = var_zz
            combined_lst.append(
                [
                    time_xx,
                    [
                        [
                            [t_xx, t_xy, t_xz],
                            [t_xy, t_yy, t_yz],
                            [t_xz, t_yz, t_zz],
                        ]
                        for t_xx, t_xy, t_xz, t_yy, t_yz, t_zz in zip(
                            val_xx, val_xy, val_xz, val_yy, val_yz, val_zz
                        )
                    ],
                ]
            )
        for key in keys:
            del self.status_dict[key]
        self.status_dict[combined_key] = combined_lst

    def convert_unit(self, key: str, factor: float) -> None:
        """
        Convert the values of a specific key in the status_dict dictionary by multiplying them with a factor.
        Nothing happens if the key is not part of the status_dict dictionary.

        Args:
            key (str): The key of the values to be converted.
            factor (float): The factor to multiply the values with.
        """
        if key in self.status_dict:
            self.status_dict[key] = [
                [time, (np.array(values) * factor).tolist()]
                for time, values in self.status_dict[key]
            ]

    @staticmethod
    def extract_item(l_item: str) -> Tuple[str, Optional[List[str]]]:
        """
        Method to extract information from a single line - currently very specific for the Lammps output

        The first word of the line is ignored. The tag consists of the following three words if the second word
        is "NUMBER", of the following two words if it is "BOX" and of the second word only in all other cases.

        Args:
            l_item (str): line to extract information from

        Returns:
            Tuple[str, Optional[List[str]]]: the tag_string as string and the arguments as list - None if the line
                                             ends with the tag
        """
        item_list = l_item.split()
        first_item = item_list[1]
        if first_item == "NUMBER":
            num_elements = 3
        elif first_item == "BOX":
            num_elements = 2
        else:
            num_elements = 1
        tag_string = " ".join(item_list[1 : num_elements + 1])
        if len(item_list) == num_elements + 1:
            args = None
        else:
            args = item_list[num_elements + 1 :]
        return tag_string, args

    @staticmethod
    def _stack_row(val_array: Optional[np.ndarray], line: str) -> np.ndarray:
        """
        Parse one whitespace separated data line and add it as a new row to val_array.

        Args:
            val_array (Optional[np.ndarray]): rows collected so far - None for the first row
            line (str): data line, every token has to be a Python literal

        Returns:
            np.ndarray: two dimensional array including the new row
        """
        val_line = [[ast.literal_eval(token) for token in line.split()]]
        if val_array is None:
            return np.array(val_line)
        return np.append(arr=val_array, values=val_line, axis=0)

    def extract_from_list(
        self,
        list_of_lines: List[str],
        tag_dict: Dict[str, Any],
        h5_dict: Optional[Dict[str, str]] = None,
        key_dict: Optional[Dict[str, str]] = None,
    ) -> None:
        """
        Main function of the LogStatus class to extract data from an output file by searching for the tag dictionary

        Args:
            list_of_lines (List[str]): List of lines from the output file
            tag_dict (Dict[str, Any]): Dictionary with tags/patterns as key and an additional dictionary to describe
                                       the data structure.
            h5_dict (Optional[Dict[str, str]]): Translation dictionary of output tags as keys to the tags used on
                                                the HDF5 file as values. Defaults to None.
            key_dict (Optional[Dict[str, str]]): Translation dictionary of python internal tags as keys to the
                                                 output tags as values. Defaults to None.
        """
        tag_vals = {}
        tag = LogTag(tag_dict, h5_dict, key_dict)

        iterate_over_lines = iter(list_of_lines)
        for line_read in iterate_over_lines:
            # The inner loop consumes the iterator itself; it is only left when
            # the input is exhausted or a table could not make any progress.
            while True:
                if not tag.is_item(line_read):
                    try:
                        line_read = next(iterate_over_lines)
                    except StopIteration:
                        break
                    continue

                tag_name = tag.tag_name
                if tag.rows() == 0:  # all values are located on the tag line
                    tag.set_item(tag_vals, self)
                    try:
                        line_read = next(iterate_over_lines)
                    except StopIteration:
                        break
                    continue

                # The values follow the tag line as a table of rows.
                val_array = None
                exhausted = False
                try:
                    for _ in range(tag.line_skip()):
                        line_read = next(iterate_over_lines)
                except StopIteration:
                    exhausted = True

                rows = tag.rows()
                if isinstance(rows, str):
                    # The table ends at the first line containing the
                    # terminating string or a warning.
                    terminator = rows.strip()
                    while True:
                        try:
                            line_read = next(iterate_over_lines)
                        except StopIteration:
                            exhausted = True
                            break
                        if line_read.find(terminator) > -1:
                            break
                        if "WARNING:" in line_read:
                            break
                        val_array = self._stack_row(val_array, line_read)
                else:
                    for _ in range(rows):
                        try:
                            line_read = next(iterate_over_lines)
                        except StopIteration:
                            exhausted = True
                            break
                        val_array = self._stack_row(val_array, line_read)

                if val_array is None:
                    # No data row was read, so there is nothing to store. Only
                    # a consumed terminating line is worth a re-check as a tag;
                    # in all other cases the outer loop provides the next line.
                    if exhausted or not isinstance(rows, str):
                        break
                    continue

                if tag.is_func():
                    val_array = tag.apply_func(val_array)
                if np.shape(val_array) == (1, 1):
                    self.append(tag.h5(), val_array[0, 0])
                elif tag.test_split():
                    has_header = "header" in tag_dict[tag_name]
                    tag_list = None
                    if tag.split_tag:
                        tag_list = tag_name.split()
                    elif tag.split_arg:
                        if has_header:
                            tag_list = tag_dict[tag_name]["header"]
                        else:
                            tag_list = tag.val_list
                    # Every column of the table is stored under its own name.
                    for i, t in enumerate(tag_list):
                        if has_header:
                            self.append(t, np.copy(val_array[:, i]))
                        else:
                            self.append(tag.translate(t), np.copy(val_array[:, i]))
                else:
                    self.append(tag.h5(), np.copy(val_array))
                # The line read last is checked for a tag in the next cycle.

    def extract_file(
        self,
        file_name: str,
        tag_dict: Dict[str, Any],
        h5_dict: Optional[Dict[str, str]] = None,
        key_dict: Optional[Dict[str, str]] = None,
    ) -> None:
        """
        Main function of the LogStatus class to extract data from an output file by searching for the tag dictionary

        Args:
            file_name (str): absolute path to the output file
            tag_dict (dict): Dictionary with tags/patterns as key and an additional dictionary to describe the data
                             structure. The data structure dictionary can contain the following keys:
                             - "arg": position of the argument - or dimension (":", ":,:")
                             - "type": Python data type
                             - "h5": HDF5 key to store the information
                             - "rows": number of rows from the line where the tag was found
                             - "splitTag": split the tag - [True/False]
                             - "splitArg": split the argument - [True/False]
                             - "lineSkip": skip a line
                             - "func": function to convert the data
            h5_dict (dict): Translation dictionary of output tags as keys to the tags used on the HDF5 file as values.
            key_dict (dict): Translation dictionary of python internal tags as keys to the output tags as values.
        """
        with open(file_name, "r") as f:
            content = f.readlines()
        self.extract_from_list(
            list_of_lines=content, tag_dict=tag_dict, h5_dict=h5_dict, key_dict=key_dict
        )
class LogTag(object):
    """
    LogTag object to parse for a specific pattern in the output file

    Args:
        tag_dict (Dict[str, Any]): Dictionary with tags/patterns as key and an additional dictionary to describe the
                                   data structure. The data structure dictionary can contain the following keys:
                                   - "arg": position of the argument - or dimension (":", ":,:")
                                   - "type": Python data type
                                   - "h5": HDF5 key to store the information
                                   - "rows": number of rows from the line where the tag was found
                                   - "splitTag": split the tag - [True/False]
                                   - "splitArg": split the argument - [True/False]
                                   - "lineSkip": skip a line
                                   - "func": function to convert the data
        h5_dict (Optional[Dict[str, str]]): Translation dictionary of output tags as keys to the tags used on the
                                            HDF5 file as values. Defaults to None.
        key_dict (Optional[Dict[str, str]]): Translation dictionary of python internal tags as keys to the output
                                             tags as values. Defaults to None.
    """

    def __init__(
        self,
        tag_dict: Dict[str, Any],
        h5_dict: Optional[Dict[str, str]] = None,
        key_dict: Optional[Dict[str, str]] = None,
    ) -> None:
        self._tag_dict = None
        self._tag_first_word = None
        self._current = None
        self._dyn_tags = None
        self._key_dict = None
        self._h5_dict = None
        self._tag_name = None
        # The property setters below derive the remaining private attributes.
        self.tag_dict = tag_dict
        self.key_dict = key_dict
        self.h5_dict = h5_dict

    @property
    def current(self) -> Dict[str, Any]:
        """
        Get the current tag

        Returns:
            dict: data structure dictionary of the current tag - None as long as no tag has been selected
        """
        return self._current

    @current.setter
    def current(self, tag_name: str) -> None:
        """
        Set the current tag

        Args:
            tag_name (str): current tag

        Raises:
            ValueError: if tag_name is not a key of the tag dictionary
        """
        if tag_name not in self.tag_dict:
            raise ValueError("Unknown tag_name: " + tag_name)
        self._tag_name = tag_name
        self._current = self.tag_dict[tag_name]

    @property
    def tag_name(self) -> str:
        """
        Get tag name

        Returns:
            str: tag name
        """
        return self._tag_name

    @property
    def tag_dict(self) -> dict:
        """
        Get tag dictionary with tags/patterns as key and an additional dictionary to describe the data structure.
        The keys supported by the data structure dictionary are listed in the class documentation.

        Returns:
            dict: tag dictionary
        """
        return self._tag_dict

    @tag_dict.setter
    def tag_dict(self, tag_dict: dict) -> None:
        """
        Set tag dictionary with tags/patterns as key and an additional dictionary to describe the data structure.
        The tuple of tags used for matching lines and the dynamic tags are updated as well.

        Args:
            tag_dict (dict): tag dictionary
        """
        self._tag_dict = tag_dict
        self._tag_first_word = tuple(self.tag_dict.keys())
        self.dyn_tags = tag_dict

    @property
    def tag_first_word(self) -> Tuple[str, ...]:
        """
        Get the tags a line is compared to

        Returns:
            tuple: keys of the tag dictionary
        """
        return self._tag_first_word

    @property
    def dyn_tags(self) -> dict:
        """
        Get dynamic tags

        Returns:
            dict: dynamic tags - the tag without its first character as key and the complete tag as value
        """
        return self._dyn_tags

    @dyn_tags.setter
    def dyn_tags(self, tag_dict: dict) -> None:
        """
        Set dynamic tags - these are all tags of the tag dictionary whose first word starts with "$".

        Args:
            tag_dict (dict): tag dictionary
        """
        dyn_tags = {}
        for name in tag_dict:
            words = name.split()
            # Keys without any word cannot be dynamic tags.
            if words and words[0].startswith("$"):
                dyn_tags[name[1:]] = name
        self._dyn_tags = dyn_tags

    @property
    def key_dict(self) -> dict:
        """
        Get translation dictionary of python internal tags as keys to the output tags as values.

        Returns:
            dict: key dictionary
        """
        return self._key_dict

    @key_dict.setter
    def key_dict(self, key_dict: dict) -> None:
        """
        Set translation dictionary of python internal tags as keys to the output tags as values.

        Args:
            key_dict (dict): key dictionary
        """
        self._key_dict = key_dict

    @property
    def h5_dict(self) -> dict:
        """
        Get translation dictionary of output tags as keys to the tags used on the HDF5 file as values.

        Returns:
            dict: h5 dictionary
        """
        return self._h5_dict

    @h5_dict.setter
    def h5_dict(self, h5_dict: dict) -> None:
        """
        Set translation dictionary of output tags as keys to the tags used on the HDF5 file as values.

        Args:
            h5_dict (dict): h5 dictionary
        """
        self._h5_dict = h5_dict

    def is_item(self, item_line: str, start: int = 0) -> bool:
        """
        Check if the current line - item_line - matches one of the provided tags, if that is the case set the tag to
        be the current tag and update the val_list with the corresponding values. If more than one tag matches, the
        first one in the order of the tag dictionary is selected.

        Args:
            item_line (str): Line of the output file
            start (int): Character to start with when parsing the item_line - default=0

        Returns:
            bool: [True/False]
        """
        line = item_line.strip()
        if not line.startswith(self.tag_first_word, start):
            return False
        tag = next(t for t in self.tag_first_word if line.startswith(t, start))
        # The values follow the tag, which itself begins at position start.
        self.current = tag
        self.val_list = line[start + len(tag) :].split()
        return True

    def get_item(self, item: str, default: Any) -> Union[list, dict, int, float]:
        """
        If item is part of the current dictionary keys the corresponding value is returned otherwise the default is
        returned.

        Args:
            item (str): dictionary key
            default (list, dict, int, float): Default value

        Returns:
            list, dict, int, float: The values connected to the key item in the current dictionary and if item is not
                                    a key in the current dictionary return the default value.

        Raises:
            ValueError: if no current tag has been selected yet
        """
        if self.current is None:
            raise ValueError("current tag not defined!")
        if item in self.current:
            return self.current[item]
        return default

    def h5(self) -> str:
        """
        Translate current tag to HDF5 tag using the tag dictionary

        Returns:
            str: the "h5" entry of the current tag, the tag name if there is no such entry
        """
        return self.get_item(item="h5", default=self.tag_name)

    def translate(self, item: str) -> str:
        """
        Translate current tag to HDF5 tag using the h5_dict dictionary

        Args:
            item (str): Python tag

        Returns:
            str: HDF5 tag

        Raises:
            ValueError: if no h5_dict is defined or item is not one of its keys
        """
        if self.h5_dict is None:
            raise ValueError("h5_dict is None!" + item)
        if item in self.h5_dict:
            return self.h5_dict[item]
        raise ValueError("tag not in h5_dict: " + item)

    def arg(self) -> str:
        """
        Get tag argument

        Returns:
            str: the "arg" entry of the current tag converted to a string - "0" if there is no such entry
        """
        l_arg = self.get_item(item="arg", default=0)
        if isinstance(l_arg, str):
            return l_arg
        return str(l_arg)

    def line_skip(self) -> bool:
        """
        Check if a line should be skipped.

        Returns:
            bool: truth value of the "lineSkip" entry of the current tag - False if there is no such entry
        """
        return bool(self.get_item(item="lineSkip", default=0))

    def rows(self) -> Union[int, str]:
        """
        Number of rows to parse

        Returns:
            int, str: the "rows" entry of the current tag as integer, the unchanged entry if it is a string which
                      cannot be converted to an integer - 0 if there is no such entry
        """
        rows = self.get_item(item="rows", default=0)
        try:
            return int(rows)
        except ValueError:
            return rows

    def test_split(self) -> bool:
        """
        Check if the argument or the tag should be split - if "splitArg" or "splitTag" is included in the tag_dict
        dictionary. Both entries are stored as the attributes split_arg and split_tag.

        Returns:
            bool: [True/ False]
        """
        self.split_arg = self.get_item(item="splitArg", default=False)
        self.split_tag = self.get_item(item="splitTag", default=False)
        return self.split_arg or self.split_tag

    def is_func(self) -> bool:
        """
        Check if a function is defined to convert the data - if "func" is included in the tag_dict dictionary

        Returns:
            bool: [True/ False]
        """
        return self.get_item(item="func", default=None) is not None

    def apply_func(
        self, val: Union[list, dict, int, float]
    ) -> Union[list, dict, int, float]:
        """
        Apply the function on a given value

        Args:
            val (dict, list, float, int): value to apply the function on

        Returns:
            dict, list, float, int: result of applying the function - the unchanged value if no function is defined
        """
        my_func = self.get_item(item="func", default=None)
        if my_func is None:
            return val
        return my_func(val)

    @staticmethod
    def _parse_selector(text: Any) -> Union[int, slice]:
        """
        Convert the textual description of a list position to an index or a slice.

        Args:
            text: position like "3" or "-1", or a range like ":", "1:3" or "::2"

        Returns:
            int, slice: object to index a list with

        Raises:
            ValueError: if the text is neither an integer nor a range of integers
        """
        text = str(text).strip()
        if ":" not in text:
            return int(text)
        parts = text.split(":")
        if len(parts) > 3:
            raise ValueError("Invalid slice: " + text)
        return slice(*[int(part) if part.strip() else None for part in parts])

    def set_item(
        self, tag_vals: dict, log_file: "Logstatus"
    ) -> Tuple[str, dict, Union[int, str], bool]:
        """
        Set LogTag item - if the values of the current tag are located on the tag line (rows equals zero), they are
        selected from the val_list using the "arg" entry, stored in tag_vals and appended to the log_file.

        The "arg" entry is either a single position or range (e.g. "0", "-1", ":", "1:3") or the literal of a
        sequence of such positions (e.g. "'0','2'"). A single value is converted from its string representation to a
        Python literal and stored under the "h5" entry. For a sequence the selected strings are stored unconverted,
        the i-th one under the i-th element of the "h5" entry.

        Args:
            tag_vals (dict): tag value dictionary
            log_file (Logstatus): Logstatus object

        Returns:
            list: tag name, tag values, rows, line skip [True/False]
        """
        tag_name = self.tag_name
        if self.rows() == 0:
            arg = self.arg()
            try:
                selector = self._parse_selector(arg)
            except ValueError:
                # Not a single position - the argument describes a sequence.
                selector = None
            if selector is not None:
                val = self.val_list[selector]
                if isinstance(val, str):
                    val = ast.literal_eval(val)
            else:
                val = [
                    self.val_list[self._parse_selector(i_item)]
                    for i_item in ast.literal_eval(arg)
                ]
            tag_vals[tag_name] = val
            if selector is not None:
                log_file.append(self.h5(), data_to_append=val)
            else:
                for i_num, i_val in enumerate(val):
                    log_file.append(self.h5()[i_num], data_to_append=i_val)
            if tag_name in self.dyn_tags:
                self.resolve_dynamic_variable(val)
        return tag_name, tag_vals, self.rows(), self.line_skip()

    def resolve_dynamic_variable(self, val: list) -> None:
        """
        Resolve dynamic variable using the key_dict dictionary - the dynamic tag which belongs to the current tag is
        renamed in the tag dictionary to the blank separated values.

        Args:
            val: values to resolve, if a key_dict is defined they are translated and values which are not part of
                 the key_dict are dropped
        """
        d_name = self.dyn_tags[self.tag_name]
        if self.key_dict is not None:
            val = [self.key_dict[v] for v in val if v in self.key_dict]
        resolved_name = " ".join(val)
        # Replace the "$" placeholder key by the resolved tag.
        v = self.tag_dict[d_name]
        self.tag_dict[resolved_name] = v
        del self.tag_dict[d_name]
        # The tag dictionary was modified in place, so refresh the derived data.
        self.dyn_tags = self.tag_dict
        self._tag_first_word = tuple(self.tag_dict.keys())