Source code for bids.analysis.analysis

"""BIDS-StatsModels functionality."""

import json
from collections import namedtuple, OrderedDict
from itertools import chain

import numpy as np
import pandas as pd

from bids.layout import BIDSLayout
from bids.utils import matches_entities, convert_JSON
from bids.variables import (BIDSVariableCollection, SparseRunVariable,
                            merge_collections)
from bids.analysis import transformations as tm
from .model_spec import create_model_spec


class Analysis(object):
    """Represents an entire BIDS-Model analysis.

    Parameters
    ----------
    layout : :obj:`bids.layout.BIDSLayout` or str
        A BIDSLayout instance or path to pass on to the BIDSLayout
        initializer.
    model : str or dict
        A BIDS model specification. Can either be a string giving the path
        of the JSON model spec, or an already-loaded dict containing the
        model info.
    """
    def __init__(self, layout, model):

        if not isinstance(layout, BIDSLayout):
            layout = BIDSLayout(layout)
        self.layout = layout

        self._load_model(model)
    def __iter__(self):
        for b in self.steps:
            yield b

    def __getitem__(self, index):
        if isinstance(index, int):
            return self.steps[index]
        level = index.lower()
        name_matches = list(filter(lambda x: x.name == level, self.steps))
        if not name_matches:
            raise KeyError('There is no step with the name "%s".' % index)
        return name_matches[0]

    def _load_model(self, model):
        if isinstance(model, str):
            with open(model, 'r', encoding='utf-8') as fobj:
                model = json.load(fobj)

        # Convert JSON from CamelCase to snake_case keys
        self.model = convert_JSON(model)

        steps = self.model['steps']
        self.steps = []
        for i, step_args in enumerate(steps):
            step = Step(self.layout, index=i, **step_args)
            self.steps.append(step)
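    # Illustrative sketch (not part of the original source): steps can be
    # retrieved either by position or by the name given in the model JSON.
    # The model filename and step name below are hypothetical.
    #
    #     >>> analysis = Analysis(layout, 'model.json')
    #     >>> first_step = analysis[0]       # by index
    #     >>> run_step = analysis['run']     # by (unique) step name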
    def setup(self, steps=None, drop_na=False, finalize=True, **kwargs):
        """Set up the sequence of steps for analysis.

        Parameters
        ----------
        steps : list
            Optional list of steps to set up. Each element must be either an
            int giving the index of the step in the JSON config step list,
            or a str giving the (unique) name of the step, as specified in
            the JSON config. Steps that do not match either index or name
            will be skipped.
        drop_na : bool
            Boolean indicating whether or not to automatically drop events
            that have an n/a amplitude when reading in data from event
            files.
        finalize : bool
            Indicates whether or not to finalize setup. If False, variables
            are loaded in each Step, but transformations aren't yet applied,
            and outputs from each Step aren't fed forward. If True, the
            latter procedures are executed, and all Step instances are
            finalized for design matrix generation.
        """
        # Use inputs from model, and update with kwargs
        selectors = self.model.get('input', {}).copy()
        selectors.update(kwargs)

        for i, step in enumerate(self.steps):
            # Skip any steps whose names or indexes don't match step list
            if steps is not None and i not in steps and step.name not in steps:
                continue
            step.add_collections(drop_na=drop_na, **selectors)

        if finalize:
            selectors.pop('scan_length', None)  # see TODO below
            self.finalize(**selectors)
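    # Illustrative sketch (not part of the original source): the step name,
    # task label, and scan_length value below are hypothetical.
    #
    #     >>> # Load variables for every step, adding a task filter on top
    #     >>> # of the model's input selectors, without finalizing:
    #     >>> analysis.setup(task='stopsignal', finalize=False)
    #     >>> # Or set up only the run-level step:
    #     >>> analysis.setup(steps=['run'], drop_na=True, scan_length=480)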
    def finalize(self, **kwargs):
        # The first Step in the sequence can't have any contrast inputs
        input_contrasts = None
        for step in self.steps:
            step.setup(input_contrasts, **kwargs)
            input_contrasts = [step.get_contrasts(c)
                               for c in step.get_collections(**kwargs)]
            input_contrasts = list(chain(*input_contrasts))
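# Illustrative end-to-end sketch (not part of the original source); the
# dataset path and model filename are hypothetical.
#
#     >>> analysis = Analysis('/data/ds000117', '/data/ds000117/model.json')
#     >>> analysis.setup()   # loads variables, applies transformations,
#     ...                    # and feeds contrasts forward between steps
#     >>> run_step = analysis['run']
#     >>> colls = run_step.get_collections()
#     >>> contrasts = run_step.get_contrasts(colls[0])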
class Step(object):
    """Represents a single analysis step from a BIDS-Model specification.

    Parameters
    ----------
    layout : :obj:`bids.layout.BIDSLayout`
        The BIDSLayout containing all project files.
    level : str
        The BIDS keyword to use as the grouping variable; must be one of
        'run', 'session', 'subject', or 'dataset'.
    index : int
        The numerical index of the current Step within the sequence of
        steps.
    name : str
        Optional name to assign to the step. Must be specified in order to
        enable name-based indexing in the parent Analysis.
    transformations : list
        List of BIDS-Model transformations to apply.
    model : dict
        The 'model' part of the BIDS-StatsModels specification.
    contrasts : list
        List of contrasts to apply to the parameter estimates generated when
        the model is fit.
    inputs : list
        Optional list of BIDSVariableCollections to use as input to this
        Step (typically, the outputs from the preceding Step).
    dummy_contrasts : dict
        Optional dictionary specifying which conditions to create indicator
        contrasts for. The dictionary must include a "type" key ('t' or
        'FEMA'), and may optionally include a "conditions" key giving the
        subset of conditions to use. This parameter is overwritten by the
        setting in setup() if the latter is passed.
    """
    def __init__(self, layout, level, index, name=None, transformations=None,
                 model=None, contrasts=None, inputs=None,
                 dummy_contrasts=False):
        self.layout = layout
        self.level = level.lower()
        self.index = index
        self.name = name
        self.transformations = transformations or []
        self.model = model or None
        self.contrasts = contrasts or []
        self.inputs = inputs or []
        self.dummy_contrasts = dummy_contrasts
        self._collections = []
        # Collections loaded but not yet processed/transformed
        self._raw_collections = []
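    # Illustrative sketch (not part of the original source): Step instances
    # are normally constructed by Analysis._load_model() from the entries in
    # the model's "steps" list. The step_args dict below is hypothetical and
    # only gestures at the expected shape of those entries.
    #
    #     >>> step_args = {
    #     ...     'level': 'run',
    #     ...     'name': 'run',
    #     ...     'model': {'x': ['trial_type']},
    #     ...     'dummy_contrasts': {'type': 't'},
    #     ... }
    #     >>> step = Step(layout, index=0, **step_args)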
    def _filter_collections(self, collections, kwargs):
        # Keeps only collections that match target entities, and also
        # removes those keys from the kwargs dict.
        kwargs = kwargs.copy()
        valid_ents = {'task', 'subject', 'session', 'run'}
        entities = {k: kwargs.pop(k) for k in dict(kwargs) if k in valid_ents}
        collections = [c for c in collections if matches_entities(c, entities)]
        return (collections, kwargs)

    def _group_objects_by_entities(self, objects):
        # Group a list of objects into bins defined by all entities at the
        # current Step level or higher. E.g., if the level is 'subject', the
        # returned dict will have one entry per subject, where each value is
        # a list containing all objects that belong to that subject. Any
        # object with a defined .entities attribute is groupable.
        if self.level == 'dataset':
            return {'dataset': objects}
        groups = OrderedDict()
        valid_ents = ['subject', 'session', 'task', 'run']
        valid_ents = valid_ents[:(valid_ents.index(self.level) + 1)]
        for o in objects:
            key = {k: v for k, v in o.entities.items() if k in valid_ents}
            key = tuple(sorted(key.items(), key=str))
            if key not in groups:
                groups[key] = []
            groups[key].append(o)
        return groups

    def _merge_contrast_inputs(self, inputs):
        """Merge a list of ContrastInfo tuples and construct a dict mapping
        from units of the current level to BIDSVariableCollections.

        Parameters
        ----------
        inputs : [[ContrastInfo]]
            List of lists of ContrastInfo tuples passed from the previous
            Step. Each element in the outer list maps to the output of a
            unit at the previous level; each element in the inner list is a
            ContrastInfo tuple. E.g., if contrast information is being
            passed from run-level to subject-level, each outer element is a
            run.

        Returns
        -------
        A dictionary, where the keys are the values of the entities at the
        current level (e.g., '01', '02'...) and the values are
        BIDSVariableCollections containing contrast information.

        Notes
        -----
        Each output BIDSVariableCollection contains information for a single
        unit at the present level. The variables in the collection reflect
        the union of all contrasts found in one or more of the inputs. A
        value of 1 indicates that the contrast is present for a given row in
        the input; 0 indicates that the contrast was missing.
        """
        groups = self._group_objects_by_entities(inputs)
        ent_cols = list(list(groups.values())[0][0].entities.keys())

        collections = {}

        for name, contrasts in groups.items():
            # Create a DF with contrasts in rows and contrast names in
            # columns
            data = [{**c.entities, **{c.name: 1}} for c in contrasts]
            data = pd.DataFrame.from_records(data)
            # Group by all entities and sum, collapsing over rows belonging
            # to the current unit
            data = data.groupby(ent_cols).sum()
            # Split the DF up into separate data and entities DFs
            entities = data.index.to_frame(index=False)
            data = data.reset_index(drop=True)
            # Construct the collection
            coll = BIDSVariableCollection.from_df(data, entities, self.level)
            collections[name] = coll

        return collections
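    # Illustrative sketch (not part of the original source): for a
    # subject-level Step, _group_objects_by_entities() returns an
    # OrderedDict keyed by sorted (entity, value) tuples. With hypothetical
    # subject labels, the result looks roughly like:
    #
    #     {(('subject', '01'),): [<collection>, <collection>],
    #      (('subject', '02'),): [<collection>, <collection>]}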
    def setup(self, inputs=None, **kwargs):
        """Set up the Step.

        Processes inputs from the previous step, combines them with
        currently loaded data, and applies transformations to produce a
        design-matrix-ready set of variable collections.

        Parameters
        ----------
        inputs : list
            Optional list of BIDSVariableCollections produced as output by
            the preceding Step in the analysis. If None, uses inputs passed
            at initialization (if any).
        kwargs : dict
            Optional keyword arguments constraining the collections to
            include.
        """
        inputs = inputs or self.inputs or []
        input_grps = self._merge_contrast_inputs(inputs) if inputs else {}

        # Filter on passed selectors and group by unit of current level
        collections, _ = self._filter_collections(self._raw_collections,
                                                  kwargs)
        groups = self._group_objects_by_entities(collections)

        # Merge in the inputs
        for key, input_ in input_grps.items():
            if key not in groups:
                groups[key] = []
            groups[key].append(input_)

        # Set up and validate variable lists
        model = self.model or {}
        X = model.get('x', [])

        for grp, colls in groups.items():
            coll = merge_collections(colls)
            # Apply the model's transformations to the merged collection
            coll = tm.TransformerManager().transform(coll,
                                                     self.transformations)
            if X:
                tm.Select(coll, X)
            self._collections.append(coll)
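    # Illustrative sketch (not part of the original source): feeding the
    # run-level contrasts into a (hypothetical) subject-level step, which
    # mirrors what Analysis.finalize() does internally.
    #
    #     >>> run_step, subject_step = analysis[0], analysis[1]
    #     >>> inputs = [run_step.get_contrasts(c)
    #     ...           for c in run_step.get_collections()]
    #     >>> subject_step.setup(list(chain(*inputs)))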
    def add_collections(self, drop_na=False, **kwargs):
        """Add BIDSVariableCollections (i.e., predictors) to the current
        Step.

        Parameters
        ----------
        drop_na : bool
            Boolean indicating whether or not to automatically drop events
            that have an n/a amplitude when reading in data from event
            files.
        kwargs : dict
            Optional keyword arguments to pass on to load_variables.

        Notes
        -----
        No checking for redundancy is performed, so if add_collections() is
        invoked multiple times with overlapping selectors, redundant
        predictors are likely to be stored internally.
        """
        # TODO: remove the scan_length argument entirely once we switch
        # tests to use the synthetic dataset with image headers.
        if self.level != 'run':
            kwargs = kwargs.copy()
            kwargs.pop('scan_length', None)

        collections = self.layout.get_collections(self.level, drop_na=drop_na,
                                                  **kwargs)
        self._raw_collections.extend(collections)
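    # Illustrative sketch (not part of the original source): the task label
    # and scan_length value are hypothetical; scan_length is only needed
    # when run durations can't be read from image headers.
    #
    #     >>> step.add_collections(drop_na=True, task='stopsignal',
    #     ...                      scan_length=480)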
    def get_collections(self, **filters):
        """Returns BIDSVariableCollections at the current Step.

        Parameters
        ----------
        filters : dict
            Optional keyword filters used to constrain which of the
            available collections get returned (e.g., passing
            subject=['01', '02'] will return collections for only subjects
            '01' and '02').

        Returns
        -------
        list of BIDSVariableCollection instances
            One instance per unit of the current analysis level (e.g., if
            level='run', each element in the list represents the collection
            for a single run).
        """
        return self._filter_collections(self._collections, filters)[0]
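    # Illustrative sketch (not part of the original source); subject labels
    # are hypothetical.
    #
    #     >>> colls = step.get_collections(subject=['01', '02'])
    #     >>> len(colls)   # one collection per unit at the current level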
    def get_contrasts(self, collection, names=None, variables=None):
        """Return contrast information at this step for the passed
        collection.

        Parameters
        ----------
        collection : BIDSVariableCollection
            The collection to generate/validate contrasts for.
        names : list
            Optional list of names of contrasts to return. If None
            (default), all contrasts are returned.
        variables : list
            Optional list of strings giving the names of design matrix
            columns to use when generating the matrix of weights.

        Returns
        -------
        list
            A list of ContrastInfo namedtuples, one per contrast.

        Notes
        -----
        The 'variables' argument takes precedence over the natural process
        of column selection. I.e., if a variable shows up in a contrast, but
        isn't named in variables, it will *not* be included in the result.
        """
        # Verify that there are no invalid columns in the condition_lists
        all_conds = [c['condition_list'] for c in self.contrasts]
        all_conds = set(chain(*all_conds))
        bad_conds = all_conds - set(collection.variables.keys())
        if bad_conds:
            raise ValueError("Invalid condition names passed in one or more "
                             "contrast condition lists: %s." % bad_conds)

        # Construct a list of all contrasts, including dummy contrasts
        contrasts = list(self.contrasts)

        # Check that all contrasts have unique names
        contrast_names = [c['name'] for c in contrasts]
        if len(set(contrast_names)) < len(contrast_names):
            raise ValueError("One or more contrasts have the same name")
        contrast_names = list(set(contrast_names))

        if self.dummy_contrasts:
            if 'conditions' in self.dummy_contrasts:
                conditions = [c for c in self.dummy_contrasts['conditions']
                              if c in collection.variables.keys()]
            else:
                conditions = collection.variables.keys()

            for col_name in conditions:
                if col_name not in contrast_names:
                    contrasts.append({
                        'name': col_name,
                        'condition_list': [col_name],
                        'weights': [1],
                        'type': self.dummy_contrasts['type']
                    })

        # Filter on desired contrast names if passed
        if names is not None:
            contrasts = [c for c in contrasts if c['name'] in names]

        def setup_contrast(c):
            weights = np.atleast_2d(c['weights'])
            weights = pd.DataFrame(weights, columns=c['condition_list'])
            # If variables were explicitly passed, use them as the columns
            if variables is not None:
                var_df = pd.DataFrame(columns=variables)
                weights = pd.concat([weights, var_df],
                                    sort=True)[variables].fillna(0)

            test_type = c.get('type', ('t' if len(weights) == 1 else 'F'))

            return ContrastInfo(c['name'], weights, test_type,
                                collection.entities)

        return [setup_contrast(c) for c in contrasts]
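    # Illustrative sketch (not part of the original source): the contrast
    # and condition names below are hypothetical. Each returned ContrastInfo
    # carries a weights DataFrame with one column per condition.
    #
    #     >>> coll = step.get_collections()[0]
    #     >>> ci = step.get_contrasts(coll, names=['go_vs_stop'])[0]
    #     >>> ci.name, ci.type
    #     ('go_vs_stop', 't')
    #     >>> ci.weights.columns.tolist()
    #     ['go', 'stop']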
    def get_model_spec(self, collection, sampling_rate='TR'):
        """Get a ModelSpec instance for the passed collection.

        Parameters
        ----------
        collection : BIDSVariableCollection
            The BIDSVariableCollection to construct a model for.
        sampling_rate : {'TR', 'highest'} or float
            For run-level models, the sampling rate at which to generate the
            design matrix. When 'TR', the repetition time is used, if
            available, to select the sampling rate (1/TR). When 'highest',
            all variables are resampled to the highest sampling rate of any
            variable in the collection. The sampling rate may also be
            specified explicitly in Hz. Has no effect on non-run-level
            collections.

        Returns
        -------
        A bids.analysis.model_spec.ModelSpec instance.

        Notes
        -----
        If the passed BIDSVariableCollection contains any sparse variables,
        they will be automatically converted to dense (using the specified
        sampling rate) before the ModelSpec is constructed. For
        non-run-level collections, timing is irrelevant, so the design
        matrix is constructed based on the "as-is" values found in each
        variable.
        """
        if self.model is None:
            raise ValueError("Cannot generate a ModelSpec instance; no "
                             "BIDS-StatsModels model specification found "
                             "for this step!")

        if collection.level == 'run':
            collection = collection.resample(sampling_rate, force_dense=True)

        return create_model_spec(collection, self.model)
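    # Illustrative sketch (not part of the original source): builds the
    # design matrix specification for a single (hypothetical) run at the TR
    # sampling rate.
    #
    #     >>> coll = run_step.get_collections(subject='01', run=1)[0]
    #     >>> spec = run_step.get_model_spec(coll, sampling_rate='TR')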
ContrastInfo = namedtuple('ContrastInfo', ('name', 'weights', 'type', 'entities'))
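# Each contrast returned by Step.get_contrasts() is a ContrastInfo namedtuple.
# Illustrative sketch (not part of the original source); the values shown are
# hypothetical.
#
#     >>> ci = ContrastInfo(name='go_vs_stop',
#     ...                   weights=pd.DataFrame([[1, -1]],
#     ...                                        columns=['go', 'stop']),
#     ...                   type='t',
#     ...                   entities={'subject': '01', 'run': 1})
#     >>> ci.type
#     't'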