Source code for bin.qstat
#!/usr/bin/env python3
import os
import re
import sys
import getpass
import argparse
import subprocess
'''
At the moment, this is just a wrapper script for qstat, offering enhanced output.
Long-term, an easy to use job manager would be beneficial.
The main tools for managing jobs are:
* qsub
* qstat
* qdel
The corresponding existing wrapper scripts:
* superqsub.sh
* batch_qsub.sh
* qdel_all.sh
* qdel_queued.sh
Ideas:
* Create torque-utils.py module shared by the corresponding python wrapper scripts?
* Create single script with CLI options?
* Create GUI? -> terminal(ncurses) or X(tk, Qt, etc)? Any GUI toolkit with terminal+X support?
.. todo:: Finish this
.. todo:: qsub wrapper: check if job is not already running before submitting by checking qstat output
.. todo:: combine with bfdtd_simtime.py to list progress of jobs as well! This will also allow estimating how many iterations a sim will reach before walltime cutoff, etc.
'''
# Patterns are compiled once at import time. Raw strings keep regex escapes
# such as ``\s`` and ``\Z`` from being treated as (invalid) string escapes.
#
# A job record starts at "Job Id:" and runs up to the first blank line, or up
# to the end of the text when the closing blank line is missing.
_QSTAT_JOB_PATTERN = re.compile(
    r"Job Id: (?P<jobId>.*?)\n(?P<jobDetails>.*?)(?:\n\n|\n?\Z)",
    re.DOTALL,
)
# One "key = value" attribute; the key ends at the first "=" so values may
# themselves contain "=" characters.
_QSTAT_JOB_ITEM_PATTERN = re.compile(r"\s*(?P<key>.*?)\s*=\s*(?P<value>.*)\s*")


def parseQstatFullOutput(qstat_full_output):
    '''
    Parses "qstat -f" output into a more usable format

    input: qstat -f output in string format
    output: a list of dictionaries, one per job, in order of appearance.
    Each dictionary maps the attribute names found in the job record to
    their values (all strings), plus the key 'jobId' holding the text that
    follows "Job Id: ". An empty or unrecognised input yields an empty list.

    The last job is parsed even if the text does not end with a blank line.
    '''
    job_list = []
    for job_match in _QSTAT_JOB_PATTERN.finditer(qstat_full_output):
        job_dict = {'jobId': job_match.group('jobId')}
        # Long values are wrapped onto continuation lines that start with a
        # tab; join them back into a single line before splitting into items.
        job_details = job_match.group('jobDetails').replace('\n\t', '')
        for job_item in _QSTAT_JOB_ITEM_PATTERN.finditer(job_details):
            job_dict[job_item.group('key')] = job_item.group('value')
        job_list.append(job_dict)
    return job_list
def _parse_variable_list(variable_list):
    '''Split a comma-separated "NAME=value" string into a {name: value} dict.'''
    variables = {}
    last_name = None
    for item in variable_list.split(','):
        name, separator, value = item.partition('=')
        if separator:
            variables[name] = value
            last_name = name
        elif last_name is not None:
            # No "=" in this piece: the comma was part of the previous value,
            # so glue the piece back on instead of failing.
            variables[last_name] += ',' + item
    return variables


def _job_script_path(job):
    '''Return the last word of the job's submit_args, joined to PBS_O_WORKDIR when known.'''
    submit_args = job.get('submit_args', '').split()
    script = submit_args[-1] if submit_args else ''
    if 'Variable_List' in job:
        workdir = _parse_variable_list(job['Variable_List']).get('PBS_O_WORKDIR')
        if workdir is not None:
            return os.path.join(workdir, script)
    return script


def main():
    '''
    qstat wrapper

    Prints one line per job owned by the current user, in the form
    "<jobId> job_state=<state> running_time=<cput> -> <script path>".

    The job data comes from the file named by the optional command-line
    argument (saved "qstat -f" output) or, when no argument is given, from
    running "qstat -f". Anything qstat writes to its stderr is forwarded to
    this process's stderr.

    Output errors such as a closed pipe (e.g. when piping into "head") end
    the listing quietly.
    '''
    parser = argparse.ArgumentParser(description='qstat wrapper offering enhanced output')
    parser.add_argument(
        'qstat_output_file',
        nargs='?',
        help='file containing saved "qstat -f" output; when omitted, "qstat -f" is run',
    )
    args = parser.parse_args()

    if args.qstat_output_file is not None:
        with open(args.qstat_output_file, 'r') as f:
            qstat_full_output = f.read()
    else:
        qstat_process = subprocess.Popen(
            ['qstat', '-f'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        # communicate() drains both pipes while waiting; a plain wait() can
        # block forever once the child fills a pipe buffer.
        (qstat_full_output, qstat_stderr) = qstat_process.communicate()
        if qstat_stderr:
            # Do not hide qstat's own diagnostics from the user.
            sys.stderr.write(qstat_stderr)

    job_list = parseQstatFullOutput(qstat_full_output)
    current_user = getpass.getuser()

    #Job id Name User Time Use S Queue
    #------------------------- ---------------- --------------- -------- - -----
    try:
        for job in job_list:
            owner = job.get('Job_Owner', '')
            # Match the user name exactly (bare, or as the part before "@")
            # rather than as a substring, so "bob" does not match "bobby".
            if owner != current_user and not owner.startswith(current_user + '@'):
                continue
            running_time = job.get('resources_used.cput', '00:00:00')
            # flush per line so a closed pipe is noticed immediately
            print(
                '{} job_state={} running_time={} -> {}'.format(
                    job['jobId'], job['job_state'], running_time, _job_script_path(job)
                ),
                flush=True,
            )
    except OSError:
        # BrokenPipeError is a subclass of OSError and IOError is an alias of it.
        # Point stdout at devnull so the interpreter's final flush at exit
        # cannot fail again and print an "Exception ignored" message.
        try:
            devnull = os.open(os.devnull, os.O_WRONLY)
            os.dup2(devnull, sys.stdout.fileno())
        except (OSError, ValueError):
            # stdout has no usable file descriptor; nothing more can be done.
            pass
    return
# Run the wrapper only when this file is executed as a script.
if __name__ == '__main__':
    main()