"""
This module does all the post-simulation file juggling needed for GROMOS. CURRENTLY OLD, DON'T USE.
"""
import os
import glob
import tempfile
import warnings
import pandas as pd
import multiprocessing as mult
from subprocess import SubprocessError
from pygromos.files.simulation_parameters import imd
from pygromos.files.otherfiles import repdat
from pygromos.files.trajectory import tre
from pygromos.gromos import gromosPP
from pygromos.utils import bash
from pygromos.utils.typing import List, Dict, Union, Tuple, Cnf_Type
"""
PARALLEL WORKERS - These functions are required for parallelized code execution
"""
def _thread_worker_cat_trc(
job: int,
replicaID_range: List[int],
trc_files: Dict[int, List[str]],
out_prefix: str,
topology_path: str,
out_trcs: dict,
dt: float,
time: float = 0,
verbose: bool = False,
boundary_conditions: str = "r cog",
include_all: bool = False,
):
"""_thread_worker_cat_trc
This thread worker concatenates all .trc files of one replica into one file.
Parameters
----------
job : int
rank of this thread
replicaID_range : List[int]
list of replica IDs this thread should process
trc_files : Dict[int, List[str]]
Dictionary containing, for each replica, the list of all .trc files belonging to that replica.
out_prefix : str
output prefix
topology_path : str
path to the topology file handed to frameout
out_trcs : dict
shared dict collecting the resulting compressed .trc path per replica
dt : float
time step between written trajectory frames
verbose : bool
toggle verbosity
Returns
-------
None
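Example
-------
A minimal, hypothetical call; all paths and the prefix are placeholders::

    out_trcs = {}
    _thread_worker_cat_trc(
        job=0,
        replicaID_range=[1, 2],
        trc_files={1: ["run_1_1.trc.gz"], 2: ["run_1_2.trc.gz"]},
        out_prefix="analysis/replica_",
        topology_path="system.top",
        out_trcs=out_trcs,
        dt=0.002,
    )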
"""
gromPP = gromosPP.GromosPP()
start_dir = os.getcwd()
if verbose:
print("JOB " + str(job) + ": range " + str(list(replicaID_range)))
for replicaID in replicaID_range:
out_path = out_prefix + str(replicaID) + ".trc"
compress_out_path = out_path + ".gz"
out_trcs.update({replicaID: compress_out_path})
if os.path.exists(compress_out_path): # found perfect compressed trc file:)
warnings.warn("Skipped generating file as I found: " + compress_out_path)
if os.path.exists(out_path):
bash.remove_file(out_path)
continue
elif os.path.exists(out_path): # did not find compressed file. will compress
warnings.warn("Skipped generating file as I found: " + out_path)
continue
else: # concat files
if verbose:
print("JOB " + str(job) + ": " + "write out " + out_path + "\n")
out_dir = os.path.dirname(out_path)
tmp_dir = bash.make_folder(out_dir + "/TMP_replica_" + str(replicaID), additional_option="-p")
os.chdir(tmp_dir)
if include_all:
out_path = gromPP.frameout(
in_top_path=topology_path,
in_coord_path=" ".join(trc_files[replicaID]),
periodic_boundary_condition=boundary_conditions,
single_file=True,
out_file_format="trc",
out_file_path=out_path,
time=time,
dt=dt,
include="ALL",
)
else:
out_path = gromPP.frameout(
in_top_path=topology_path,
in_coord_path=" ".join(trc_files[replicaID]),
periodic_boundary_condition=boundary_conditions,
single_file=True,
out_file_format="trc",
out_file_path=out_path,
time=time,
dt=dt,
)
os.chdir(start_dir)
bash.wait_for_fileSystem(out_path)
bash.remove_folder(tmp_dir)
if verbose:
print("JOB " + str(job) + ": " + "write out " + out_path + "\t DONE\n")
if verbose:
print("JOB " + str(job) + ": " + "compress " + compress_out_path + "\n")
compressed_trc = bash.compress_gzip(out_path, out_path=compress_out_path)
if verbose:
print("JOB " + str(job) + ": " + "compress " + compressed_trc + "\t DONE\n")
def _thread_worker_cat_tre(
job: int,
replicaID_range: List[int],
tre_files: Dict[int, List[str]],
out_prefix: str,
out_tres: dict,
verbose: bool = False,
):
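"""_thread_worker_cat_tre
This thread worker concatenates all .tre files of one replica into a single .tre file and gzips the result.
Archived inputs are unpacked via find_and_unarchive_tar_files and re-compressed after use; the path of the
compressed output file is stored in out_tres under the replica ID.
"""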
if verbose:
print("JOB " + str(job) + ": range " + str(list(replicaID_range)))
for replicaID in replicaID_range:
use_tre_file_paths, unarchived_files = find_and_unarchive_tar_files(tre_files[replicaID], verbose=verbose)
if verbose:
print("FILES: ", use_tre_file_paths)
if verbose:
print("Archs:", unarchived_files)
out_path = out_prefix + str(replicaID) + ".tre"
compressed_tre = out_path + ".gz"
if os.path.exists(compressed_tre):
warnings.warn("Skipped generating .tre.gz file as I found: " + out_path)
else:
if os.path.exists(out_path):
warnings.warn("Skipped generating .tre file as I found: " + out_path + "\n\t Continue Compressing.")
else:
tre_file = tre.Tre(use_tre_file_paths[0])
if verbose:
print("JOB " + str(job) + ": parsing " + os.path.basename(use_tre_file_paths[0]))
if len(use_tre_file_paths) > 1:
for tre_file_path in use_tre_file_paths[1:]:
if verbose:
print("JOB " + str(job) + ": append " + os.path.basename(tre_file_path))
tre_file += tre.Tre(tre_file_path)
if verbose:
print("JOB " + str(job) + ": write out " + os.path.basename(out_path))
tre_file.write(out_path)
bash.wait_for_fileSystem(out_path)
if verbose:
print("JOB " + str(job) + ": done " + os.path.basename(out_path))
del tre_file
if verbose:
print("JOB " + str(job) + ": compress " + os.path.basename(out_path))
compressed_tre = bash.compress_gzip(out_path, out_path=compressed_tre)
if verbose:
print("JOB " + str(job) + ": " + "compress " + compressed_tre + "\t DONE\n")
# file cleaning:
for file_path in use_tre_file_paths:
bash.compress_gzip(file_path)
out_tres.update({replicaID: compressed_tre})
def thread_worker_concat_repdat(
job: int, repdat_file_out_path: str, repdat_file_paths: Union[str, List[str]], verbose: bool = False
) -> str:
if os.path.exists(repdat_file_out_path):
warnings.warn("Skipped repdat creation as already existed!: " + repdat_file_out_path)
else:
if verbose:
print("JOB " + str(job) + ": Found repdats: " + str(repdat_file_paths)) # verbose_repdat
if isinstance(repdat_file_paths, str):
repdat_file_paths = [repdat_file_paths]
if verbose:
print("JOB " + str(job) + ": Concatenate repdats: \tSTART") # verbose_repdat
repdat_file = repdat.Repdat(repdat_file_paths.pop(0)) # repdat Class
for repdat_path in repdat_file_paths:
if verbose:
print("JOB " + str(job) + ": concat:\t", repdat_path)
tmp_repdat_file = repdat.Repdat(repdat_path)
repdat_file.append(tmp_repdat_file)
del tmp_repdat_file
if verbose:
print("JOB " + str(job) + ": Concatenate repdats: \tDONE") # verbose_repdat
if verbose:
print("JOB " + str(job) + ": Write out repdat: " + str(repdat_file_out_path)) # verbose_repdat
repdat_file.write(repdat_file_out_path)
del repdat_file
def _thread_worker_cnfs(
job: int,
out_cnfs: List[str],
in_cnfs: List[Cnf_Type],
replica_range: List[int],
out_folder: str,
verbose: bool = False,
):
if verbose:
print("JOB: " + str(job) + " copy to " + out_folder)
for replicaID in replica_range:
out_cnfs.update(
{
replicaID: bash.copy_file(
in_cnfs[replicaID][-1], out_folder + "/" + os.path.basename(in_cnfs[replicaID][-1])
)
}
)
def _thread_worker_conv_trc(
job: int,
replica_range: List[int],
trc_files: List[str],
in_topology_path: str,
gromos_path: str,
out_traj: dict,
fit_traj_to_mol: int = 1,
boundary_conditions: str = "r",
verbose: bool = False,
):
if verbose:
print("JOB: " + str(job) + " RANGE\t" + str(replica_range))
gromPP = gromosPP.GromosPP(gromos_path)
first = True
import mdtraj
start_dir = os.getcwd()
for replicaID in replica_range:
trc_path = trc_files[replicaID]
if first:
temp = tempfile.mkdtemp(suffix="_job" + str(job), prefix="convert_", dir=os.path.dirname(trc_path))
os.chdir(temp)
first = False
unarchived = False
use_trc = trc_path
if verbose:
print("using trc:", use_trc)
out_top_path = use_trc.replace(".trc", "_last.pdb").replace(".gz", "")
out_traj_path = use_trc.replace(".trc", ".dcd").replace(".gz", "")
out_traj.update({replicaID: {"top": out_top_path, "traj": out_traj_path}})
if os.path.exists(out_top_path) and os.path.exists(out_traj_path):
warnings.warn("Found already the traj and its top!:" + out_traj_path)
continue
if verbose:
print(
"JOB "
+ str(job)
+ ": Converting trc_path to -> "
+ use_trc.replace("trc", "pdb").replace(".tar.gz", "")
)
pdb = gromPP.frameout(
in_top_path=in_topology_path,
in_coord_path=use_trc,
periodic_boundary_condition=boundary_conditions,
gather="cog",
out_file_format="pdb",
atomsfit=str(fit_traj_to_mol) + ":a",
single_file=True,
out_file_path=use_trc.replace("trc", "pdb").replace("tar", "").replace(".gz", ""),
)
if verbose:
print("JOB " + str(job) + ": loading pdb : " + pdb)
traj = mdtraj.load_pdb(pdb)
if verbose:
print("JOB " + str(job) + ": write out data to " + use_trc.replace(".trc", ".dcd/.pdb"))
traj.save(out_traj_path)
traj[0].save(out_top_path)
del traj
if verbose:
print("Done writing out")
bash.remove_file(pdb)
if unarchived:
if verbose:
print("Clean unarchiving")
bash.remove_file(use_trc)
bash.remove_folder(temp)
os.chdir(start_dir)
def thread_worker_isolate_energies(
in_en_file_paths: str,
out_folder: str,
properties: List[str],
replicas: List[int],
in_ene_ana_lib: str,
gromosPP_path: str,
out_prefix: str = "",
tre_prefix: str = "",
time=None,
dt=None,
job: int = -1,
verbose=True,
) -> List[str]:
"""isolate_properties_from_tre
This function uses ene_ana from GROMOS++ to isolate properties from the .tre files generated by reeds.
Parameters
----------
in_en_file_paths : Dict[int, str]
dictionary mapping each replica ID to its input .tre file path(s)
out_folder : str
output folder, where to write the energy .csvs
properties : List[str]
properties to isolate from the .tre files
replicas : List[int]
replica IDs that should be processed
in_ene_ana_lib : str
path to the ene_ana library describing the .tre files
gromosPP_path : str
path to the GROMOS++ binary directory
out_prefix : str, optional
tre_prefix : str, optional
verbose : bool, optional
Returns
-------
List[str]
list of result files.
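Example
-------
A minimal, hypothetical call; paths, prefix and library names are placeholders::

    result_files = thread_worker_isolate_energies(
        in_en_file_paths={1: "analysis/run_1.tre"},
        out_folder="analysis/energies",
        properties=["solvtemp2", "totdisres"],
        replicas=[1],
        in_ene_ana_lib="ene_ana.md++.lib",
        gromosPP_path="/usr/local/gromos++/bin",
        out_prefix="run",
        time=0,
        dt=0.002,
    )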
"""
gromos = gromosPP.GromosPP(gromosPP_path)
result_files = []
temp = tempfile.mkdtemp(suffix="_job" + str(job), prefix="ene_ana_", dir=out_folder)
start_dir = os.getcwd()
os.chdir(temp)
# get the potentials from each replica.tre*
if verbose:
print("JOB" + str(job) + ": working with job: " + str(list(replicas)))
for replicaID in replicas:
in_en_file_path = in_en_file_paths[replicaID]
out_suffix = "energies_s" + str(replicaID)
out_path = out_folder + "/" + out_prefix + "_" + out_suffix + ".dat"
if verbose:
print("CHECKING: " + out_path)
if not os.path.exists(out_path):
if verbose:
print("JOB" + str(job) + ":\t" + str(replicaID))
if verbose:
print("JOB" + str(job) + ":", in_en_file_path)
tmp_out = gromos.ene_ana(
in_ene_ana_library_path=in_ene_ana_lib,
in_en_file_paths=in_en_file_path,
out_energy_folder_path=out_folder,
out_files_suffix=out_suffix,
out_files_prefix=out_prefix,
time=str(time) + " " + str(dt),
in_properties=properties,
verbose=verbose,
single_file=True,
workdir=True,
)
result_files.append(tmp_out)
bash.remove_file(temp + "/*")  # remove logs if successful
os.chdir(start_dir)
bash.remove_folder(temp)
if verbose:
print("JOB" + str(job) + ": DONE")
return result_files
def _thread_worker_delete(job: int, file_paths: List[str], verbose: bool = False) -> int:
for file_path in file_paths:
if verbose:
print("JOB" + str(job) + " - rm: " + file_path + "")
bash.remove_file(file_path)
return 0
def _thread_worker_compress(job: int, in_file_paths: List[str], verbose: bool = False) -> int:
for file_path in in_file_paths:
if verbose:
print("JOB" + str(job) + " - gz: " + file_path)
bash.compress_gzip(in_path=file_path, verbose=verbose)
return 0
"""
FILE FINDER
"""
def find_and_unarchive_tar_files(trc_files: List[str], verbose: bool = False):
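"""find_and_unarchive_tar_files
Splits the given trajectory file paths into archived (.tar/.gz) and plain files, extracts the archives
(first via gunzip, falling back to tar), and returns the combined file list sorted by the trailing file
number, together with the list of freshly extracted files.
"""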
# archive handling
archived_files = list(filter(lambda x: (".tar" in x or ".gz" in x or ".tar.gz" in x), trc_files))
not_archived_files = list(filter(lambda x: not ("tar" in x or ".gz" in x or ".tar.gz" in x), trc_files))
unarchived_files = []
if verbose:
print("archives: ", archived_files)
if verbose:
print("narchives: ", not_archived_files)
# untar files:
for tared_file in archived_files:
if len(not_archived_files) == 0 or not any([noAfile in tared_file for noAfile in not_archived_files]):
try:
# print("Ungzip ->\t", tared_file)
out_path = bash.compress_gzip(
in_path=tared_file, out_path=tared_file.replace(".tar", "").replace(".gz", ""), extract=True
)
except SubprocessError:
# print("Failed gzip, trying tar")
out_path = bash.extract_tar(
in_path=tared_file,
out_path=tared_file.replace(".tar", "").replace(".gz", ""),
gunzip_compression=True,
)
# fix for stupid taring! #todo: remove part
if any(["cluster" == xfile for xfile in os.listdir(os.path.dirname(tared_file))]) and not os.path.exists(
out_path
):
nfound = True
for cpath, tdir, files in os.walk(os.path.dirname(tared_file) + "/cluster"):
if os.path.basename(tared_file).replace(".tar", "").replace(".gz", "") in files:
if verbose:
print(
"FOUND PATH: ",
cpath + "/" + os.path.basename(tared_file).replace(".tar", "").replace(".gz", ""),
)
wrong_path = cpath + "/" + os.path.basename(tared_file).replace(".tar", "").replace(".gz", "")
out_file = bash.move_file(wrong_path, tared_file.replace(".tar", "").replace(".gz", ""))
unarchived_files.append(out_file)
nfound = False
break
if nfound:
raise IOError("could not find untarred file!")
else:
unarchived_files.append(out_path)
# raise Exception("this tar needs special treatment as it is in cluster/yadda/yadda/fun.trc")
else:
if verbose:
print([noAfile for noAfile in not_archived_files if (noAfile in tared_file)])
new_files = not_archived_files
new_files.extend(unarchived_files)
use_tre_file_paths = sorted(new_files, key=lambda x: int(x.split("_")[-1].split(".")[0]))
return use_tre_file_paths, unarchived_files
def gather_simulation_replica_file_paths(
in_folder: str,
replicas: int,
filePrefix: str = "",
fileSuffixes: Union[str, List[str]] = [".tre", ".tre.tar.gz"],
verbose: bool = False,
finalNumberingSort=False,
) -> Dict[int, List[str]]:
"""gather_replica_file_paths
Finds all trajectory paths in a simulation folder and sorts them by replica.
Parameters
----------
in_folder : str
folder, containing the files
replicas : int
Number of replicas
filePrefix : str, optional
prefix of the desired files
fileSuffixes : Union[str, List[str]], optional
suffix (or list of suffixes) of the desired files
verbose : bool
toggle verbosity
Returns
-------
Dict[int, List[str]]
dictionary mapping each replica ID (1..replicas) to the sorted list of matching file paths
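Example
-------
A hypothetical call, assuming a simulation folder with two replicas (paths are placeholders)::

    tre_files = gather_simulation_replica_file_paths(
        in_folder="project/simulation",
        replicas=2,
        fileSuffixes=[".tre", ".tre.gz"],
    )
    # e.g. {1: [".../run_1_1.tre", ".../run_2_1.tre"], 2: [".../run_1_2.tre", ".../run_2_2.tre"]}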
"""
if isinstance(fileSuffixes, str):
fileSuffixes = [fileSuffixes]
# browse folders
# init statewise dic
files = {}
for replica in range(1, replicas + 1):
files.update({replica: []})
if verbose:
print("SEARCH PATTERN: " + filePrefix + " + * +" + str(fileSuffixes))
for dirname, dirnames, filenames in os.walk(in_folder):
if str(dirname[-1]).isdigit() and os.path.basename(dirname).startswith("eq"):
continue
# check the current in_dir for the file pattern
tmp_files = [
file for file in filenames if (filePrefix in file and any([suffix in file for suffix in fileSuffixes]))
]
if len(tmp_files) != 0:
for x in range(1, replicas + 1):
grom_file_prefix = sorted(
[
dirname + "/" + file
for file in tmp_files
if (any(["_" + str(x) + suffix in file for suffix in fileSuffixes]))
]
)
files[x] += grom_file_prefix
if verbose:
print("walking to in_dir: ", os.path.basename(dirname), "found: ", len(tmp_files))
if not finalNumberingSort:
# final_cleanup
for x in files:
files[x].sort(key=lambda x: int(x.split("_")[-2]))
if verbose:
print("\nfoundFiles:\n")
for x in sorted(files):
print("\n" + str(x))
print("\t" + "\t".join([y + "\n" for y in files[x]]))
if len(files[1]) == 0:
raise ValueError("could not find any file with the prefix: " + filePrefix + " in folder : \n" + in_folder)
return files
def gather_simulation_file_paths(
in_folder: str,
filePrefix: str = "",
fileSuffixes: Union[str, List[str]] = [".tre", ".tre.tar.gz"],
files_per_folder: int = 1,
verbose: bool = False,
) -> List[str]:
files = []
if isinstance(fileSuffixes, str):
fileSuffixes = [fileSuffixes]
if verbose:
print("SEARCH PATTERN: " + filePrefix + " + * +" + str(fileSuffixes))
for dirname, dirnames, filenames in os.walk(in_folder):
if str(dirname[-1]).isdigit() and os.path.basename(dirname).startswith("eq"):
continue
# check the current in_dir for the file pattern
tmp_files = [
file for file in filenames if (filePrefix in file and any([suffix in file for suffix in fileSuffixes]))
]
if len(tmp_files) == files_per_folder:
files.extend(list(map(lambda x: dirname + "/" + x, tmp_files)))
if verbose:
print("walking to in_dir: ", os.path.basename(dirname), "found: ", len(tmp_files))
try:
keys = [[int(y) for y in x.split("_") if (y.isdecimal())][-1] for x in files]
sorted_files = list(map(lambda y: y[1], sorted(zip(keys, files), key=lambda x: x[0])))
except (IndexError, ValueError):  # file names are not consistently enumerated
warnings.warn("Files are not all enumerated! no file sorting.")
sorted_files = files
if verbose:
print("\nfoundFiles:\n")
print("\t" + "\n\t".join(sorted_files))
if len(sorted_files) == 0:
raise ValueError("could not find any file with the prefix: " + filePrefix + " in folder : \n" + in_folder)
return sorted_files
"""
ENERGY FILE FUNCTIONS
"""
def parse_csv_energy_trajectory(in_ene_traj_path: str, verbose: bool = False) -> pd.DataFrame:
"""
parse one ene_ana csv file
Parameters
----------
in_ene_traj_path : str
path to input file
verbose : bool
loud?
Returns
-------
pd.DataFrame
return a pandas data frame containing all energies
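Example
-------
Hypothetical usage; the file name is a placeholder following the ene_ana output naming::

    ene_traj = parse_csv_energy_trajectory("analysis/run_energies_s1.dat")
    print(ene_traj.columns)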
"""
if verbose:
print("deal with: ", in_ene_traj_path)
ene_traj = pd.read_csv(in_ene_traj_path, header=find_header(in_ene_traj_path), delim_whitespace=True)
ene_traj.columns = [x.replace("#", "").strip() for x in ene_traj.columns]
setattr(ene_traj, "in_path", in_ene_traj_path)
return ene_traj
def parse_csv_energy_trajectories(in_folder: str, ene_trajs_prefix: str, verbose: bool = False) -> List[pd.DataFrame]:
"""
searches a directory and loads energy eds csvs as pandas dataframes.
Parameters
----------
in_folder : str
folder with energy_traj - csvs
ene_trajs_prefix : str
prefix name
verbose : bool
loud?
Returns
-------
List[pd.DataFrame]
return a list with pandas data frames containing all energy infos.
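Example
-------
Hypothetical usage, assuming files named <prefix>_s<replicaID>.dat (e.g. run_energies_s1.dat)::

    ene_trajs = parse_csv_energy_trajectories("analysis/energies", "run_energies")
    for ene_traj in ene_trajs:
        print(ene_traj.replicaID, ene_traj.shape)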
"""
if verbose:
print("SEARCH: ", in_folder + "/" + ene_trajs_prefix + "*.dat")
in_ene_traj_paths = sorted(
glob.glob(in_folder + "/" + ene_trajs_prefix + "*.dat"),
key=lambda x: int(x.split("_")[-1].split(".")[0].replace("s", "")),
)
ene_trajs: List[pd.DataFrame] = []
if verbose:
print("FOUND: ", "\n".join(in_ene_traj_paths))
for in_ene_traj_path in in_ene_traj_paths:
ene_traj = parse_csv_energy_trajectory(in_ene_traj_path, verbose=verbose)
if verbose:
print("csv columns: \t", ene_traj.columns)
# note: the previous version created problems for filenames which contained periods
# setattr(ene_traj, "s", ((ene_traj.in_path.split("."))[0]).split("_")[-1])
# setattr(ene_traj, "replicaID", int(((ene_traj.in_path.split("."))[0]).split("_")[-1].replace("s", "")))
setattr(ene_traj, "s", ((ene_traj.in_path.split("."))[-2]).split("_")[-1])
setattr(ene_traj, "replicaID", int(((ene_traj.in_path.split("."))[-2]).split("_")[-1].replace("s", "")))
ene_trajs.append(ene_traj)
if len(ene_trajs) == 0:
raise ValueError("could not find any energy_trajectory in: ", in_folder + "/" + ene_trajs_prefix + "*.dat")
ene_trajs = list(sorted(ene_trajs, key=lambda x: int(x.s.replace("s", ""))))
return ene_trajs
"""
CONCATENATION WRAPPER
"""
def project_concatenation(
in_folder: str,
in_topology_path: str,
in_imd: str,
num_replicas: int,
control_dict: Dict[str, bool],
out_folder: str,
in_ene_ana_lib_path: str,
out_file_prefix: str = "test",
fit_traj_to_mol: int = 1,
starting_time: float = 0,
include_water_in_trc=True,
additional_properties: Union[Tuple[str], List[str]] = ("solvtemp2", "totdisres"),
n_processes: int = 1,
gromosPP_bin_dir: str = None,
verbose: bool = False,
nofinal=False,
boundary_conditions: str = "r cog",
) -> dict:
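"""project_concatenation
Wrapper that post-processes a simulation project folder: depending on control_dict it copies the final
.cnf files, concatenates the .trc and .tre trajectories per replica, isolates energies via ene_ana and
optionally converts the concatenated .trc files to .dcd/.pdb.
The following keys are read from control_dict; the values shown are a minimal, assumed example::

    control_dict = {
        "cp_cnf": True,
        "cat_trc": True,
        "cat_tre": True,
        "ene_ana": True,
        "convert_trcs": False,
    }

Returns
-------
dict
    dictionary collecting the produced output paths (out_folder, cnfs, repdat, tres, trcs, dcds).
"""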
if verbose:
print("reading imd file: " + in_imd)
imd_file = imd.Imd(in_imd)
dt = float(imd_file.STEP.DT)
dt_trc = int(imd_file.WRITETRAJ.NTWX) * dt
dt_tre = int(imd_file.WRITETRAJ.NTWE) * dt
tmp_dir = out_folder + "/tmp_file"
if not os.path.isdir(tmp_dir):  # create the temporary folder only if it does not exist yet
bash.make_folder(tmp_dir)
out_cnfs = out_tres = out_trcs = out_dcd = out_repdat = None
p_conv = p_cnf = p_repdat = p_trc = p_tre = p_ene_ana = False
submitted_trc_job = submitted_tre_job = False
if n_processes > 1:
p = mult.Pool(n_processes)
manager = mult.Manager()
if control_dict["cp_cnf"]:
if verbose:
print("\tStart cnfs")
# find all cnf files in this project
sim_dir_cnfs = gather_simulation_replica_file_paths(
in_folder, num_replicas, filePrefix="", fileSuffixes=".cnf", verbose=verbose, finalNumberingSort=nofinal
)
# do parallel
if n_processes > 1:
out_cnfs = manager.dict()
distribute = [
(n, out_cnfs, sim_dir_cnfs, range(n, len(sim_dir_cnfs) + 1, n_processes), out_folder, verbose)
for n in range(1, n_processes + 1)
]
# _async
p_cnf = p.starmap(_thread_worker_cnfs, distribute)
else:
out_cnfs = {}
_thread_worker_cnfs(
job=-1,
out_cnfs=out_cnfs,
in_cnfs=sim_dir_cnfs,
replica_range=list(sim_dir_cnfs.keys()),
out_folder=out_folder,
verbose=verbose,
)
if verbose:
print("Out cnfs: ", out_cnfs)
if control_dict["cat_trc"]:
print("\tStart Trc Cat")
# find all trc files in this project
trc_files = gather_simulation_replica_file_paths(
in_folder,
num_replicas,
filePrefix="",
fileSuffixes=[".trc", ".trc.gz", ".trc.tar.gz"],
verbose=verbose,
finalNumberingSort=nofinal,
)
out_prefix = out_folder + "/" + out_file_prefix + "_"
# concat all files to a single .trc
if n_processes > 1: # parallel
submitted_trc_job = True
if verbose:
print("going parallel: n_processes - " + str(n_processes))
out_trcs = manager.dict()
distributed_jobs = [
(
n,
range(n, len(trc_files) + 1, n_processes),
trc_files,
out_prefix,
in_topology_path,
out_trcs,
dt_trc,
starting_time,
verbose,
include_water_in_trc,
)
for n in range(1, n_processes + 1)
]
# _async
p_trc = p.starmap(_thread_worker_cat_trc, distributed_jobs)
p.close()
p.join()
else:
out_trcs = {}
_thread_worker_cat_trc(
job=-1,
topology_path=in_topology_path,
replicaID_range=list(trc_files.keys()),
trc_files=trc_files,
out_prefix=out_prefix,
dt=dt_trc,
time=starting_time,
out_trcs=out_trcs,
verbose=verbose,
boundary_conditions=boundary_conditions,
include_all=include_water_in_trc,
)
if control_dict["cat_tre"]:
print("\tStart Tre Cat")
# find all tre files in this project
tre_files = gather_simulation_replica_file_paths(
in_folder,
num_replicas,
filePrefix="",
fileSuffixes=[".tre", ".tre.tar.gz"],
verbose=verbose,
finalNumberingSort=nofinal,
)
out_prefix = out_folder + "/" + out_file_prefix + "_"
# concat all files to a single .tre
if n_processes > 1:
if verbose:
print("going parallel: n_processes - " + str(n_processes), " for ", len(tre_files))
submitted_tre_job = True
out_tres = manager.dict()
distributed_jobs = [
(n, range(n, len(tre_files) + 1, n_processes), tre_files, out_prefix, out_tres, verbose)
for n in range(1, n_processes + 1)
]
p = mult.Pool(n_processes)
p_tre = p.starmap(_thread_worker_cat_tre, distributed_jobs)
p.close()
p.join()
else:
out_tres = {}
_thread_worker_cat_tre(
job=-1,
replicaID_range=tre_files.keys(),
tre_files=tre_files,
out_prefix=out_prefix,
out_tres=out_tres,
verbose=verbose,
)
if control_dict["ene_ana"]:
print("\tStart ene ana")
# wait for the async job creating the tres.
if submitted_tre_job:
p_tre.wait()
# gather potentials
properties = list(additional_properties)
# find all tre files in this project
tre_files = gather_simulation_replica_file_paths(
in_folder,
num_replicas,
filePrefix="",
fileSuffixes=[".tre", ".tre.gz"],
verbose=verbose,
finalNumberingSort=nofinal,
) # ".tre.tar.gz"
# isolate potentials
if verbose:
print("Isolate ene_ana:")
if n_processes > 1:
p = mult.Pool(n_processes)
distribute_jobs = [
(
out_folder,
out_folder,
properties,
list(tre_files.keys())[n::n_processes],
in_ene_ana_lib_path,
gromosPP_bin_dir,
out_file_prefix,
"",
dt_tre,
n,
verbose,
)
for n in range(n_processes)
]
p_ene_ana = p.starmap_async(thread_worker_isolate_energies, distribute_jobs)
else:
thread_worker_isolate_energies(
in_en_file_paths=tre_files,
out_folder=out_folder,
properties=properties,
out_prefix=out_file_prefix,
in_ene_ana_lib=in_ene_ana_lib_path,
gromosPP_path=gromosPP_bin_dir,
dt=dt_tre,
replicas=list(tre_files.keys()),
verbose=verbose,
)
if control_dict["convert_trcs"]:
print("\tStart Trc Conversion")
# wait for async job creating the trcs.
if submitted_trc_job:
p_trc.wait()
# get files:
final_trc_files = list(
sorted(glob.glob(out_folder + "/*.trc*"), key=lambda x: int(x.split("_")[-1].split(".")[0]))
)
if n_processes > 1:
out_dcd = manager.dict()
distributed_jobs = [
(
n,
range(n, num_replicas, n_processes),
final_trc_files,
in_topology_path,
gromosPP_bin_dir,
out_dcd,
fit_traj_to_mol,
verbose,
)
for n in range(n_processes)
]
p_conv = p.starmap_async(_thread_worker_conv_trc, distributed_jobs)
else:
out_dcd = {}
_thread_worker_conv_trc(
job=-1,
replica_range=range(num_replicas),
trc_files=final_trc_files,
in_topology_path=in_topology_path,
gromos_path=gromosPP_bin_dir,
out_traj=out_dcd,
fit_traj_to_mol=1,
verbose=verbose,
boundary_conditions=boundary_conditions,
)
if n_processes > 1:
# wait for the jobs to finish
if (
(not p_conv or p_conv.wait())
and (not p_cnf or p_cnf.wait())
and (not p_repdat or p_repdat.wait())
and (not p_trc or p_trc.wait())
and (not p_tre or p_tre.wait())
and (not p_ene_ana or p_ene_ana.wait())
):
raise ChildProcessError("A process failed! ")
p.close()
p.join()
out_dict = {
"out_folder": out_folder,
"cnfs": dict(out_cnfs),
"repdat": out_repdat,
"tres": dict(out_tres),
"trcs": dict(out_trcs),
"dcds": dict(out_dcd),
}
manager.shutdown()
else:
out_dict = {
"out_folder": out_folder,
"cnfs": out_cnfs,
"repdat": out_repdat,
"tres": out_tres,
"trcs": out_trcs,
"dcds": out_dcd,
}
if verbose:
print("all jobs finished")
return out_dict
"""
COMPRESS FUNCTION
"""
def compress_files(in_paths: List[str], n_processes: int = 1) -> List[str]:
"""compress a list of files
Parameters
----------
in_paths : List[str]
n_processes: int
how many processes can be used in parallel?
Returns
-------
List[str]
outpaths
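Example
-------
Hypothetical usage (paths are placeholders)::

    gz_paths = compress_files(["analysis/run_1.tre", "analysis/run_2.tre"], n_processes=2)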
"""
if isinstance(in_paths, str):
in_paths = [in_paths]
out_paths = []
# check:
for path in in_paths:
if os.path.exists(path):
archive_path = path + ".gz"
out_paths.append(archive_path)
else:
warnings.warn("File Path: " + path + " was not found!")
# do:
print("Gen Gzips:")
if n_processes == 1:
for path in in_paths:
bash.compress_gzip(in_path=path, out_path=path + ".gz")
else: # do parallel
p = mult.Pool(n_processes)
distribute = [(job, in_paths[job::n_processes], True) for job in range(n_processes)]
p.starmap(_thread_worker_compress, distribute)
p.close()
p.join()
return out_paths