import os
from datetime import datetime
import pandas as pd
from pygromos.utils.typing import Union, List
from pygromos.simulations.hpc_queuing.submission_systems._submission_system import _SubmissionSystem
from pygromos.simulations.hpc_queuing.submission_systems.submission_job import Submission_job
from pygromos.utils import bash
class LSF(_SubmissionSystem):
"""LSF
This class is a wrapper for the LSF queueing system by IBM, as it is used on the ETH Euler cluster.
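Examples
--------
A minimal usage sketch (the command and job name are made-up placeholders; it assumes Submission_job accepts command and jobName as keyword arguments, and submission=False keeps this a dry run):
>>> lsf = LSF(nmpi=4, job_duration="04:00", submission=False)
>>> job = Submission_job(command="echo 'hello Euler'", jobName="test_job")
>>> lsf.submit_to_queue(job)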
"""
_dummy: bool = False
_refresh_job_queue_list_all_s: int = 60 # update the job-queue list every x seconds
_job_queue_time_stamp: datetime
def __init__(
self,
submission: bool = True,
nomp: int = 1,
nmpi: int = 1,
job_duration: str = "24:00",
max_storage: float = 1000,
verbose: bool = False,
environment=None,
block_double_submission: bool = True,
bjobs_only_same_host: bool = False,
chain_prefix: str = "done",
begin_mail: bool = False,
end_mail: bool = False,
zip_trajectories: bool = True,
):
# general settings for the submission system
super().__init__(
verbose=verbose,
nmpi=nmpi,
nomp=nomp,
job_duration=job_duration,
max_storage=max_storage,
submission=submission,
environment=environment,
block_double_submission=block_double_submission,
chain_prefix=chain_prefix,
begin_mail=begin_mail,
end_mail=end_mail,
zip_trajectories=zip_trajectories,
)
# Only LSF specific settings:
self.bjobs_only_same_host = bjobs_only_same_host
def submit_to_queue(self, sub_job: Submission_job) -> int:
"""
This function submits the given command to the LSF QUEUE
Parameters
----------
sub_job : Submission_job
the job to be submitted
Returns
-------
int
return job ID
"""
# job_properties:Job_properties=None, <- currently not used
orig_dir = os.getcwd()
# generate submission_string:
submission_string = ""
# QUEUE checking to not double submit
if self._block_double_submission and self._submission:
if self.verbose:
print("check queue")
ids = list(self.search_queue_for_jobname(sub_job.jobName).index)
if len(ids) > 0:
if self.verbose:
print(
"\tSKIP - FOUND JOB: \t\t"
+ "\n\t\t".join(map(str, ids))
+ "\n\t\t with jobname: "
+ sub_job.jobName
)
return ids[0]
if isinstance(sub_job.submit_from_dir, str) and os.path.isdir(sub_job.submit_from_dir):
os.chdir(sub_job.submit_from_dir)
command_file_path = sub_job.submit_from_dir + "/job_" + str(sub_job.jobName) + ".sh"
else:
command_file_path = "./job_" + str(sub_job.jobName) + ".sh"
submission_string += "bsub "
submission_string += " -J" + sub_job.jobName + " "
submission_string += " -W " + str(self._job_duration) + " "
if not isinstance(sub_job.post_execution_command, type(None)):
submission_string += '-Ep "' + sub_job.post_execution_command + '" '
if not isinstance(sub_job.outLog, str) and not isinstance(sub_job.errLog, str):
outLog = sub_job.jobName + ".out"
submission_string += " -o " + outLog
elif isinstance(sub_job.outLog, str):
submission_string += " -o " + sub_job.outLog
if isinstance(sub_job.errLog, str):
submission_string += " -e " + sub_job.errLog
nCPU = self._nmpi * self._nomp
submission_string += " -n " + str(nCPU) + " "
# TODO: add GPU support
# add_string = ""
# add_string= "-R \"select[model==XeonGold_5118 || model==XeonGold_6150 || model==XeonE3_1585Lv5 || model==XeonE3_1284Lv4 || model==XeonE7_8867v3 || model == XeonGold_6140 || model==XeonGold_6150 ]\""
if isinstance(self._max_storage, (int, float)):
submission_string += " -R rusage[mem=" + str(self._max_storage) + "] "
if isinstance(sub_job.queue_after_jobID, (int, str)) and sub_job.queue_after_jobID not in (0, "0"):
submission_string += ' -w "' + self._chain_prefix + "(" + str(sub_job.queue_after_jobID) + ')" '
if self._begin_mail:
submission_string += " -B "
if self._end_mail:
submission_string += " -N "
sub_job.command = sub_job.command.strip() # remove trailing line breaks
if self._nomp >= 1:
command = "export OMP_NUM_THREADS=" + str(self._nomp) + ";\n " + sub_job.command + " "
else:
command = "\n " + sub_job.command + ""
if sub_job.sumbit_from_file:
if self.verbose:
print("writing tmp-submission-file to: ", command_file_path)
command_file = open(command_file_path, "w")
command_file.write("#!/bin/bash\n")
command_file.write(command + ";\n")
command_file.close()
command = command_file_path
bash.execute("chmod +x " + command_file_path, env=self._environment)
# finalize string
submission_string = list(map(lambda x: x.strip(), submission_string.split())) + [command]
if self.verbose:
print("Submission Command: \t", " ".join(submission_string))
if self._submission and not self._dummy:
try:
out_process = bash.execute(command=submission_string, catch_STD=True, env=self._environment)
std_out = "\n".join(map(str, out_process.stdout.readlines()))
# the queued job is reported with its id, e.g. "Job <12345> is submitted to queue <normal>."
id_start = std_out.find("<")
id_end = std_out.find(">")
id_str = std_out[id_start + 1 : id_end].strip()
if id_start == -1 or not id_str.isdigit():
raise ValueError("Did not get a job ID!")
job_id = int(id_str)
if self.verbose:
print("process returned id: " + str(job_id))
except Exception as e:
raise ChildProcessError("could not submit this command: \n" + str(submission_string) + "\n\n" + str(e))
else:
job_id = -1
os.chdir(orig_dir)
sub_job.jobID = job_id
return job_id
def submit_jobAarray_to_queue(self, sub_job: Submission_job) -> int:
"""
This function can be used for the submission of a job array. The amount of jobs is determined by the difference:
end_job - start_job
An array index variable called ${JOBID} is defined inside the command, representing job x in the array.
Parameters
----------
sub_job: Submission_job
the job to be submitted
Returns
-------
int
return job ID
"""
# QUEUE checking to not double submit
if self._submission and self._block_double_submission:
if self.verbose:
print("check queue")
ids = list(self.search_queue_for_jobname(sub_job.jobName).index)
if len(ids) > 0:
if self.verbose:
print(
"\tSKIP - FOUND JOB: \t\t"
+ "\n\t\t".join(map(str, ids))
+ "\n\t\t with jobname: "
+ sub_job.jobName
)
return ids[0]
# generate submission_string:
submission_string = ""
if isinstance(sub_job.submit_from_dir, str) and os.path.isdir(sub_job.submit_from_dir):
submission_string += "cd " + sub_job.submit_from_dir + " && "
if sub_job.jobLim is None:
jobLim = sub_job.end_job - sub_job.start_job
else:
jobLim = sub_job.jobLim
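# LSF job-array syntax: name[start-end]%limit, where %limit caps the number of concurrently running array elements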
jobName = str(sub_job.jobName) + "[" + str(sub_job.start_job) + "-" + str(sub_job.end_job) + "]%" + str(jobLim)
submission_string += 'bsub -J " ' + jobName + ' " -W "' + str(self._job_duration) + '" '
if isinstance(sub_job.jobGroup, str):
submission_string += " -g " + sub_job.jobGroup + " "
if not isinstance(sub_job.outLog, str) and not isinstance(sub_job.errLog, str):
outLog = jobName + ".out"
submission_string += " -oo " + outLog
elif isinstance(sub_job.outLog, str):
submission_string += " -oo " + sub_job.outLog
if isinstance(sub_job.errLog, str):
submission_string += " -eo " + sub_job.errLog
nCPU = self._nmpi * self._nomp
submission_string += " -n " + str(nCPU) + " "
if isinstance(self._max_storage, (int, float)):
submission_string += ' -R "rusage[mem=' + str(self._max_storage) + ']" '
if isinstance(sub_job.queue_after_jobID, (int, str)) and sub_job.queue_after_jobID not in (0, "0"):
submission_string += ' -w "' + self._chain_prefix + "(" + str(sub_job.queue_after_jobID) + ')" '
if self._begin_mail:
submission_string += " -B "
if self._end_mail:
submission_string += " -N "
if self._nomp > 1:
command = " export OMP_NUM_THREADS=" + str(self._nomp) + " && " + sub_job.command + " "
else:
command = " " + sub_job.command + " "
# finalize string
submission_string = list(map(lambda x: x.strip(), submission_string.split())) + [command]
if self.verbose:
print("Submission Command: \t", " ".join(submission_string))
if self._submission and not self._dummy:
try:
out_process = bash.execute(command=submission_string, catch_STD=True, env=self._environment)
std_out = "\n".join(map(str, out_process.stdout.readlines()))
# the queued job array is reported with its id, e.g. "Job <12345> is submitted to queue <normal>."
id_start = std_out.find("<")
id_end = std_out.find(">")
job_id = str(std_out[id_start + 1 : id_end]).strip()
if self.verbose:
print("process returned id: " + str(job_id))
if job_id == "" or not job_id.isdigit():
raise ValueError("Did not get a job ID!")
except Exception as e:
raise ChildProcessError(
"could not submit this command: \n" + " ".join(submission_string) + "\n\n" + str(e)
)
else:
job_id = -1
sub_job.jobID = job_id
return int(job_id)
"""
Job Queue Management
"""
def get_queued_jobs(self) -> pd.DataFrame:
"""
This function updates the job-list of the queueing system in the class.
Returns
-------
pd.DataFrame
returns the job queue as a pandas DataFrame.
"""
# Do we need an update of the job list?
check_job_list = True
if hasattr(self, "_job_queue_time_stamp"):
last_update = datetime.now() - self._job_queue_time_stamp
check_job_list = last_update.seconds > self._refresh_job_queue_list_all_s
if not self._submission: # shortcut to reduce queue calls!
self._job_queue_list = pd.DataFrame(
columns="JOBID USER STAT QUEUE FROM_HOST EXEC_HOST JOB_NAME SUBMIT_TIME".split()
)
return self._job_queue_list
if check_job_list:
# try getting the lsf queue
if not self._dummy:
try:
# get all running and pending jobs
if self.bjobs_only_same_host:
out_process = bash.execute("bjobs -w | grep '$HOSTNAME|JOBID'", catch_STD=True)
else:
out_process = bash.execute("bjobs -w", catch_STD=True)
job_list_str = list(map(lambda x: x.decode("utf-8"), out_process.stdout.readlines()))
# get all finished jobs
if self.bjobs_only_same_host:
out_process = bash.execute("bjobs -wd | grep '$HOSTNAME|JOBID'", catch_STD=True)
else:
out_process = bash.execute("bjobs -wd", catch_STD=True)
job_list_finished_str = list(map(lambda x: x.decode("utf-8"), out_process.stdout.readlines()))
self._job_queue_time_stamp = datetime.now()
except Exception as err:
raise Exception("Could not get job_list!\nerr:\n" + "\n".join(err.args))
else:
job_list_str = []
job_list_finished_str = []
# format information:
jlist = list(map(lambda x: x.strip().split(), job_list_str))
jlist_fin = list(map(lambda x: x.strip().split(), job_list_finished_str))
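# the first line of the bjobs output is the header row; every following line describes one job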
if len(jlist) > 1:
header = jlist[0]
jobs = jlist[1:] + jlist_fin[1:]
jobs_dict = {}
for job in jobs:
jobID = int(job[0].split("[")[0])
user = job[1]
status = job[2]
queue = job[3]
from_host = job[4]
exec_host = job[5]
job_name = " ".join(job[6:-3])
submit_time = datetime.strptime(
str(datetime.now().year) + " " + " ".join(job[-3:]), "%Y %b %d %H:%M"
)
values = [jobID, user, status, queue, from_host, exec_host, job_name, submit_time]
jobs_dict.update({jobID: {key: value for key, value in zip(header, values)}})
self._job_queue_list = pd.DataFrame(jobs_dict, index=None).T
else:
self._job_queue_list = pd.DataFrame(
columns="JOBID USER STAT QUEUE FROM_HOST EXEC_HOST JOB_NAME SUBMIT_TIME".split()
)
else:
if self.verbose:
print("Skipping refresh of job list, as the last update is " + str(last_update) + "s ago")
return self._job_queue_list
def search_queue_for_jobid(self, job_id: int) -> pd.DataFrame:
"""searches the current job queue for the given job ID and returns the matching rows as a DataFrame."""
self.get_queued_jobs()
return self._job_queue_list.where(self._job_queue_list.JOBID == job_id).dropna()
def search_queue_for_jobname(self, job_name: str, regex: bool = False) -> pd.DataFrame:
"""search_queue_for_jobname
This function searches the job queue for a certain job name.
Parameters
----------
job_name : str
name of the job to search for
regex : bool, optional
if True, job_name is interpreted as a regular expression
Returns
-------
pd.DataFrame
the entries of the job queue matching the job name
"""
self.get_queued_jobs()
if regex:
return self._job_queue_list.where(self._job_queue_list.JOB_NAME.str.match(job_name)).dropna()
else:
return self._job_queue_list.where(self._job_queue_list.JOB_NAME == job_name).dropna()
"""
kill jobs
"""
def kill_jobs(self, job_name: str = None, regex: bool = False, job_ids: Union[List[int], int] = None):
"""
This function can be used to terminate or remove pending jobs from the queue.
Parameters
----------
job_name : str
name of the job to be killed
regex : bool
if True, all jobs matching job_name get killed!
job_ids : Union[List[int], int]
job Ids to be killed
"""
if job_name is not None:
job_ids = list(self.search_queue_for_jobname(job_name, regex=regex).index)
elif job_ids is not None:
if isinstance(job_ids, int):
job_ids = [job_ids]
else:
raise ValueError("Please provide either job_name or job_ids!")
if self.verbose:
print("Stopping: " + ", ".join(map(str, job_ids)))
try:
bash.execute("bkill " + " ".join(map(str, job_ids)))
except Exception as err:
if any(["Job has already finished" in x for x in err.args]):
print("Job has already finished")
else:
raise ChildProcessError("could not execute this command: \n" + str(err.args))