fsl.utils.fslsub

This module submits jobs to a computing cluster using FSL’s fsl_sub command line tool. It is assumed that the computing cluster is managed by SGE.

Example usage, building a short pipeline:

from fsl.utils.fslsub import submit, wait

# submits bet to veryshort queue unless <mask_filename> already exists
bet_job = submit('bet <input_filename> -m',
                 queue='veryshort.q',
                 output='<mask_filename>')

# submits another job
other_job = submit('some other pre-processing step', queue='short.q')

# submits a cuda job that should only start after both preparatory jobs
# have finished. This works whether bet_job and other_job are single
# job-ids (i.e., strings) or sequences of multiple job-ids
cuda_job = submit('expensive job',
                  wait_for=(bet_job, other_job),
                  queue='cuda.q')

# waits for the cuda job to finish
wait(cuda_job)

  • submit – Submits a given command to the cluster

  • info – Gets information on a given job id

  • output – Returns the output of the given job.

  • wait – Wait for one or more jobs to finish

  • func_to_cmd – Defines the command needed to run the function from the command line

class fsl.utils.fslsub.SubmitParams(minutes: Optional[float] = None, queue: Optional[str] = None, architecture: Optional[str] = None, priority: Optional[int] = None, email: Optional[str] = None, wait_for: Union[str, None, Collection[str]] = None, job_name: Optional[str] = None, ram: Optional[int] = None, logdir: Optional[str] = None, mail_options: Optional[str] = None, flags: bool = False, multi_threaded: Optional[Tuple[str, str]] = None, verbose: bool = False, env: dict = None)[source]

Bases: object

Represents the fsl_sub parameters

minutes = None
queue = None
architecture = None
priority = None
email = None
wait_for = None
job_name = None
ram = None
logdir = None
mail_options = None
flags = False
multi_threaded = None
verbose = False
env = None
cmd_line_flags = {'-M': 'email', '-N': 'job_name', '-R': 'ram', '-T': 'minutes', '-a': 'architecture', '-l': 'logdir', '-m': 'mail_options', '-p': 'priority', '-q': 'queue'}
__post_init__()[source]
as_flags()[source]

Creates flags for submission using fsl_sub

All parameters changed from their default value (typically None) will be included in the flags.

Returns

tuple with the flags
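
The rule above (every parameter that differs from its default becomes a flag) can be illustrated with a small stand-in class. This is only a sketch, not the fslpy source: `ParamsSketch` and its three fields are hypothetical, and only the flag letters are taken from the cmd_line_flags mapping listed above.

```python
from dataclasses import dataclass
from typing import ClassVar, Dict, Optional, Tuple


@dataclass
class ParamsSketch:
    """Hypothetical stand-in for SubmitParams with only three fields."""
    minutes: Optional[float] = None
    queue: Optional[str] = None
    job_name: Optional[str] = None

    # Subset of the flag-to-attribute mapping shown in cmd_line_flags.
    cmd_line_flags: ClassVar[Dict[str, str]] = {
        '-T': 'minutes',
        '-q': 'queue',
        '-N': 'job_name',
    }

    def as_flags(self) -> Tuple[str, ...]:
        """Emit a flag/value pair for every attribute that is not None."""
        flags = []
        for flag, attr in self.cmd_line_flags.items():
            value = getattr(self, attr)
            if value is not None:
                flags.extend((flag, str(value)))
        return tuple(flags)


params = ParamsSketch(queue='short.q', job_name='bet')
print(params.as_flags())
```

Attributes left at None contribute nothing, so a default-constructed instance yields an empty tuple.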

__str__()[source]

Return str(self).

__call__(*command, **kwargs)[source]

Submits the command to the cluster.

Parameters
  • command – string or tuple of strings with the command to submit

  • kwargs – Keyword arguments can override any parameters set in self

Returns

job ID

update(**kwargs)[source]

Creates a new SubmitParams with updated parameters
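
Because update returns a new object rather than modifying the existing one, a base configuration can be reused safely. The sketch below shows that pattern on a hypothetical two-field dataclass using the standard `dataclasses.replace`; whether SubmitParams is implemented this way is an assumption.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass
class ParamsSketch:
    """Hypothetical stand-in for SubmitParams with only two fields."""
    queue: Optional[str] = None
    ram: Optional[int] = None

    def update(self, **kwargs) -> 'ParamsSketch':
        """Return a copy with the given fields overridden; self is untouched."""
        return replace(self, **kwargs)


base = ParamsSketch(queue='short.q')
big = base.update(ram=16000)
print(base)
print(big)
```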

classmethod add_to_parser(parser: argparse.ArgumentParser, as_group='fsl_sub commands', include=('wait_for', 'logdir', 'email', 'mail_options'))[source]

Adds submission parameters to the parser

Parameters
  • parser – parser that should understand submission commands

  • as_group – add as a new group

  • include – sequence of argument flags/names that should be added to the parser (set to None to include everything)

Returns

the group the arguments got added to
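
The effect of the as_group and include parameters can be sketched with plain argparse. Everything here is illustrative: `add_to_parser_sketch`, the `OPTIONS` table and the long option names are hypothetical, and only the short flags are borrowed from cmd_line_flags.

```python
import argparse

# Hypothetical option table: (attribute name, short flag, help text).
OPTIONS = [
    ('queue', '-q', 'queue name'),
    ('logdir', '-l', 'where to output logfiles'),
    ('email', '-M', 'who to email after job completion'),
]


def add_to_parser_sketch(parser, as_group='fsl_sub commands',
                         include=('logdir', 'email')):
    """Add the selected options to a new argument group and return it."""
    group = parser.add_argument_group(as_group)
    for name, flag, help_text in OPTIONS:
        # include=None means "add everything"; otherwise match name or flag.
        if include is None or name in include or flag in include:
            group.add_argument(flag, '--' + name, help=help_text)
    return group


parser = argparse.ArgumentParser()
add_to_parser_sketch(parser)
args = parser.parse_args(['-l', 'logs'])
print(args)
```

Options that are not listed in include never reach the parser, so they do not appear on the parsed namespace at all.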

classmethod from_args(args)[source]

fsl.utils.fslsub.submit(*command, **kwargs)[source]

Submits a given command to the cluster

You can pass the command and arguments as a single string, or as a regular or unpacked sequence.

Parameters
  • command – string or regular/unpacked sequence of strings with the job command

  • minutes – Estimated job length in minutes, used to auto-set queue name

  • queue – Explicitly sets the queue name

  • architecture – e.g., darwin or lx24-amd64

  • priority – Lowers the job priority; accepts values from 0 down to -1024 (default 0)

  • email – Who to email after job completion

  • wait_for – Place a hold on this task until the job-ids in this string or tuple are complete

  • job_name – Specify job name as it will appear on the queue

  • ram – Max total RAM to use for job (integer in MB)

  • logdir – where to output logfiles

  • mail_options – Change the SGE mail options, see qsub for details

  • output – If <output> image or file already exists, do nothing and exit

  • flags – If True, use flags embedded in scripts to set SGE queuing options

  • multi_threaded

    Submit a multi-threaded task - Set to a tuple containing two elements:

    • <pename>: a PE (parallel environment) configured for the requested queues

    • <threads>: number of threads to run

  • verbose – If True, use verbose mode

  • env – Dict containing environment variables

Returns

string of submitted job id
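
The three accepted ways of passing the command (single string, regular sequence, unpacked sequence) all describe the same argument list. The helper below is a hypothetical sketch of how such input could be normalised; it is not taken from the fslpy source.

```python
import shlex
from typing import List


def normalise_command(*command) -> List[str]:
    """Turn any of the three calling styles into a flat argument list."""
    # A single argument is either a whole command string or a sequence.
    if len(command) == 1:
        command = command[0]
    if isinstance(command, str):
        return shlex.split(command)
    return [str(part) for part in command]


print(normalise_command('bet input.nii -m'))        # single string
print(normalise_command(['bet', 'input.nii', '-m']))  # regular sequence
print(normalise_command('bet', 'input.nii', '-m'))    # unpacked sequence
```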

fsl.utils.fslsub.info(job_id)[source]

Gets information on a given job id

Uses qstat -j <job_id>

Parameters

job_id – string with job id

Returns

dictionary with information on the submitted job (empty if job does not exist)
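
Turning the text printed by qstat -j into a dictionary amounts to splitting each line at its first colon. The following is a minimal sketch of that idea; `parse_qstat_output` is a hypothetical helper and the sample text is invented for illustration, not captured from a real cluster.

```python
def parse_qstat_output(text: str) -> dict:
    """Parse 'key: value' lines into a dict, skipping anything else."""
    result = {}
    for line in text.splitlines():
        # Split at the first colon only, so values may contain colons.
        key, sep, value = line.partition(':')
        if sep and key.strip():
            result[key.strip()] = value.strip()
    return result


# Invented sample text in a 'key: value' layout.
SAMPLE = """\
==============================================================
job_number:                 12345
owner:                      someuser
submission_time:            Mon Jan  1 12:00:00 2024
"""

job_info = parse_qstat_output(SAMPLE)
print(job_info)
```

Text without any key/value lines produces an empty dictionary in this sketch, matching the documented return value for a job that does not exist.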

fsl.utils.fslsub.output(job_id, logdir='.', command=None, name=None)[source]

Returns the output of the given job.

Parameters
  • job_id – String containing job ID.

  • logdir – Directory containing the log - defaults to the current directory.

  • command – Command that was run. Not currently used.

  • name – Job name if it was specified. Not currently used.

Returns

A tuple containing the standard output and standard error.
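
As a rough sketch of what retrieving a job's output involves, the helper below looks in a log directory for files named <job_name>.o<job_id> and <job_name>.e<job_id>. That naming scheme is an assumption about the cluster configuration, and `read_job_logs` is a hypothetical function rather than the module's implementation.

```python
import glob
import os
import tempfile
from typing import Tuple


def read_job_logs(job_id: str, logdir: str = '.') -> Tuple[str, str]:
    """Return (stdout, stderr) text for a job, or '' where no log is found."""
    def read_first(pattern: str) -> str:
        # Escape the directory so special characters in it are taken literally.
        matches = sorted(glob.glob(os.path.join(glob.escape(logdir), pattern)))
        if not matches:
            return ''
        with open(matches[0], 'rt') as f:
            return f.read()

    # Assumed naming scheme: <job_name>.o<job_id> and <job_name>.e<job_id>.
    return read_first(f'*.o{job_id}'), read_first(f'*.e{job_id}')


# Demonstration with made-up log files in a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, 'bet.o42'), 'wt') as f:
        f.write('finished\n')
    with open(os.path.join(tmp, 'bet.e42'), 'wt') as f:
        f.write('')
    stdout, stderr = read_job_logs('42', logdir=tmp)
    missing = read_job_logs('7', logdir=tmp)

print(repr(stdout), repr(stderr), missing)
```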

fsl.utils.fslsub.wait(job_ids)[source]

Wait for one or more jobs to finish

Parameters

job_ids – string or tuple of strings with jobs that should finish before continuing
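
Waiting for jobs is essentially a polling loop: check each job, sleep, and repeat until none is still running. The sketch below keeps the liveness check injectable so it can run without a cluster; `wait_sketch`, `is_running` and `poll_seconds` are hypothetical names, and in practice the check might rely on info returning an empty dictionary once a job no longer exists.

```python
import time
from typing import Callable, Dict, Iterable


def wait_sketch(job_ids: Iterable[str],
                is_running: Callable[[str], bool],
                poll_seconds: float = 1.0) -> None:
    """Block until is_running() is False for every job id."""
    for job_id in job_ids:
        while is_running(job_id):
            time.sleep(poll_seconds)


# Fake scheduler state: number of polls each job still needs to finish.
remaining: Dict[str, int] = {'101': 2, '102': 0}


def fake_is_running(job_id: str) -> bool:
    """Pretend each job finishes after remaining[job_id] more polls."""
    if remaining[job_id] > 0:
        remaining[job_id] -= 1
        return True
    return False


wait_sketch(['101', '102'], fake_is_running, poll_seconds=0.0)
print(remaining)
```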

fsl.utils.fslsub._flatten_job_ids(job_ids)[source]

Returns a potentially nested sequence of job ids as a single comma-separated string

Parameters

job_ids – possibly nested sequence of job ids. The job ids themselves should be strings.

Returns

comma-separated string of job ids
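
Flattening a possibly nested sequence is a short recursion. The sketch below is one way to do it; `flatten_job_ids` is a hypothetical re-implementation that preserves input order, and whether the real function sorts or de-duplicates the ids is not stated here.

```python
from typing import List


def flatten_job_ids(job_ids) -> str:
    """Flatten arbitrarily nested sequences of job-id strings."""
    flat: List[str] = []

    def visit(item) -> None:
        # A string is a single job id; anything else is treated as a sequence.
        if isinstance(item, str):
            flat.append(item)
        else:
            for sub in item:
                visit(sub)

    visit(job_ids)
    return ','.join(flat)


print(flatten_job_ids(('12', ['13', ('14',)])))
```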

fsl.utils.fslsub.func_to_cmd(func, args, kwargs, tmp_dir=None, clean=False)[source]

Defines the command needed to run the function from the command line

WARNING: if the submitted function is defined in the __main__ script, that script will be run again to retrieve the function. Make sure the script has an if __name__ == '__main__' guard so that the full script is not rerun.

Parameters
  • func – function to be run

  • args – positional arguments

  • kwargs – keyword arguments

  • tmp_dir – directory where to store the temporary file

  • clean – if True removes the submitted script after running it

Returns

string which will run the function
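
One way to turn a Python function call into a shell command is to pickle the function and its arguments into a small generated script. The sketch below illustrates that general technique on a POSIX system; `func_to_cmd_sketch` and the script template are hypothetical and are not claimed to match the fslpy implementation. It uses a standard-library function, which avoids the __main__ issue described in the warning above.

```python
import operator
import os
import pickle
import shlex
import subprocess
import sys
import tempfile

# Script body: rebuild the call from the embedded pickle and print the result.
SCRIPT_TEMPLATE = """\
import os
import pickle

func, args, kwargs = pickle.loads({payload!r})
print(func(*args, **kwargs))
if {clean!r}:
    os.remove(__file__)
"""


def func_to_cmd_sketch(func, args, kwargs, tmp_dir=None, clean=False) -> str:
    """Write a script that calls func(*args, **kwargs); return the command."""
    payload = pickle.dumps((func, args, kwargs))
    fd, path = tempfile.mkstemp(suffix='.py', dir=tmp_dir, text=True)
    with os.fdopen(fd, 'wt') as f:
        f.write(SCRIPT_TEMPLATE.format(payload=payload, clean=clean))
    # Quote both parts so that paths containing spaces survive the shell.
    return shlex.join([sys.executable, path])


cmd = func_to_cmd_sketch(operator.add, (2, 3), {}, clean=True)
result = subprocess.run(shlex.split(cmd), capture_output=True,
                        text=True, check=True)
print(result.stdout.strip())
```

With clean=True the generated script deletes itself after running, so repeated submissions do not leave temporary files behind.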