import os import sys import glob from functions import natural_key, abs_path class Job: """ A class that represents a job/system residing in a unique directory path Calling prepare_sim() assigns the additional instance variables: steps = number of steps to run the simulation ffs = a list of forcefield files for the simulation next_step, final_step_num = the type of step and the final number of steps conf, out file = the configuration file and the simulation output file """ def __init__(self, path): self.set_empty_vars() self.path = path self.name = self.get_job_name() self.prefix = os.path.join(self.path, self.name) self.pdb, self.psf = self.get_pdb_and_psf() self.stage, self.step, self.coor = self.get_stage() def set_empty_vars(self): self.pdb, self.psf = '', '' self.steps = '' self.ffs_path, self.ffs = '', '' self.next_stage, self.next_step = '', '' self.conf, self.out = '', '' def get_job_name(self): '''Get job name from path''' name = os.path.basename(self.path) return name def get_pdb_and_psf(self): pdb = glob.glob(f'{self.prefix}*_solv_ion.pdb') psf = glob.glob(f'{self.prefix}*_solv_ion.psf') if len(pdb) == 1 and len(psf) == 1: return pdb[0], psf[0] pdb = glob.glob(f'{self.prefix}.pdb') psf = glob.glob(f'{self.prefix}.psf') if len(pdb) == 1 and len(psf) == 1: return pdb[0], psf[0] else: print('No PDB or PSF files found, exiting...') sys.exit(1) def get_stage(self): coors_path = self.prefix + '*.coor' coors = glob.glob(coors_path) coors.sort(key=natural_key) if len(coors) >= 1: coor = coors[-1] stage = coor.split('_')[-2].split('.')[0] step = int(coor.split('_')[-1].split('.')[0]) else: coor = '' stage = '' step = 0 return stage, int(step), coor def prepare_sim(self, ffs_path, steps): self.steps = steps self.ffs_path = abs_path(ffs_path) self.ffs = self.get_ffs() self.next_stage, self.next_step = self.get_next_stage() self.conf, self.out = self.conf_out_file() def get_next_stage(self): if self.stage == '4-sim': next_stage = '4-sim' next_step = self.step + self.steps elif self.stage == '3-heat': next_stage = '4-sim' next_step = steps elif self.stage == '2-min': next_stage = '3-heat' next_step = 0 elif self.stage == '1-min': next_stage = '2-min' next_step = 0 elif self.stage == '': next_stage = '1-min' next_step = 0 return next_stage, int(next_step) def get_ffs(self): ffs = glob.glob(os.path.join(self.ffs_path, '*')) return ffs def conf_out_file(self): if self.next_stage == '4-sim': conf = f'{self.prefix}_{self.next_stage}' \ f'_{str(self.next_step)}.conf' out = f'{self.prefix}_{self.next_stage}' \ f'_{str(self.next_step)}.out' else: conf = f'{self.prefix}_{self.next_stage}.conf' out = f'{self.prefix}_{self.next_stage}.out' return conf, out def info(self): print(f'Job Path: {self.path}\n' f'Name: {self.name}\n' f'Working PDB: {self.pdb}\n' f'Working PSF: {self.psf}\n' f'Previous Step: {self.stage}\n' f'Previous Step Number: {str(self.step)}\n' f'Next Stage: {self.next_stage}\n' f'Next Step Number: {str(self.next_step)}\n' f'FF Path: {self.ffs_path}\n' f'FFs: {self.ffs}\n' ) def get_jobs_from_path(jobs_path): jobs_path += '/**/*.pdb' job_dirs = glob.glob(jobs_path, recursive=True) job_dirs_l = [] for job_dir in job_dirs: job_dirs_l.append(os.path.dirname(job_dir)) if len(job_dirs_l) < 1: print('No valid jobs found in jobs path!') sys.exit(1) return job_dirs_l def get_next_job(jobs): jobs.sort(key=lambda x: (x.stage, x.step)) return jobs[0] def create_job_instances(job_paths): job_instances = [] for job_path in job_paths: job_instances.append(Job(job_path)) return job_instances