fsl.utils.fslsub
This module submits jobs to a computing cluster using FSL’s fsl_sub command-line tool. The cluster is assumed to be managed by SGE.
Example usage, building a short pipeline:
from fsl.utils.fslsub import submit, wait

# submits bet to veryshort queue unless <mask_filename> already exists
bet_job = submit('bet <input_filename> -m',
                 queue='veryshort.q',
                 output='<mask_filename>')

# submits another job
other_job = submit('some other pre-processing step', queue='short.q')

# submits cuda job, that should only start after both preparatory jobs are
# finished. This will work if bet_job and other_job are single job-ids
# (i.e., strings) or a sequence of multiple job-ids
cuda_job = submit('expensive job',
                  wait_for=(bet_job, other_job),
                  queue='cuda.q')

# waits for the cuda job to finish
wait(cuda_job)
submit | Submits a given command to the cluster
info | Gets information on a given job id
output | Returns the output of the given job.
wait | Wait for one or more jobs to finish
func_to_cmd | Defines the command needed to run the function from the command line
-
class fsl.utils.fslsub.SubmitParams(minutes: Optional[float] = None, queue: Optional[str] = None, architecture: Optional[str] = None, priority: Optional[int] = None, email: Optional[str] = None, wait_for: Union[str, None, Collection[str]] = None, job_name: Optional[str] = None, ram: Optional[int] = None, logdir: Optional[str] = None, mail_options: Optional[str] = None, flags: bool = False, multi_threaded: Optional[Tuple[str, str]] = None, verbose: bool = False, env: dict = None)
Bases: object
Represents the fsl_sub parameters
-
minutes = None
-
queue = None
-
architecture = None
-
priority = None
-
email = None
-
wait_for = None
-
job_name = None
-
ram = None
-
logdir = None
-
mail_options = None
-
flags = False
-
multi_threaded = None
-
verbose = False
-
env = None
-
cmd_line_flags = {'-M': 'email', '-N': 'job_name', '-R': 'ram', '-T': 'minutes', '-a': 'architecture', '-l': 'logdir', '-m': 'mail_options', '-p': 'priority', '-q': 'queue'}
-
as_flags()
Creates flags for submission using fsl_sub
All parameters changed from their default value (typically None) will be included in the flags.
- Returns
tuple with the flags
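The relationship between cmd_line_flags and as_flags can be sketched as follows. This is a minimal stand-alone illustration, not the library's implementation: ParamsSketch is a hypothetical class with only three of the parameters, and the exact formatting of each value in the real flags tuple is an assumption.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class ParamsSketch:
    """Hypothetical stand-in for SubmitParams with only three fields."""
    minutes: Optional[float] = None
    queue: Optional[str] = None
    job_name: Optional[str] = None

    # Subset of the cmd_line_flags mapping (flag -> attribute name).
    # It has no annotation, so the dataclass treats it as a plain
    # class attribute rather than a field.
    cmd_line_flags = {'-T': 'minutes', '-q': 'queue', '-N': 'job_name'}

    def as_flags(self) -> Tuple[str, ...]:
        flags = []
        for flag, attr in self.cmd_line_flags.items():
            value = getattr(self, attr)
            if value is not None:  # skip parameters left at their default
                flags.extend([flag, str(value)])
        return tuple(flags)


flags = ParamsSketch(queue='short.q', job_name='bet').as_flags()
print(flags)  # ('-q', 'short.q', '-N', 'bet')
```

Parameters left at None contribute nothing, so a default-constructed object yields an empty tuple.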
-
__call__(*command, **kwargs)
Submits the command to the cluster.
- Parameters
command – string or tuple of strings with the command to submit
kwargs – Keyword arguments can override any parameters set in self
- Returns
job ID
-
classmethod add_to_parser(parser: argparse.ArgumentParser, as_group='fsl_sub commands', include=('wait_for', 'logdir', 'email', 'mail_options'))
Adds submission parameters to the parser
- Parameters
parser – parser that should understand submission commands
as_group – add as a new group
include – sequence of argument flags/names that should be added to the parser (set to None to include everything)
- Returns
the group the arguments got added to
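The general pattern of adding a selectable subset of options to an existing parser as a named group might look like the sketch below. It uses only the standard argparse module; add_submit_args, PARAM_HELP and the long option names such as --logdir are assumptions made for illustration and are not taken from the library.

```python
import argparse

# Hypothetical subset of submission parameters with help strings.
PARAM_HELP = {
    'wait_for': 'job ids that must finish before this job starts',
    'logdir': 'where to output logfiles',
    'email': 'who to email after job completion',
    'queue': 'explicitly sets the queue name',
}


def add_submit_args(parser, as_group='fsl_sub commands',
                    include=('wait_for', 'logdir', 'email')):
    """Add the selected options to the parser and return their group."""
    group = parser.add_argument_group(as_group) if as_group else parser
    names = PARAM_HELP if include is None else include  # None means all
    for name in names:
        group.add_argument('--' + name, help=PARAM_HELP[name])
    return group


parser = argparse.ArgumentParser(description='my pipeline')
parser.add_argument('input')
add_submit_args(parser)

args = parser.parse_args(['scan.nii.gz', '--logdir', 'logs'])
print(args.logdir, args.email)  # logs None
```

Because queue is not in the include tuple here, the parser does not accept it; passing include=None would add every entry.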
-
-
fsl.utils.fslsub.submit(*command, **kwargs)
Submits a given command to the cluster
You can pass the command and arguments as a single string, or as a regular or unpacked sequence.
- Parameters
command – string or regular/unpacked sequence of strings with the job command
minutes – Estimated job length in minutes, used to auto-set queue name
queue – Explicitly sets the queue name
architecture – e.g., darwin or lx24-amd64
priority – Lowers the job priority (range 0 to -1024, default 0)
email – Who to email after job completion
wait_for – Place a hold on this task until the job-ids in this string or tuple are complete
job_name – Specify job name as it will appear on the queue
ram – Max total RAM to use for job (integer in MB)
logdir – where to output logfiles
mail_options – Change the SGE mail options, see qsub for details
output – If <output> image or file already exists, do nothing and exit
flags – If True, use flags embedded in scripts to set SGE queuing options
multi_threaded –
Submit a multi-threaded task. Set to a tuple containing two elements:
<pename>: a PE (parallel environment) configured for the requested queues
<threads>: number of threads to run
verbose – If True, use verbose mode
env – Dict containing environment variables
- Returns
string of submitted job id
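The three accepted ways of passing the command (a single string, a regular sequence, or an unpacked sequence) can be reduced to one argument list. The sketch below shows one way to do that with the standard library; normalise_command is a hypothetical helper, and the library's own argument handling may differ.

```python
import shlex


def normalise_command(*command):
    """Return the command as a list of strings, whichever form was used."""
    if len(command) == 1:
        only = command[0]
        if isinstance(only, str):
            # single string: split it the way a POSIX shell would
            return shlex.split(only)
        # regular sequence passed as one argument
        return [str(part) for part in only]
    # unpacked sequence: each argument is one token
    return [str(part) for part in command]


as_string = normalise_command('bet in.nii.gz out -m')
as_sequence = normalise_command(['bet', 'in.nii.gz', 'out', '-m'])
as_unpacked = normalise_command('bet', 'in.nii.gz', 'out', '-m')
print(as_string == as_sequence == as_unpacked)  # True
```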
-
fsl.utils.fslsub.info(job_id)
Gets information on a given job id
Uses qstat -j <job_id>
- Parameters
job_id – string with job id
- Returns
dictionary with information on the submitted job (empty if job does not exist)
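Turning qstat -j style text into a dictionary can be sketched as below. The parser assumes the usual "key: value" layout with a separator line of "=" characters; SAMPLE is made-up text for illustration, not captured from a real cluster, and parse_qstat is a hypothetical name.

```python
def parse_qstat(text):
    """Parse 'key: value' lines into a dict, skipping everything else."""
    result = {}
    for line in text.splitlines():
        if line.startswith('=') or ':' not in line:
            continue  # separator line or line without a key
        key, _, value = line.partition(':')  # split on the first colon only
        result[key.strip()] = value.strip()
    return result


# Made-up sample in the general shape of qstat -j output.
SAMPLE = """\
==============================================================
job_number:                 12345
owner:                      someuser
job_name:                   bet
"""

job = parse_qstat(SAMPLE)
print(job['job_name'])   # bet
print(parse_qstat(''))   # {}
```

An empty dictionary for empty input matches the documented behaviour for a job that does not exist.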
-
fsl.utils.fslsub.output(job_id, logdir='.', command=None, name=None)
Returns the output of the given job.
- Parameters
job_id – String containing job ID.
logdir – Directory containing the log - defaults to the current directory.
command – Command that was run. Not currently used.
name – Job name if it was specified. Not currently used.
- Returns
A tuple containing the standard output and standard error.
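A minimal sketch of collecting a job's log files from a directory follows. It assumes the common SGE naming scheme of <job_name>.o<job_id> for standard output and <job_name>.e<job_id> for standard error; read_job_output is a hypothetical helper, and how the library itself locates the files is not stated here.

```python
import glob
import os
import tempfile


def read_job_output(job_id, logdir='.'):
    """Return (stdout, stderr) text for a job, found by file suffix."""
    def read_one(kind):
        pattern = os.path.join(logdir, '*.{}{}'.format(kind, job_id))
        matches = glob.glob(pattern)
        if len(matches) != 1:
            raise ValueError('expected exactly one file matching ' + pattern)
        with open(matches[0]) as f:
            return f.read()
    return read_one('o'), read_one('e')


# Demonstration with fake log files in a temporary directory.
with tempfile.TemporaryDirectory() as logdir:
    for kind, text in (('o', 'done\n'), ('e', '')):
        with open(os.path.join(logdir, 'bet.{}12345'.format(kind)), 'w') as f:
            f.write(text)
    stdout, stderr = read_job_output('12345', logdir)

print(repr(stdout), repr(stderr))  # 'done\n' ''
```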
-
fsl.utils.fslsub.wait(job_ids)
Wait for one or more jobs to finish
- Parameters
job_ids – string or tuple of strings with jobs that should finish before continuing
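Waiting on several jobs usually comes down to a polling loop. The sketch below shows that pattern with the status check injected as a callable, so it runs without a cluster; wait_for_jobs, is_running and the back-off intervals are all assumptions for illustration, not the library's actual behaviour.

```python
import time


def wait_for_jobs(job_ids, is_running, poll_interval=0.01, max_interval=1.0):
    """Block until is_running(job_id) is False for every job id."""
    if isinstance(job_ids, str):
        job_ids = (job_ids,)  # accept a single id as well as a sequence
    pending = list(job_ids)
    while pending:
        pending = [job_id for job_id in pending if is_running(job_id)]
        if pending:
            time.sleep(poll_interval)
            # back off so a long wait does not flood the scheduler
            poll_interval = min(poll_interval * 2, max_interval)


# Fake status check: each job reports "running" a fixed number of times.
remaining_polls = {'101': 2, '102': 0}


def fake_is_running(job_id):
    remaining_polls[job_id] -= 1
    return remaining_polls[job_id] >= 0


wait_for_jobs(('101', '102'), fake_is_running)
print('all jobs finished')
```

With the real module, the status check could plausibly be built on info, which returns an empty dictionary once a job no longer exists.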
-
fsl.utils.fslsub._flatten_job_ids(job_ids)
Returns a potentially nested sequence of job ids as a single comma-separated string
- Parameters
job_ids – possibly nested sequence of job ids. The job ids themselves should be strings.
- Returns
comma-separated string of job ids
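The flattening described above can be sketched with a small recursive generator. This version keeps the ids in the order they are encountered; whether the library sorts or de-duplicates them is not stated here, so treat the ordering as an assumption. flatten_job_ids is a hypothetical name.

```python
def flatten_job_ids(job_ids):
    """Join a possibly nested sequence of job id strings with commas."""
    def unpack(item):
        if isinstance(item, str):
            yield item  # a single job id
        else:
            for sub in item:
                yield from unpack(sub)  # recurse into nested sequences
    return ','.join(unpack(job_ids))


print(flatten_job_ids(('101', ('102', ['103', '104']))))  # 101,102,103,104
print(flatten_job_ids('101'))                             # 101
```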
-
fsl.utils.fslsub.func_to_cmd(func, args, kwargs, tmp_dir=None, clean=False)
Defines the command needed to run the function from the command line
WARNING: if submitting a function defined in the __main__ script, the script will be run again to retrieve this function. Make sure there is an if __name__ == '__main__' guard to prevent the full script from being rerun.
- Parameters
func – function to be run
args – positional arguments
kwargs – keyword arguments
tmp_dir – directory where to store the temporary file
clean – if True removes the submitted script after running it
- Returns
string which will run the function
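One way to turn a function call into a shell command is to pickle the function and its arguments into a small temporary script. The sketch below illustrates that general idea only: func_to_cmd_sketch and SCRIPT are hypothetical, the clean option is omitted, a POSIX-style shell is assumed for quoting, and the library's real script contents may differ.

```python
import os
import pickle
import shlex
import subprocess
import sys
import tempfile

# Template for the generated script; the pickled call is embedded as bytes.
SCRIPT = """\
import pickle
func, args, kwargs = pickle.loads({payload!r})
print(func(*args, **kwargs))
"""


def func_to_cmd_sketch(func, args, kwargs, tmp_dir=None):
    """Write a script that calls func(*args, **kwargs); return the command."""
    payload = pickle.dumps((func, args, kwargs))
    fd, path = tempfile.mkstemp(suffix='.py', dir=tmp_dir, text=True)
    with os.fdopen(fd, 'w') as f:
        f.write(SCRIPT.format(payload=payload))
    return shlex.join([sys.executable, path])


if __name__ == '__main__':
    # pow is pickled by reference (builtins.pow), so the script can load it.
    cmd = func_to_cmd_sketch(pow, (2, 10), {})
    result = subprocess.run(shlex.split(cmd), capture_output=True,
                            text=True, check=True)
    print(result.stdout.strip())      # 1024
    os.remove(shlex.split(cmd)[-1])   # tidy up the temporary script
```

Functions are pickled by reference to their module, which is why a function defined in __main__ forces the original script to be loaded again and why the guard in the warning above matters.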