Source code for pygromos.files.trajectory.trc

"""
File:            Class for trc files in pandas
Description:
    The pandas trajectory TRC class offers a easy method to process GROMOS's .trc files in python
    The trc files are parsed into an easy to use pandas dataframe

Author: Marc Thierry Lehner

TODO: add stride option to all member functions
TODO: add support for periodic boundary condition
TODO: add support for rdkit mol selector
TODO: add support for rdkit conformers

"""

# imports
import tempfile
import mdtraj
import pandas as pd
import numpy as np
import nglview as nj
from copy import deepcopy

from pygromos.utils import bash
from pygromos.files.blocks._general_blocks import TITLE
from pygromos.files.coord.cnf import Cnf
from pygromos.files.blocks.coord_blocks import POSITION
from pygromos.utils.typing import Dict, List, Tuple
from pygromos.visualization.coordinates_visualization import visualize_system


[docs]class Trc(mdtraj.Trajectory): # Attributes TITLE: TITLE _future_file: bool path: str # Todo: we need to set this variable. def __init__( self, xyz=None, topology=None, time=None, unitcell_lengths=None, unitcell_angles=None, traj_path=None, in_cnf: [str, Cnf] = None, timestep_duration: float = 0.002, _future_file: bool = False, ): self._future_file = _future_file if xyz is None and topology is None and traj_path is None and in_cnf is None: self._future_file = None if traj_path is not None and (traj_path.endswith(".h5") or traj_path.endswith(".hf5")): trj = self.load(traj_path) self.__dict__.update(vars(trj)) elif traj_path is not None and (traj_path.endswith(".trc") or traj_path.endswith(".trc.gz")): # Parse TRC compress = False if traj_path.endswith(".gz"): traj_path = bash.compress_gzip(in_path=traj_path, extract=True) compress = True unitcell_angles = None unitcell_lengths = None if isinstance(traj_path, str): xyz, step, time, unitcell_lengths, unitcell_angles = self.parse_trc_efficiently(traj_path) if compress: traj_path = bash.compress_gzip(in_path=traj_path) # Topology from Cnf if isinstance(in_cnf, str): in_cnf = Cnf(in_cnf) elif isinstance(in_cnf, Cnf) and hasattr(in_cnf, "POSITION"): pass else: in_cnf = self.get_dummy_cnf(xyz) # get cnf boxDims if hasattr(in_cnf, "GENBOX") and (unitcell_lengths is None and unitcell_angles is None): unitcell_angles = np.array(list(in_cnf.GENBOX.angles) * len(xyz)).reshape( len(xyz), len(in_cnf.GENBOX.length) ) unitcell_lengths = np.array(list(in_cnf.GENBOX.length) * len(xyz)).reshape( len(xyz), len(in_cnf.GENBOX.length) ) # Topo tmp file tmpFile = tempfile.NamedTemporaryFile(suffix="_tmp.pdb") in_cnf.write_pdb(tmpFile.name) single = mdtraj.load_pdb(tmpFile.name) tmpFile.close() super().__init__( xyz=xyz, topology=single.topology, time=time, unitcell_lengths=unitcell_lengths, unitcell_angles=unitcell_angles, ) self._step = step elif not (xyz is None and topology is None): super().__init__( xyz=xyz, topology=topology, time=time, unitcell_lengths=unitcell_lengths, unitcell_angles=unitcell_angles, ) self._step = np.array(np.round(self._time / timestep_duration), dtype=int) self.TITLE = TITLE(content=" Generic Title... to be changed by YOU!") else: self._unitcell_lengths = [] self._unitcell_angles = [] self._xyz = np.array([], ndmin=2) self._topology = None self._future_file = True self.path = traj_path def __copy__(self): attribs = { "xyz": deepcopy(self._xyz), "topology": deepcopy(self._topology), "unitcell_lengths": deepcopy(self._unitcell_lengths), "unitcell_angles": deepcopy(self._unitcell_angles), "traj_path": deepcopy(self.path), "_future_file": self._future_file, } for additional_key in ["unitcell_angles", "unitcell_angles"]: if hasattr(self, additional_key) and getattr(self, additional_key) is not None: attribs.update({additional_key: deepcopy(getattr(self, additional_key))}) cCls = self.__class__(**attribs) cCls._step = deepcopy(self._step) cCls.TITLE = deepcopy(self.TITLE) cCls._time = deepcopy(self._time) return cCls def __getitem__(self, key: int): t = self.slice(key) if hasattr(t, "_step"): t._step = deepcopy(np.array(self._step[key], ndmin=1)) if hasattr(t, "_time"): t._time = deepcopy(np.array(self._time[key], ndmin=1)) if hasattr(t, "TITLE"): t.TITLE = deepcopy(self.TITLE) return t
[docs] def get_dummy_cnf(self, xyz: np.array) -> Cnf: from pygromos.files.blocks import coords new_Cnf = Cnf(None) new_Cnf.add_block(blocktitle="TITLE", content="THis is a dummy top depending on the first Frame.") new_Cnf.add_block( blocktitle="POSITION", content=[ coords.atomP(resID=1, resName="DUM", atomType="C", atomID=i, xp=coord[0], yp=coord[1], zp=coord[2]) for i, coord in enumerate(xyz[0]) ], ) return new_Cnf
[docs] def parse_trc_efficiently(self, traj_path: str) -> Tuple[np.array, np.array, np.array]: self._block_map = self._generate_blockMap(in_trc_path=traj_path) # build block mapping rep_time = 1 start = self._block_map["TIMESTEP"] title = self._block_map["TITLE"] end = start + self._block_map["POSITIONRED"] - 1 timestep_block_length = sum( [self._block_map[key] for key in self._block_map if (key not in ["TITLE", "commentLines"])] ) chunk = self._block_map["POSITIONRED"] - self._block_map["commentLines"] - 2 + 1 # block mapping logic base_rows = ( lambda x: not ( (((x - title) % timestep_block_length > start) and ((x - title) % timestep_block_length < end)) or (x - title) % timestep_block_length == rep_time ) if x > title else True ) unitcell_length = None unitcell_angles = None genbox_present = False if "GENBOX" in self._block_map: genbox_present = True chunk += 2 # timestep_block_length += 7 skip_rows1 = lambda x: ( base_rows(x) and not ((x - title) % timestep_block_length == end + 3) and not ((x - title) % timestep_block_length == end + 4) ) skip_rows = lambda x: (x < title) or skip_rows1(x) unitcell_length = [] unitcell_angles = [] else: skip_rows = lambda x: (x < title) or base_rows(x) # parsing data = [] time = [] step = [] for b in pd.read_table( traj_path, delim_whitespace=True, skiprows=skip_rows, names=["x", "y", "z"], chunksize=chunk, comment="#" ): if genbox_present: data.append(b.values[1:-2, :]) time.append(b.values[0, 1]) step.append(b.values[0, 0]) unitcell_length.append(b.values[-2, :]) unitcell_angles.append(b.values[-1, :]) else: data.append(b.values[1:, :]) time.append(b.values[0, 1]) step.append(b.values[0, 0]) # make np.Arrays xyz = np.array(data) time = np.array(time) step = np.array(step) if genbox_present: unitcell_length = np.array(unitcell_length) unitcell_angles = np.array(unitcell_angles) return xyz, step, time, unitcell_length, unitcell_angles
@property def step(self) -> np.array: return self._step # Analysis of traj
[docs] def rmsd(self, reference_frame: int = 0, reference: mdtraj.Trajectory = None) -> pd.DataFrame: if reference is None: reference = self time_scale = pd.Series(data=self.time, name="time") return pd.DataFrame({"rmsd": mdtraj.rmsd(self, reference, reference_frame)}, index=time_scale)
[docs] def distances( self, atom_pairs: List[Tuple[int, int]], periodic: bool = True, opt: bool = True, ) -> pd.DataFrame: arr = mdtraj.compute_distances(self, atom_pairs=atom_pairs, periodic=periodic, opt=opt) time_scale = pd.Series(data=self.time, name="time") return pd.DataFrame( {str(key[0]) + "-" + str(key[1]): val for key, val in zip(atom_pairs, arr.T)}, index=time_scale )
def _generate_blockMap(self, in_trc_path: str) -> Dict[str, int]: block_map = {} with open(in_trc_path, "r") as file_handle: inBlock = False inTitleBlock = False blockKey = None nLines = 0 nCommentLines = 0 titleStr = [] max_it = 1000000 i = 0 while max_it > i: line = file_handle.readline().strip() if line.startswith("#"): nCommentLines += 1 if "END" == line.strip(): block_map.update({blockKey: nLines}) if blockKey == TITLE.__name__: inTitleBlock = False self.TITLE = TITLE(content="\n".join(titleStr)) inBlock = False elif not inBlock: blockKey = line.strip() if blockKey in block_map: break elif blockKey == TITLE.__name__: inTitleBlock = True inBlock = True nLines = 1 elif inTitleBlock: titleStr.append(line) i += 1 # this is a potential danger nLines += 1 if not hasattr(self, TITLE.__name__): self.TITLE = TITLE(content="Empty TITLE") block_map.update({"commentLines": nCommentLines}) return block_map # Visualizaton @property def view(self, re_create: bool = False) -> nj.NGLWidget: if not hasattr(self, "_view") or not isinstance(self._view, nj.NGLWidget) or re_create: self._view = nj.show_mdtraj(self) self.view.clear_representations() self.view.add_representation("licorice", selection="all") return self._view
[docs] def recreate_view(self) -> nj.NGLWidget: self._view = visualize_system(traj=self) return self._view
# io _formatting = np.vectorize(np.format_float_positional)
[docs] def write_trc(self, out_path: str) -> str: first_entry = self.generate_TITLE_entry() array_list = [first_entry] for i in range(len(self.time)): array_list.append(self.generate_entry_for_frame(i)) array = np.concatenate(array_list, axis=0) array = np.array(array, dtype=str) array[array == "None"] = "" np.savetxt(out_path, array, fmt="%s", delimiter="\t")
[docs] def generate_entry_for_frame(self, frame_id: int): length = 3 # TIMESTEP length += 2 # Positionred length += self.xyz[0].shape[0] # Add comments length += self.xyz[0].shape[0] // 10 if not (self.unitcell_lengths is None and len(self.unitcell_lengths) > 0): length += 7 array = np.empty((length, 4), dtype=object) # Add TIMESTEP array[0, 0] = "TIMESTEP" array[1, 1] = self.step[frame_id] array[1, 2] = self._formatting(self.time[frame_id], precision=9, unique=False, pad_left=2) array[2, 0] = "END" # Add POSITIONRED array[3, 0] = "POSITIONRED" start = 4 for block in range(self.xyz[frame_id].shape[0] // 10): array[start + block * 10 : start + (block + 1) * 10, 1:] = self._formatting( self.xyz[frame_id][block * 10 : (block + 1) * 10, :], precision=9, unique=False, pad_left=2 ) array[start + (block + 1) * 10, 0] = "#" array[start + (block + 1) * 10, 1] = (block + 1) * 10 start += 1 last = self.xyz[frame_id].shape[0] % 10 if not (self.unitcell_lengths is None and len(self.unitcell_lengths) > 0): array_last = last + 7 else: array_last = last array[-(array_last + 1) : -(array_last - last) - 1, 1:] = self._formatting( self.xyz[frame_id][-last:, :], precision=9, unique=False, pad_left=2 ) array[-(array_last - last) - 1, 0] = "END" if not (self.unitcell_lengths is None and len(self.unitcell_lengths) > 0): array[-(array_last - last), 0] = "GENBOX" array[-(array_last - last) + 1, 1] = 1 array[-(array_last - last) + 2, 1:] = self._formatting( self.unitcell_lengths[frame_id], precision=9, unique=False, pad_left=2 ) array[-(array_last - last) + 3, 1:] = self._formatting( self.unitcell_angles[frame_id], precision=9, unique=False, pad_left=2 ) array[-(array_last - last) + 4, 1:] = self._formatting(0, precision=9, unique=False, pad_left=2) array[-(array_last - last) + 5, 1:] = self._formatting(0, precision=9, unique=False, pad_left=2) array[-(array_last - last) + 6, 0] = "END" return array
[docs] def generate_TITLE_entry(self): length = 3 # TITLE will only be one line # Add comments array = np.empty((length, 4), dtype=object) # Add Title array[0, 0] = "TITLE" titlestring = "" for t in self.TITLE.content: titlestring += t array[1, 0] = titlestring array[2, 0] = "END" return array
[docs] def to_cnf(self, frame_id: int = None, base_cnf: Cnf = None): from pygromos.files.blocks import coords if frame_id is None: frame_id = 0 content_str = ( "THIS IS THE FRAME AT TIMESTEP: " + str(self.time[frame_id]) + " OF THE TRAJECTORY WITH TITLE: \n" + "\n".join(self.TITLE.content) ) if base_cnf is None: new_Cnf = self.get_dummy_cnf(self.xyz) else: if type(base_cnf) is Cnf: new_Cnf = base_cnf else: new_Cnf = Cnf(base_cnf) new_Cnf.TITLE = TITLE(content_str) new_Cnf.POSITION = POSITION( [ coords.atomP( resID=new_Cnf.POSITION.content[i].resID, resName=new_Cnf.POSITION.content[i].resName, atomType=new_Cnf.POSITION.content[i].atomType, atomID=i, xp=coord[0], yp=coord[1], zp=coord[2], ) for i, coord in enumerate(self.xyz[frame_id]) ] ) if hasattr(new_Cnf, "GENBOX"): new_Cnf.GENBOX.length = list(self.unitcell_lengths[frame_id]) new_Cnf.GENBOX.angles = list(self.unitcell_angles[frame_id]) new_Cnf.GENBOX.euler = [0.0, 0.0, 0.0] new_Cnf.GENBOX.origin = [0.0, 0.0, 0.0] else: from pygromos.files.blocks.coord_blocks import GENBOX box_block = GENBOX( pbc=1, length=list(self.unitcell_lengths[frame_id]), angles=list(self.unitcell_angles[frame_id]) ) new_Cnf.add_block(block=box_block) if hasattr(new_Cnf, "TIMESTEP"): new_Cnf.TIMESTEP.t = self.time[frame_id] new_Cnf.TIMESTEP.step = self.step[frame_id] return new_Cnf
[docs] def save(self, out_path: str) -> str: # write out if out_path.endswith(".trc") or out_path.endswith(".trc.gz"): out_path = self.write_trc(out_path) return out_path else: super().save(out_path) return out_path
[docs] def write(self, out_path: str) -> str: return self.save(out_path)
[docs] @classmethod def load(cls, in_path: str, in_cnf_path: str = None, timestep_duration: float = 0.002) -> any: if in_path.endswith(".trc") or in_path.endswith(".trc.gz"): o = cls(traj_path=in_path, in_cnf_path=in_cnf_path) else: so = super().load(in_path) o = cls( xyz=so.xyz, topology=so.topology, time=so.time, unitcell_lengths=so.unitcell_lengths, unitcell_angles=so.unitcell_angles, timestep_duration=timestep_duration, ) return o