Source code for pygromos.files.trajectory.tre

File:            Class for tre files in pandas
    The pandas trajectory TRE class offers a easy method to process GROMOS's .tre files in python
    The tre files are parsed into an easy to use pandas dataframe.

    This class should be a alternative for the data post processing with ene_ana in gromos++

Author: Marc Thierry Lehner

TODO: add stride option to all member functions
TODO: add support for periodic boundary condition

TODO: add ene_ana functions


# imports
import pandas as pd
import numpy as np

import pygromos.files.trajectory._general_trajectory as traj
from pygromos.files.trajectory.tre_field_libs.ene_fields import (
from pygromos.utils.typing import Tuple, Dict, Tre_Type
from pygromos.analysis import energy_analysis as ea

[docs]class Tre(traj._General_Trajectory): """ The Tre files are results from Gromos simulations, that store all the calculated energies and properties during the simulation. """ _gromos_file_ending: str = "tre" _contributions_nonbonded_names: Tuple[str] = ( "Lennard-Jones", "Coulomb/RF", "lattice sum real", "lattice sum reciproc", ) _contributions_bonded_names: Tuple[str] = ("bond", "angle", "improper", "dihedral", "crossdihedral") _contributions_baths: Tuple[str] = ("kinetic total", "centre of mass", "internal/rotational") _contributions_special: Tuple[str] = ( "constraints", "pos. restraints", "dist. restraints", "disfield res", "dihe. restr.", "SASA", "SASA volume", "jvalue", "rdc", "local elevation", "path integral", "angle restraint", ) _contributions_temperature: Tuple[str] = ("total", "com", "ir", "scaling factor")
[docs] def __init__( self, input_value: str, auto_save: bool = True, stride: int = 1, skip: int = 0, _ene_ana_names: gromos_tre_block_names_table = gromos_2021_tre_block_names_table, ): """ Build a Gromos energy trajectory file (.tre) Parameters ---------- input_value : str,None The input value can be None, or a string path to the .tre/.tre.gz file. auto_save : bool, optional automatically save the file, by default True stride : int, optional only read every x value, by default 1 skip : int, optional skip the first x timesteps, by default 0 _ene_ana_names : gromos_tre_block_names_table, optional get the field names after the provided standard., by default gromos_2020_tre_block_names_table """ super().__init__(input_value, auto_save=auto_save, stride=stride, skip=skip) self.tre_block_name_table = _ene_ana_names if "time" in self.database.columns: self.time = self.database.time if "step" in self.database.columns: self.step = self.database.step
"""-------------------------------------------------------------- Basic getters for Subblocks and Subsubblocks ----------------------------------------------------------------- The following functions, return well formated values from the tre trajectories. ENERGY03 - fields: """
[docs] def get_totals(self) -> pd.DataFrame: """ get all totals of the system """ # print(self.database["totals"][0].shape, self.database["totals"][0]) if not hasattr(self, "totals"): totals_data = np.stack(self.database["totals"].to_numpy()) self.totals = pd.DataFrame( data=totals_data, index=self.database.time, columns=self.tre_block_name_table.totals_subblock_names[: totals_data.shape[1]], ) else: pass return self.totals
[docs] def get_totene(self) -> pd.DataFrame: """ get the total System energy / per time """ self.totene = self.get_totals()["totene"] return self.totene
[docs] def get_totkin(self) -> pd.DataFrame: """ get the total kinetic Energy / per time """ self.totkin = self.get_totals()["totkin"] return self.totkin
[docs] def get_totpot(self) -> pd.DataFrame: """ get the total potential Energy / per time """ self.totpot = self.get_totals()["totpot"] return self.totpot
[docs] def get_totcov(self) -> pd.DataFrame: """ get the total covalent contribution/ per time """ self.totcov = self.get_totals()["totcov"] return self.totcov
[docs] def get_totbonded(self) -> pd.DataFrame: """ get the total bonded contribution/ per time """ self.totbonded = self.get_totals()["totbond"] return self.totbonded
[docs] def get_totangle(self) -> pd.DataFrame: """ get the total angle contribution/ per time """ self.totangle = self.get_totals()["totangle"] return self.totangle
[docs] def get_totdihedral(self) -> pd.DataFrame: """ get the total dihedral contribution/ per time """ self.totdihedral = self.get_totals()["totdihedral"] return self.totdihedral
[docs] def get_totnonbonded(self) -> pd.DataFrame: """ get the total nonbonded contribution/ per time """ self.totnonbonded = self.get_totals()["totnonbonded"] return self.totnonbonded
[docs] def get_totlj(self) -> pd.DataFrame: """ get the total lennard jones contribution/ per time """ self.totlj = self.get_totals()["totlj"] return self.totlj
[docs] def get_totcrf(self) -> pd.DataFrame: """ get the total columbic reactionfield contribution/ per time """ self.totcrf = self.get_totals()["totcrf"] return self.totcrf
[docs] def get_baths(self) -> pd.DataFrame: """ extract data of the baths block """ return self._set_data(attibute_name="baths", rows_name="baths", field_names=self._contributions_baths)
[docs] def get_bondedContributions(self) -> Dict[int, pd.DataFrame]: """ extract data of the bonded block """ return self._set_data( attibute_name="bondedContributions", rows_name="bonded", field_names=self._contributions_bonded_names )
[docs] def get_nonbondedContributions(self) -> Dict[int, Dict[int, pd.DataFrame]]: """ This function returns a nice formatted dictionary for the nonbonded Contributions according to the Force groups of the tre file. Returns ------- Dict[int, Dict[int, pd.DataFrame]] The dictionary containing the nonbonded contributions of the single ForceGroups with each other. Dict[ForceGroupI, Dict[ForceGroupJ, NonbondedEnergyContribs]] Raises ------ ValueError returns Value error, if the dimensionality of the different contributions does not fit to the _nonbonded_contribution_names. """ if not hasattr(self, "forceGroupNonbondedContributions"): # Check contibution_dimensionalities nFFContributions = self.database.nonbonded[0].shape[1] if nFFContributions != len(self._contributions_nonbonded_names): raise ValueError( "The dimensionality of the NonbondedContributions is not corresponding to the expected dimensionality.\n expected: " + str(len(self._contributions_nonbonded_names)) + " \t found: " + str(nFFContributions) ) # Get the number of force groups: nForceGroup = self._get_numberOfForceGroupsFromNonbondeds() # Generate dictionary for the different contributions t = 0 forceGroupNonbondedContributions = {} for i in range(1, 1 + nForceGroup): forceGroupNonbondedContributions[i] = {} for j in range(1, 1 + nForceGroup): forceGroupNonbondedContributions[i][j] = pd.DataFrame( list(self.database.nonbonded.apply(lambda x: x[i + j - 2])), columns=self._contributions_nonbonded_names, ) t += 1 self.forceGroupNonbondedContributions = forceGroupNonbondedContributions return self.forceGroupNonbondedContributions
[docs] def get_specialContributions(self) -> Dict[int, pd.DataFrame]: # CHECK THIS """ extract data of the special block """ return self._set_data( attibute_name="specialContributions", rows_name="special", field_names=self._contributions_special )
[docs] def get_eds(self) -> pd.DataFrame: """ Get EDS energies if present. Returns ------- pd.DataFrame returns datafrae with columns for each endstate. """ if not hasattr(self, "eds"): num_states = self.database["eds"][0][0] if num_states > 0: state_strings = [ [ column + "_" + str(state_row) for column in self.tre_block_name_table.eds_subblock_names_singleState ] for state_row in range(1, 1 + int(num_states)) ] self.tre_block_name_table.eds_subblock_names = ["numstates"] self.tre_block_name_table.eds_subblock_names.extend(list(np.concatenate(state_strings))) else: self.tre_block_name_table.eds_subblock_names = ["numstates"] self.eds = pd.DataFrame( data=np.stack(self.database["eds"].to_numpy()), index=self.database.time, columns=self.tre_block_name_table.eds_subblock_names, ) else: pass return self.eds
[docs] def get_precalclam(self) -> pd.DataFrame: """ Get the energies calculated for the different defined lambda values in a trajectory. Returns ------- pd.DataFrame return the energies calculated for the different lambda values. """ if not hasattr(self, "precalclam"): num_lam = self.database["precalclam"][0][0] if num_lam > 0: state_strings = [ [column + "_" + str(state_row) for column in self.tre_block_name_table.lam_subblock_names_singleLam] for state_row in range(1, 1 + int(num_lam)) ] self.tre_block_name_table.lam_subblock_names = ["nr_lambdas"] self.tre_block_name_table.lam_subblock_names.extend(list(np.concatenate(state_strings))) self.tre_block_name_table.lam_subblock_names.extend(["A_dihedral", "B_dihedral"]) else: self.tre_block_name_table.lam_subblock_names = ["nr_lambdas", "A_dihedral", "B_dihedral"] self.precalclam = pd.DataFrame( data=np.stack(self.database["precalclam"].to_numpy()), index=self.database.time, columns=self.tre_block_name_table.lam_subblock_names, ) else: pass return self.precalclam
""" VOLUMEPRESSURE03 - fields: """
[docs] def get_mass(self) -> pd.Series: """ returns the systems mass per timestep Returns ------- pd.Series series of mass per time """ return pd.Series(map(float, self.database.mass), name="mass", index=self.database.time)
[docs] def get_temperature_Info(self) -> Dict[int, pd.DataFrame]: """ temperature baths Returns ------- Dict[int,pd.DataFrame] returns the full info of the temperature baths per bath """ return self._set_data( attibute_name="temperatureInfo", rows_name="temperature", field_names=self._contributions_temperature )
[docs] def get_temperature( self, ) -> pd.DataFrame: # CHECK THIS - tempcontrib:total com ir scaling factor """ Get the temperature in Kelvin for all temperature baths for every time step Returns ------- pd.DataFrame pandas dataframe with all temperatures """ tmps = [] for i, row in self.database.iterrows(): row_d = {"bath" + str(i + 1): temp for i, temp in enumerate(row["temperature"][:, 0])} tmps.append(row_d) return pd.DataFrame(tmps, index=self.database.time)
""" UTILS """
[docs] def _set_data(self, attibute_name: str, rows_name: str, field_names: Tuple[str]) -> pd.DataFrame: """_summary_ This function extracts generially the information of a column per time Parameters ---------- attibute_name : str name of the target attribute rows_name : str name of the block, that shall be extracted field_names : Tuple[str] name of the fields in each row Returns ------- pd.DataFrame contains the extracted information """ if not hasattr(self, attibute_name): nDimGroups = self.database[rows_name][0].shape[0] setattr(self, attibute_name, {}) for i in range(nDimGroups): getattr(self, attibute_name).update( { i: pd.DataFrame( list(map(lambda x: list(x[i]), list(self.database[rows_name]))), columns=field_names, index=self.database.time, ) } ) return getattr(self, attibute_name)
[docs] def _get_numberOfForceGroupsFromNonbondeds(self) -> int: """ This function gets the number of Force groups in the simulation from the nonbonded block. Returns ------- int number of ForceGroups used for this tre. """ nForceGroupDim = self.database.nonbonded[0].shape[0] r = 0 for nForceGroup in range(1, nForceGroupDim + 1): r += nForceGroup if r == nForceGroupDim: return nForceGroup elif r > nForceGroupDim: raise Exception("That should not happen!")
"""-------------------------------------------------------------- Additional predefined function for common analysis ----------------------------------------------------------------- """
[docs] def get_density(self) -> pd.DataFrame: """ Calculate the density for every frame. (Uses mass and the first volume entry) Returns ------- pd.DataFrame Dataframe with the densities for all time steps """ return pd.Series( list( self.database[["mass", "volume"]].apply(lambda x: ea.get_density(mass=x[0][0], volume=x[1][0]), axis=1) ), index=self.database.time, name="density", )
[docs] def get_Hvap(self, gas_traj: Tre_Type, nMolecules: int = 1, temperature: float = None) -> float: gas_totpot_energy = 0 if type(gas_traj) == type(self): gas_totpot_energy = gas_traj.get_totpot().mean() elif type(gas_traj) == float: gas_totpot_energy = gas_traj else: raise TypeError("Did not understand the type of gas. Allowed are float (E_gas) or Tre (gas_trajectory)") liq_totpot_energy = self.get_totpot().mean() # get temperature from liq trajectory if not given if temperature is None: temperature = float(self.get_temperature().mean()[0]) # calculate heat of vaporization self.heat_vap = ea.get_Hvap( liq_totpot_energy=liq_totpot_energy, gas_totpot_energy=gas_totpot_energy, temperature=temperature, nMolecules=nMolecules, ) return self.heat_vap