Source code for pygromos.simulations.hpc_queuing.submission_systems.dummy

import warnings
from pygromos.utils.typing import Union, List

from pygromos.simulations.hpc_queuing.submission_systems._submission_system import _SubmissionSystem
from pygromos.simulations.hpc_queuing.submission_systems.submission_job import Submission_job


class DUMMY(_SubmissionSystem):
    """Submission system stand-in that prints instead of submitting.

    Intended for testing code that drives a submission system: the submit
    methods print the command they receive, and the queue-query methods
    emit a warning and return an empty list.
    """

    def __init__(
        self,
        verbose: bool = False,
        nomp: int = 1,
        nmpi: int = 1,
        job_duration: str = "24:00",
        submission: bool = True,
        environment=None,
    ):
        # All settings are passed unchanged to the parent constructor.
        super().__init__(
            verbose=verbose,
            nomp=nomp,
            nmpi=nmpi,
            job_duration=job_duration,
            submission=submission,
            environment=environment,
        )

    def submit_to_queue(self, sub_job: Submission_job) -> Union[int, None]:
        """Print the command of a single job instead of submitting it.

        Parameters
        ----------
        sub_job : Submission_job
            submission job parameters; only ``sub_job.command`` is read.

        Returns
        -------
        int, None
            0 if ``self.submission`` is truthy (the command was printed),
            otherwise None.
        """
        # Guard clause: submission switched off, nothing is "submitted".
        if not self.submission:
            print("did not submit")
            return None

        print("\n", sub_job.command, "\n")
        return 0

    def submit_jobAarray_to_queue(self, sub_job: Submission_job) -> Union[int, None]:
        """Print the command once for every index of a job array.

        The indices run from ``sub_job.start_job`` up to and including
        ``sub_job.end_job``.

        Parameters
        ----------
        sub_job : Submission_job
            submission job parameters; ``command``, ``start_job`` and
            ``end_job`` are read.

        Returns
        -------
        int, None
            0 if ``self.submission`` is truthy (the commands were printed),
            otherwise None.
        """
        # Guard clause: submission switched off, nothing is "submitted".
        if not self.submission:
            print("did not submit")
            return None

        print()
        # end_job is inclusive, hence the + 1 on the range bound.
        for array_index in range(sub_job.start_job, sub_job.end_job + 1):
            print(f"Job {array_index}:", sub_job.command, "\n")
        print()
        return 0

    def get_jobs_from_queue(self, job_text: str, **kwargs) -> List[int]:
        """Pretend to look up job ids in the queue; there is no queue.

        Parameters
        ----------
        job_text : str
            text to search for; only echoed when ``self.verbose`` is set.
        **kwargs
            accepted but not used by this implementation.

        Returns
        -------
        List[int]
            always an empty list.
        """
        if self.verbose:
            print("Retrieving jobs from list with: ", job_text)

        # The warning is raised on every call, independent of verbosity.
        warnings.warn("Queue search was called, but no queue present!")
        return []

    def search_queue_for_jobname(self, job_name: str, **kwargs) -> List[str]:
        """Pretend to search the queue for a job name; there is no queue.

        Parameters
        ----------
        job_name : str
            name to search for; only echoed when ``self.verbose`` is set.
        **kwargs
            accepted but not used by this implementation.

        Returns
        -------
        List[str]
            always an empty list.
        """
        if self.verbose:
            print("Searching job Name: ", job_name)

        # The warning is raised on every call, independent of verbosity.
        warnings.warn("Queue search was called, but no queue present!")
        return []