import os
import glob
import pandas as pd
from pygromos.utils.typing import Tuple
from pygromos.files.coord.cnf import Cnf
from pygromos.files.gromos_system import Gromos_System
from pygromos.simulations.hpc_queuing.submission_systems._submission_system import _SubmissionSystem
from pygromos.simulations.hpc_queuing.submission_systems.submission_job import Submission_job
from pygromos.simulations.hpc_queuing.submission_systems.local import LOCAL
from pygromos.utils import bash
from pygromos.utils.utils import spacer3
def do_skip_job(
    tmp_out_cnf: str,
    simSystem: Gromos_System,
    tmp_jobname: str,
    job_submission_system: _SubmissionSystem,
    previous_job: int,
    verbose: bool = True,
    verbose_lvl: int = 1,
) -> Tuple[bool, int]:
    """
    Check whether a chain-submission step can be skipped.

    This function returns True if the job was already submitted to the queue or
    already finished (its result cnf is on disk), together with the job id to
    chain onto.

    Parameters
    ----------
    tmp_out_cnf : str
        the possible final out cnf
    simSystem : Gromos_System
        the simulation system
    tmp_jobname : str
        possible job-name
    job_submission_system : _SubmissionSystem
        submission system
    previous_job : int
        job, that was submitted before this one.
    verbose : bool, optional
        more bla bla, by default True
    verbose_lvl : int, optional
        nicely define ammount of bla bla, by default 1

    Returns
    -------
    Tuple[bool, int]
        the bool tells, if the job was already submitted or finished,
        the id is the possibly found job id of the job, if it is in the queue
        (None if the run already finished, so there is nothing to chain onto).

    Raises
    ------
    IOError
        if a tmp_out_cnf is present, we assume the simulation run was successfull.
        If it can not be parsed in, something else is a problem, but we should not continue our chain!
    ValueError
        if multiple jobs with same name were found in the queue, we assume, the search regex did not work.
    """
    # Check if job with same name is already in the queue!
    if verbose and verbose_lvl >= 2:
        print("Checking if jobs was already submitted or done")
    if job_submission_system.block_double_submission:  # can we find an job with this name in the queue?
        if verbose and verbose_lvl >= 2:
            print("Checking for jobs with name: " + tmp_jobname)
        queued_job_ids = job_submission_system.search_queue_for_jobname(job_name=tmp_jobname)
        if isinstance(queued_job_ids, (pd.DataFrame, pd.Series)):
            # only jobs that are actually running or pending count as "submitted"
            queued_job_ids = list(queued_job_ids.where(queued_job_ids.STAT.isin(["RUN", "PEND"])).dropna().JOBID)

        # BUGFIX: a queue hit must trigger for ANY queued job (> 0), not only for > 1.
        # With the old "> 1" guard a single queued job was never detected, and for two
        # or more hits the inner "== 1" branch was dead code, so the function always
        # raised ValueError instead of implementing the documented contract.
        if len(queued_job_ids) > 0:  # job is already submitted:
            if verbose:
                print(
                    "\t\t\tSKIP job Submission: "
                    + tmp_jobname
                    + " was already submitted to the queue! \n\t\t\t\tSKIP\n"
                    + "\t\t\tsubmitted IDS: "
                    + "\n\t\t\t".join(map(str, queued_job_ids))
                    + "\n"
                )
            try:
                # hand the (future) result coordinates to downstream steps
                simSystem.cnf = tmp_out_cnf
            except Exception as err:
                raise IOError("Tried to read found cnf file: " + tmp_out_cnf + ". FAILED! \n" + str(err.args))
            if len(queued_job_ids) == 1:
                # exactly one hit -> chain the next job onto it
                previous_job = queued_job_ids[0]
                if verbose and verbose_lvl >= 2:
                    print("\nTRY to attach next job to ", previous_job, "\n")
            else:
                raise ValueError(
                    "\nthere are multiple jobs, that could be the precessor. " + " ".join(map(str, queued_job_ids))
                )
            if verbose:
                print(True)
            if verbose:
                print()
            return True, previous_job

    # Check if run was already finished (result cnf files on disk):
    tmp_out_cnfs_regex = "_".join(tmp_out_cnf.split("_")[:-1]) + "*.cnf"
    if verbose:
        print("Checking for resulting files: " + tmp_out_cnfs_regex)
    if len(glob.glob(tmp_out_cnfs_regex)) > 0:  # was this job already run and finished?
        if verbose:
            print("\t\t NOT SUBMITTED!(inScript) as these Files were found: \n\t" + tmp_out_cnfs_regex)
        setattr(simSystem, "coord_seeds", tmp_out_cnf)  # set next coord Files
        if verbose and verbose_lvl >= 2:
            print(simSystem.cnf.path)
        if verbose and verbose_lvl >= 2:
            print(True)
        if verbose and verbose_lvl >= 2:
            print()
        # None: the run is done, the next step does not need to wait on anything.
        return True, None

    # nothing found -> the job still has to be submitted
    if verbose:
        print(False)
    if verbose:
        print()
    return False, previous_job
