Source code for pygromos.simulations.hpc_queuing.submission_systems.local

import os
import warnings
import pandas as pd
from pygromos.utils.typing import List

from pygromos.simulations.hpc_queuing.submission_systems._submission_system import _SubmissionSystem
from pygromos.simulations.hpc_queuing.submission_systems.submission_job import Submission_job

from pygromos.utils import bash


[docs]class LOCAL(_SubmissionSystem): """ This class handles local submission without a queueing system """ def __init__( self, submission: bool = True, nomp: int = 1, nmpi: int = 1, job_duration: str = "24:00", verbose: bool = False, environment=None, zip_trajectories: bool = True, ): super().__init__( verbose=verbose, nmpi=nmpi, nomp=nomp, job_duration=job_duration, submission=submission, environment=environment, zip_trajectories=zip_trajectories, )
[docs] def submit_to_queue(self, sub_job: Submission_job) -> int: """ submitt a local job """ orig_dir = os.getcwd() if isinstance(sub_job.submit_from_dir, str) and os.path.isdir(sub_job.submit_from_dir): os.chdir(sub_job.submit_from_dir) command_file_path = sub_job.submit_from_dir + "/job_" + str(sub_job.jobName) + ".sh" else: command_file_path = "./job_" + str(sub_job.jobName) + ".sh" sub_job.command = sub_job.command.strip() # remove trailing linebreaks if self._nomp >= 1: command = "export OMP_NUM_THREADS=" + str(self._nomp) + ";\n " + sub_job.command + "" else: command = sub_job.command if sub_job.sumbit_from_file: command_file = open(command_file_path, "w") command_file.write("#!/bin/bash\n") command_file.write(command.replace("&& ", ";\n") + ";\n") command_file.close() command = command_file_path bash.execute("chmod +x " + command_file_path, env=self.environment) # finalize string if self.verbose: print("Submission Command: \t", " ".join(command)) if self.submission: try: process = bash.execute(command=command, catch_STD=True, env=self.environment) std_out_buff = map(str, process.stdout.readlines()) std_out = "\t" + "\n\t".join(std_out_buff) # next sopt_job is queued with id: if self.verbose: print("STDOUT: \n\t" + std_out + "\nEND\n") if os.path.exists(orig_dir): os.chdir(orig_dir) return 0 except ChildProcessError: try: print(process) except ChildProcessError: pass raise ChildProcessError("command failed: \n" + str(command)) else: print("Did not submit: ", command) return -1
[docs] def submit_jobAarray_to_queue(self, sub_job: Submission_job) -> int: """ submitt a local job array """ # generate submission_string: submission_string = "" if isinstance(sub_job.submit_from_dir, str) and os.path.isdir(sub_job.submit_from_dir): submission_string += "cd " + sub_job.submit_from_dir + " && " if self._nomp > 1: command = submission_string + " export OMP_NUM_THREADS=" + str(self._nomp) + " && " + sub_job.command else: command = submission_string + sub_job.command if self.verbose: print("Submission Command: \t", " ".join(submission_string)) if self.submission: try: for jobID in range(sub_job.start_job, sub_job.end_job + 1): std_out_buff = bash.execute( command="export JOBID=" + str(jobID) + " && " + command, env=self.environment ) std_out = "\n".join(std_out_buff.readlines()) if self.verbose: print("sdtout : " + str(std_out)) return 0 except ChildProcessError: raise ChildProcessError("could not submit this command: \n" + submission_string) else: print("Did note submit: ", command) return -1
[docs] def search_queue_for_jobname(self, job_name: str, **kwargs) -> List[str]: """search_queue_for_jobname this jobs searches the job queue for a certain job name. DUMMY FUNCTION! Parameters ---------- job_name : str regex: bool, optional if the string is a Regular Expression Returns ------- List[str] the output of the queue containing the jobname """ if self.verbose: print("Searching job Name: ", job_name) warnings.warn("Queue search was called, but no queue present!") return []
[docs] def search_queue_for_jobid(self, job_id: int, **kwargs) -> pd.DataFrame: """search_queue_for_jobid this jobs searches the job queue for a certain job id. DUMMY FUNCTION! Parameters ---------- job_id : int id of the job Raises ------- NotImplemented Needs to be implemented in subclasses """ if self.verbose: print("Searching job ID: ", job_id) warnings.warn("Queue search was called, but no queue present!") return []