"""
Module: System
This module shall be used to implement subclasses of system. It wraps all information needed and generated by a simulation.
"""
import os
import warnings
import numpy as np
import pandas as pd
import scipy.constants as const
from tqdm import tqdm
pd.options.mode.use_inf_as_na = True
# Typing
from ensembler.util.basic_class import _baseClass
from ensembler.util.ensemblerTypes import samplerCls, conditionCls, potentialCls, Number, Union, Iterable, NoReturn, \
List
from ensembler.util import dataStructure as data
from ensembler.samplers.newtonian import newtonianSampler
from ensembler.samplers.stochastic import langevinIntegrator, metropolisMonteCarloIntegrator
from ensembler.potentials.OneD import metadynamicsPotential as metadynamicsPotential1D, harmonicOscillatorPotential
from ensembler.potentials.TwoD import metadynamicsPotential as metadynamicsPotential2D
[docs]class system(_baseClass):
"""
The system class is managing the simulation approaches and all system data as well as the simulation results.
"""
# static attributes
name = "system"
state = data.basicState
verbose: bool
# general attributes
nParticles: int
nDimensions: int
nStates: int
"""
Attributes
"""
@property
def potential(self) -> potentialCls:
"""
The potential energy function class can be explored in a simulation
Returns
-------
_potentialCls
systems potential energy class
"""
return self._potential
@potential.setter
def potential(self, potential: potentialCls):
# if(issubclass(potential.__class__, _potentialCls)):
self._potential = potential
# else:
# raise ValueError("Potential needs to be a subclass of potential")
@property
def sampler(self) -> samplerCls:
"""
The sampler method is used by the system to explore the potential energy function.
Returns
-------
_samplerCls
the sampler method that can be used to explore the potential energy function.
"""
return self._integrator
@sampler.setter
def sampler(self, integrator: samplerCls):
self._integrator = integrator
@property
def conditions(self) -> List[conditionCls]:
"""
conditions list contains the system conditions.
These conditions are applied during the sampling of the potential energy function to add additional constraints.
Returns
-------
List[_conditionCls]
the list of conditions coupled to the system.
"""
return self._conditions
@conditions.setter
def conditions(self, conditions: List[conditionCls]):
if (isinstance(conditions, List)):
self._conditions = conditions
else:
raise ValueError("Conditions needs to be a List of objs, that are a subclass of _conditionCls")
@property
def total_system_energy(self) -> Number:
"""
the total energy of the current system
Returns
-------
Number
total energy of the current system
"""
return self._currentTotE
@property
def total_potential_energy(self) -> Number:
"""
the total potential energy of the current system
Returns
-------
Number
total potential energy of the current system
"""
return self._currentTotPot
@property
def total_kinetic_energy(self) -> Number:
"""
the total kinetic energy of the current system
Returns
-------
Number
total kinetic energy of the current system
"""
return self._currentTotKin
@property
def current_state(self) -> state:
return self._currentState
[docs] def set_current_state(self, current_position: Union[Number, Iterable[Number]],
current_velocities: Union[Number, Iterable[Number]] = 0,
current_force: Union[Number, Iterable[Number]] = 0,
current_temperature: Number = 298):
"""
set_current_state
set the current State of the system.
Parameters
----------
current_position: Union[Number, Iterable[Number]]
new current system position
current_velocities: Union[Number, Iterable[Number]], optional
new current system velocity. (default: 0)
current_force: Union[Number, Iterable[Number]], optional
new current system force. (default: 0)
current_temperature: Union[Number, Iterable[Number]], optional
new current system temperature. (default: 298)
"""
self._currentPosition = current_position
self._currentForce = current_force
self._currentVelocities = current_velocities
self._currentTemperature = current_temperature
self.currentState = self.state(self._currentPosition, self._currentTemperature, np.nan, np.nan, np.nan, np.nan,
np.nan)
self._update_energies()
self.update_current_state()
@property
def trajectory(self) -> pd.DataFrame:
return pd.DataFrame(list(map(lambda x: x._asdict(), self._trajectory)), columns=list(self.state.__dict__["_fields"]))
@property
def position(self) -> Union[Number, Iterable[Number]]:
return self._currentPosition
@position.setter
def position(self, position: Union[Number, Iterable[Number]]):
self._currentPosition = position
if (len(self.trajectory) == 0):
self.initial_position = self._currentPosition
self._update_energies()
self.update_current_state()
[docs] def set_position(self, position: Union[Number, Iterable[Number]]):
self.position = position
@property
def velocity(self) -> Union[Number, Iterable[Number]]:
"""
velocity
The current velocity of the system
Returns
-------
Union[Number, Iterable[Number]]
"""
return self._currentVelocities
@velocity.setter
def velocity(self, velocity: Union[Number, Iterable[Number]]):
self._currentVelocities = velocity
self._update_energies()
self.update_current_state()
[docs] def set_velocities(self, velocities):
self.velocities = velocities
@property
def temperature(self) -> Number:
"""
The set temperature of the system
Returns
-------
Number
set temperature
"""
return self._temperature
@temperature.setter
def temperature(self, temperature: Number):
self._temperature = temperature
self._currentTemperature = temperature
self._update_energies()
[docs] def set_temperature(self, temperature: Number):
"""
set Temperature
set the systems current temperature.
Parameters
----------
temperature
"""
self.temperature = temperature
@property
def mass(self):
return self._mass
@mass.setter
def mass(self, mass: float):
self._mass = mass
[docs] def __init__(self, potential: potentialCls=harmonicOscillatorPotential(), sampler: samplerCls=metropolisMonteCarloIntegrator(), conditions: Iterable[conditionCls] = None,
temperature: Number = 298.0, start_position: (Iterable[Number] or Number) = None, mass: Number = 1,
verbose: bool = True) -> NoReturn:
"""
The system class is wrapping all components needed for a simulation.
It can be used as the control unit for executing a simulation (simulate) and also to manage the generated data or input data.
Parameters
----------
potential : _potentialCls
gives the potential function to be explored/sampled
sampler : _samplerCls
gives the method of choice to sample/explore the potential function
conditions : Iterable[_conditionCls], optional
apply the given conditions to the systems in a preset (tau) step iteration
temperature : float, optional
temperature of the system
start_position : float, optional
starting position of the system during the simulation
mass : float, optional
mass of the single particle
verbose : bool, optional
I can tell you a long iterative story...
"""
################################
# Declare Attributes
#################################
##Physical parameters
self.nParticles = 1 # FUTURE: adapt it to be multiple particles
self._mass = mass # for one particle systems!!!!
self._temperature = temperature
# Output
self._currentState = self.state(**{key: np.nan for key in self.state.__dict__["_fields"]})
self._trajectory =[]
# tmpvars - private:
self._currentTotE: (Number) = np.nan
self._currentTotPot: (Number) = np.nan
self._currentTotKin: (Number) = np.nan
self._currentPosition: (Number or Iterable[Number]) = np.nan
self._currentVelocities: (Number or Iterable[Number]) = np.nan
self._currentForce: (Number or Iterable[Number]) = np.nan
self._currentTemperature: (Number or Iterable[Number]) = np.nan
# BUILD System
## Fundamental Parts:
self._potential = potential
self._integrator = sampler
if(conditions is None):
self._conditions = []
else:
self._conditions = conditions
## set dim
if (potential.constants[potential.nDimensions] > 0):
self.nDimensions = potential.constants[potential.nDimensions]
else:
raise IOError(
"Could not estimate the disered Dimensionality as potential dim was <1 and no initial position was given.")
###is the potential a state dependent one? - needed for initial pos.
if (hasattr(potential, "nStates")):
self.nStates = potential.constants[potential.nStates]
else:
self.nstates = 1
# PREPARE THE SYSTEM
# Only init velocities, if the samplers uses them
if (issubclass(sampler.__class__, (newtonianSampler, langevinIntegrator))):
init_velocity = True
else:
init_velocity = False
self.initialise(withdraw_Traj=True, init_position=True, init_velocity=init_velocity,
set_initial_position=start_position)
##check if system should be coupled to conditions:
# update for metadynamics simulation - local elevation bias is like a condition/potential hybrid.
if (isinstance(self.potential, metadynamicsPotential1D) or isinstance(self.potential, metadynamicsPotential2D)):
self._conditions.append(self.potential)
for condition in self._conditions:
if (not hasattr(condition, "system")):
condition.couple_system(self)
else:
# warnings.warn("Decoupling system and coupling it again!")
condition.couple_system(self)
if (not hasattr(condition, "dt") and hasattr(self.sampler, "dt")):
condition.dt = self.sampler.dt
else:
condition.dt = 1
self.verbose = verbose
"""
Initialisation
"""
[docs] def initialise(self, withdraw_Traj: bool = True, init_position: bool = True, init_velocity: bool = True,
set_initial_position: Union[Number, Iterable[Number]] = None) -> NoReturn:
"""
initialise
initialises the system, i.e. can set an initial position, initial velocities and initialize the forces.
Parameters
----------
withdraw_Traj: bool, optional
reset the simulation trajectory?
init_position: bool, optional
reinitialize the start_position - currentPosition
init_velocity: bool, optional
reinitialize the start_velocity - currentVelocity
set_initial_position: Union[Number, Iterable[Number]], optional
set the start_position to the given one.
Returns
-------
NoReturn
"""
if (withdraw_Traj):
self.clear_trajectory()
if (init_position):
self._init_position(initial_position=set_initial_position)
# Try to init the force
try:
self._currentForce = self.potential.force(self.initial_position) # initialise forces!
except:
warnings.warn("Could not initialize the force of the potential? Check if you need it!")
if (init_velocity):
self._init_velocities()
# set initial Temperature
self._currentTemperature = self.temperature
# update current state
self.step = 0
self.update_system_properties()
self.update_current_state()
self._trajectory.append(self.current_state)
[docs] def _init_position(self, initial_position: Union[Number, Iterable[Number]] = None) -> NoReturn:
"""
_init_position
this function initializes the current position of the system.
Parameters
----------
initial_position: Union[Number, Iterable[Number]], optional
if None, a random position is selected else the given position is used.
"""
if (isinstance(initial_position, type(None))):
self.initial_position = self.random_position()
elif ((isinstance(initial_position, Number) and self.nDimensions == 1) or
(isinstance(initial_position, Iterable) and all(
[isinstance(x, Number) for x in initial_position]) and self.nDimensions == len(initial_position))):
self.initial_position = initial_position
else:
raise Exception("Did not understand the initial position! \n given: " + str(
initial_position) + "\n Expected dimensions: " + str(self.nDimensions))
self._currentPosition = self.initial_position
self.update_current_state()
return self.initial_position
[docs] def _init_velocities(self) -> NoReturn:
"""
_init_velocities
Initializes the initial velocity randomly.
"""
if (self.nStates > 1):
self._currentVelocities = [[self._gen_rand_vel() for dim in range(self.nDimensions)] for s in
range(self.nStates)] if (self.nDimensions > 1) else [self._gen_rand_vel() for
state in
range(self.nStates)]
else:
self._currentVelocities = [self._gen_rand_vel() for dim in range(self.nDimensions)] if (
self.nDimensions > 1) else self._gen_rand_vel()
self.veltemp = self.mass / const.gas_constant / 1000.0 * np.linalg.norm(self._currentVelocities) ** 2 # t
self.update_current_state()
return self._currentVelocities
[docs] def _gen_rand_vel(self) -> Number:
"""
_gen_rand_vel
get a random velocity according to the temperature and mass.
Returns
-------
Number, Iterable[Number]
a randomly selected velocity
"""
return np.sqrt(const.gas_constant / 1000.0 * self.temperature / self.mass) * np.random.normal()
[docs] def random_position(self) -> Union[Number, Iterable[Number]]:
"""
randomPos
returns a randomly selected position for the system.
Returns
-------
Union[Number, Iterable[Number]]
a random position
"""
random_pos = np.squeeze(np.array(np.subtract(np.multiply(np.random.rand(self.nDimensions), 20), 10)))
if (len(random_pos.shape) == 0):
return np.float(random_pos)
else:
return random_pos
"""
Update
"""
[docs] def calculate_total_kinetic_energy(self) -> Union[Iterable[Number], Number]:
"""
totKin
returns the total kinetic energy of the system.
Returns
-------
Union[Iterable[Number], Number, np.nan]
total kinetic energy.
"""
if (isinstance(self._currentVelocities, Number) or (isinstance(self._currentVelocities, Iterable) and all(
[isinstance(x, Number) and not np.isnan(x) for x in self._currentVelocities]))):
return np.sum(0.5 * self.mass * np.square(np.linalg.norm(self._currentVelocities)))
else:
return np.nan
[docs] def calculate_total_potential_energy(self) -> Union[Iterable[Number], Number]:
"""
totPot
return the total potential energy
Returns
-------
Union[Iterable[Number], Number]
summed up total potential energies
"""
return self.potential.ene(self._currentPosition)
[docs] def update_system_properties(self) -> NoReturn:
"""
updateSystemProperties
updates the energies and temperature of the system
Returns
-------
NoReturn
"""
self._update_energies()
self._update_temperature()
[docs] def update_current_state(self) -> NoReturn:
"""
updateCurrentState
update current state from the _current vars.
Returns
-------
NoReturn
"""
self._currentState = self.state(self._currentPosition, self._currentTemperature,
self._currentTotE, self._currentTotPot, self._currentTotKin,
self._currentForce, self._currentVelocities)
[docs] def _update_temperature(self) -> NoReturn:
"""
this looks like a thermostat like thing! not implemented!@ TODO calc temperature from velocity
Returns
-------
NoReturn
"""
self._currentTemperature = self.temperature
[docs] def _update_energies(self) -> NoReturn:
"""
_updateEne
update all total energy terms.
Returns
-------
NoReturn
"""
self._currentTotPot = self.calculate_total_potential_energy()
self._currentTotKin = self.calculate_total_kinetic_energy()
self._currentTotE = self._currentTotPot if (np.isnan(self._currentTotKin)) else np.add(self._currentTotKin,
self._currentTotPot)
[docs] def _update_current_vars_from_current_state(self):
"""
_update_current_vars_from_current_state
update the _current Vars from the currentState
Returns
-------
NoReturn
"""
self._currentPosition = self.current_state.position
self._currentTemperature = self.current_state.temperature
self._currentTotE = self.current_state.total_system_energy
self._currentTotPot = self.current_state.total_potential_energy
self._currentTotKin = self.state.total_kinetic_energy
self._currentForce = self.current_state.dhdpos
self._currentVelocities = self.current_state.velocity
[docs] def _update_state_from_traj(self) -> NoReturn:
"""
_update_state_from_traj
replaces the current state and the currentstate vars by the last trajectory state.
Returns
-------
NoReturn
"""
self.currentState = self.state(**self.trajectory.iloc[-1].to_dict())
self._update_current_vars_from_current_state()
return
"""
Functionality
"""
[docs] def simulate(self, steps: int,
withdraw_traj: bool = False, save_every_state: int = 1,
init_system: bool = False,
verbosity: bool = True, _progress_bar_prefix: str = "Simulation: ") -> state:
"""
this function executes the simulation, by exploring the potential energy function with the sampling method for the given n steps.
Parameters
----------
steps: int
number of integration steps
init_system: bool, optional
initialize the system. (default: False)
withdraw_traj: bool, optional
reset the current simulation trajectory. (default: False)
save_every_state: int, optional
save every n step. (and leave out the rest) (default: 1 - each step)
verbosity: bool, optional
change the verbosity of the simulation. (default: True)
_progress_bar_prefix: str, optional
prefix of tqdm progress bar. (default: "Simulation")
Returns
-------
state
returns the last current state
"""
if (init_system):
self._init_position()
self._init_velocities()
if (withdraw_traj):
self._trajectory = []
self._trajectory.append(self.current_state)
self.update_current_state()
self.update_system_properties()
# progressBar or no ProgressBar
if (verbosity):
iteration_queue = tqdm(range(steps), desc=_progress_bar_prefix + " Simulation: ", mininterval=1.0,
leave=verbosity)
else:
iteration_queue = range(steps)
# Simulation loop
for self.step in iteration_queue:
# Do one simulation Step.
self.propagate()
# Apply Restraints, Constraints ...
self.apply_conditions()
# Calc new Energy&and other system properties
self.update_system_properties()
# Set new State
self.update_current_state()
if (self.step % save_every_state == 0 and self.step != steps - 1):
self._trajectory.append(self.current_state)
self._trajectory.append(self.current_state)
return self.current_state
[docs] def propagate(self) -> (
Union[Iterable[Number], Number], Union[Iterable[Number], Number], Union[Iterable[Number], Number]):
"""
propagate
Do a single exploration step.
Not stored in the trajectory and no energies returned or updated.
Returns
-------
(Union[Iterable[Number], Number], Union[Iterable[Number], Number], Union[Iterable[Number], Number])
returns the new current position, the new current velocities and the new current forces
"""
self._currentPosition, self._currentVelocities, self._currentForce = self.sampler.step(self)
return self._currentPosition, self._currentVelocities, self._currentForce
[docs] def apply_conditions(self) -> NoReturn:
"""
applyConditions
this function applies the coupled conditions to the current state of system.
Returns
-------
NoReturn
"""
for condition in self._conditions:
condition.apply_coupled()
[docs] def append_state(self, new_position: Union[Iterable[Number], Number], new_velocity: Union[Iterable[Number], Number],
new_forces: Union[Iterable[Number], Number]) -> NoReturn:
"""
append_state
appends a new state, based on the given arguments and updates the system to them.
Parameters
----------
new_position: Union[Iterable[Number], Number]
a new position
new_velocity: Union[Iterable[Number], Number]
a new velocity
new_forces: Union[Iterable[Number], Number]
a new Force
"""
self._currentPosition = new_position
self._currentVelocities = new_velocity
self._currentForce = new_forces
self._update_temperature()
self._update_energies()
self.update_current_state()
self._trajectory.append(self.current_state)
[docs] def revert_step(self) -> NoReturn:
"""
revertStep
removes the last step which was performed from the trajectory and sets back the system to the one before.
Returns
-------
NoReturn
"""
if(len(self._trajectory)>1):
self._trajectory.pop()
self._currentState = self._trajectory[-1]
self._update_current_vars_from_current_state()
else:
warnings.warn("Could not revert step, as only 1 step is in the trajectory!")
[docs] def clear_trajectory(self):
"""
deletes all entries of trajectory and adds current state as first timestep to the trajectory
:return: None
"""
self._trajectory = []
[docs] def write_trajectory(self, out_path: str) -> str:
"""
writeTrajectory
Writes the trajectory out to a file.
Parameters
----------
out_path: str
the string, where the traj csv should be stored.
Returns
-------
str
returns the out_path
See Also
---------
save
"""
if (not os.path.exists(os.path.dirname(os.path.abspath(out_path)))):
raise Exception("Could not find output folder: " + os.path.dirname(out_path))
traj = self.trajectory
traj.to_csv(out_path, header=True)
return out_path