def _build_md_worker_args(
    simSystem: Gromos_System,
    job_submission_system: _SubmissionSystem,
    tmp_outdir: str,
    runID: int,
    initialize_first_run: bool,
    reinitialize_every_run: bool,
    work_dir: str,
) -> str:
    """Build the bash array ``md_args=( ... )`` holding all arguments for the worker script."""
    md_args = "md_args=(\n"
    # mandatory inputs
    md_args += "-out_dir " + tmp_outdir + "\n"
    md_args += "-in_cnf_path " + simSystem.cnf.path + "\n"
    md_args += "-in_imd_path " + simSystem.imd.path + "\n"
    md_args += "-in_top_path " + simSystem.top.path + "\n"
    md_args += "-runID " + str(runID) + "\n"

    # OPTIONAL ARGS - only present if the system carries the corresponding file
    if simSystem.disres is not None:
        md_args += "-in_disres_path " + simSystem.disres.path + "\n"
    if simSystem.ptp is not None:
        md_args += "-in_perttopo_path " + simSystem.ptp.path + "\n"
    if simSystem.refpos is not None:
        md_args += "-in_refpos_path " + simSystem.refpos.path + "\n"
    if simSystem.qmmm is not None:
        # BUGFIX: this entry was terminated with a space instead of a newline,
        # gluing the qmmm path onto the following bash-array element.
        md_args += "-in_qmmm_path " + simSystem.qmmm.path + "\n"
    if simSystem.posres is not None:
        md_args += "-in_posres_path " + simSystem.posres.path + "\n"

    # resources & run behaviour
    md_args += "-nmpi " + str(job_submission_system.nmpi) + "\n"
    md_args += "-nomp " + str(job_submission_system.nomp) + "\n"
    md_args += "-initialize_first_run " + str(initialize_first_run) + "\n"
    md_args += "-reinitialize_every_run " + str(reinitialize_every_run) + "\n"
    md_args += "-gromosXX_bin_dir " + str(simSystem.gromosXX.bin) + "\n"
    md_args += "-gromosXX_check_binary_paths " + str(simSystem.gromosXX._check_binary_paths) + "\n"
    if work_dir is not None:
        md_args += "-work_dir " + str(work_dir) + "\n"

    # trajectory output flags, driven by the imd WRITETRAJ block if present
    if hasattr(simSystem.imd, "WRITETRAJ"):
        if simSystem.imd.WRITETRAJ.NTWX > 0:
            md_args += "-out_trc " + str(True) + "\n"
        if simSystem.imd.WRITETRAJ.NTWE > 0:
            md_args += "-out_tre " + str(True) + "\n"
        if simSystem.imd.WRITETRAJ.NTWV > 0:
            md_args += "-out_trv " + str(True) + "\n"
        if simSystem.imd.WRITETRAJ.NTWF > 0:
            md_args += "-out_trf " + str(True) + "\n"
        if simSystem.imd.WRITETRAJ.NTWG > 0:
            md_args += "-out_trg " + str(True) + "\n"

    md_args += "-zip_trajectories " + str(job_submission_system.zip_trajectories) + "\n"
    md_args += ")\n"  # closing the bash array which stores all arguments.
    return md_args


def chain_submission(
    simSystem: Gromos_System,
    out_dir_path: str,
    out_prefix: str,
    chain_job_repetitions: int,
    worker_script: str,
    job_submission_system: _SubmissionSystem,
    jobname: str,
    run_analysis_script_every_x_runs: int = 0,
    in_analysis_script_path: str = "",
    start_run_index: int = 1,
    prefix_command: str = "",
    previous_job_ID: int = None,
    work_dir: str = None,
    initialize_first_run: bool = True,
    reinitialize_every_run: bool = False,
    verbose: bool = False,
    verbose_lvl: int = 1,
) -> Tuple[int, str, Gromos_System]:
    """
    Submit a chain of simulation steps to the queuing system and do the file management.

    Parameters
    ----------
    simSystem : Gromos_System
        simulation system
    out_dir_path : str
        out directory path
    out_prefix : str
        out prefix for simulation files
    chain_job_repetitions : int
        how often, should the simulation be repeated (in continuation)
    worker_script : str
        worker, that should be submitted. This script will be executed at each scheduled job.
    job_submission_system : _SubmissionSystem
        submission system, what type of submission?
    jobname : str
        name of the simulation job
    run_analysis_script_every_x_runs : int, optional
        run analysis in between - (careful will not be overwritten, make sure final analysis is correct.), by default 0
    in_analysis_script_path : str, optional
        analysis script for simulation, that should be applied (will at least be applied after the full simulation chain.), by default ""
    start_run_index : int, optional
        start index of the job chain., by default 1
    prefix_command : str, optional
        any bash prefix commands, before submitting?, by default ""
    previous_job_ID : int, optional
        ID of the previous job, to be chained to., by default None
    work_dir : str, optional
        dir to which the work in progress will be written. if None a tmp-scratch dir will be used with LSF!, by default None
    initialize_first_run : bool, optional
        should the velocities for the first run be initialized?, by default True
    reinitialize_every_run : bool, optional
        should in every run, the velocities be reinitialized?, by default False
    verbose : bool, optional
        more bla bla, by default False
    verbose_lvl : int, optional
        nicely define ammount of bla bla, by default 1

    Returns
    -------
    Tuple[int, str, Gromos_System]
        Tuple[previous_job_ID, tmp_jobname, simSystem]
        will return the last job_ID, the last tmp_jobname and the final gromosSystem.

    Raises
    ------
    ValueError
        if submission fails. This can have various reasons, always check also the present files! (*omd etc.)
    """
    if verbose:
        print("\nChainSubmission - " + out_prefix + "\n" + "=" * 30 + "\n")
    if verbose and verbose_lvl >= 2:
        print("start_run_index " + str(start_run_index))
    if verbose and verbose_lvl >= 2:
        print("job reptitions " + str(chain_job_repetitions))

    # BUGFIX: the original compared the submission-system INSTANCE against the LOCAL
    # class with "is not", which is True for every instance (LOCAL included). Use
    # isinstance, so only non-local systems get future-file promises.
    if not isinstance(job_submission_system, LOCAL):
        simSystem._future_promise = True
    # NOTE: a no-op self-assignment of job_submission_system.job_duration was removed here.

    ana_id = None
    tmp_jobname = jobname  # robust return value even if the loop body never runs
    for runID in range(start_run_index, chain_job_repetitions + 1):
        if verbose:
            print("\n submit " + jobname + "_" + str(runID) + "\n" + spacer3)

        tmp_outprefix = out_prefix + "_" + str(runID)
        tmp_jobname = jobname + "_" + str(runID)
        tmp_outdir = out_dir_path + "/" + tmp_outprefix
        tmp_out_cnf = tmp_outdir + "/" + tmp_outprefix + ".cnf"

        # Checks if run should be skipped (already queued or already finished)!
        do_skip, previous_job_ID = do_skip_job(
            tmp_out_cnf=tmp_out_cnf,
            simSystem=simSystem,
            tmp_jobname=tmp_jobname,
            job_submission_system=job_submission_system,
            previous_job=previous_job_ID,
            verbose=verbose,
            verbose_lvl=verbose_lvl,  # forward verbosity level consistently
        )

        if not do_skip:
            bash.make_folder(tmp_outdir)

            # build COMMANDS:
            if len(prefix_command) > 1:
                prefix_command += " && "

            # The arguments for the python worker script are written into a bash
            # array to make it simpler to read in our input files.
            md_args = _build_md_worker_args(
                simSystem=simSystem,
                job_submission_system=job_submission_system,
                tmp_outdir=tmp_outdir,
                runID=runID,
                initialize_first_run=initialize_first_run,
                reinitialize_every_run=reinitialize_every_run,
                work_dir=work_dir,
            )

            # MAIN commands
            md_script_command = prefix_command + "\n\n" + md_args + "\n"
            md_script_command += "python3 " + worker_script + ' "${md_args[@]}" \n'

            if verbose:
                print("PREVIOUS ID: ", previous_job_ID)
                if verbose_lvl >= 2:
                    print("COMMAND: ", md_script_command)

            # SCHEDULE THE COMMANDS
            try:
                if verbose:
                    print("\tSIMULATION")
                os.chdir(tmp_outdir)
                sub_job = Submission_job(
                    command=md_script_command,
                    jobName=tmp_jobname,
                    submit_from_dir=tmp_outdir,
                    queue_after_jobID=previous_job_ID,
                    outLog=tmp_outdir + "/" + out_prefix + "_md.out",
                    errLog=tmp_outdir + "/" + out_prefix + "_md.err",
                    sumbit_from_file=True,  # NOTE: keyword typo is part of the Submission_job API - do not "fix"
                )
                previous_job_ID = job_submission_system.submit_to_queue(sub_job)
                if verbose:
                    print("SIMULATION ID: ", previous_job_ID)
            except ValueError as err:  # job already in the queue
                raise ValueError(
                    "ERROR during submission of main job " + str(tmp_jobname) + ":\n" + "\n".join(err.args)
                )

            # OPTIONAL schedule - analysis inbetween.
            if (
                runID > 1
                and run_analysis_script_every_x_runs != 0
                and runID % run_analysis_script_every_x_runs == 0
                and runID < chain_job_repetitions
            ):
                if verbose and verbose_lvl >= 2:
                    print("\tINBETWEEN ANALYSIS")
                sub_job = Submission_job(
                    command=in_analysis_script_path,
                    jobName=jobname + "_intermediate_ana_run_" + str(runID),
                    outLog=tmp_outdir + "/" + out_prefix + "_inbetweenAna.out",
                    errLog=tmp_outdir + "/" + out_prefix + "_inbetweenAna.err",
                    queue_after_jobID=previous_job_ID,
                )
                try:
                    ana_id = job_submission_system.submit_to_queue(sub_job)
                    if verbose and verbose_lvl >= 2:
                        print("\n")
                except ValueError as err:  # job already in the queue
                    # best-effort: a failed in-between analysis must not kill the chain
                    print("ERROR during submission of analysis command of " + sub_job.jobName + ":\n")
                    print("\n".join(err.args))
        else:
            if verbose and verbose_lvl >= 2:
                print("Did not submit!")
            if verbose and verbose_lvl >= 2:
                print("\n")

        if verbose and verbose_lvl >= 2:
            print("job_postprocess ")
        prefix_command = ""

        # Resulting cnf is provided to use it in further approaches.
        simSystem.cnf = Cnf(tmp_out_cnf, _future_file=True)

        # If an in-between analysis was scheduled, chain subsequent work after it.
        if ana_id is not None:
            previous_job_ID = ana_id
            # BUGFIX: reset, so a stale analysis id from an earlier iteration is
            # not reused as the chain anchor for later runs.
            ana_id = None

    return previous_job_ID, tmp_jobname, simSystem