Source code for bids.layout.index

"""File-indexing functionality. """

import os
import json
from collections import defaultdict
from bids_validator import BIDSValidator
from .models import Config, Entity, Tag, FileAssociation
from ..utils import listify, make_bidsfile


def _extract_entities(bidsfile, entities):
    match_vals = {}
    for e in entities.values():
        m = e.match_file(bidsfile)
        if m is None and e.mandatory:
            break
        if m is not None:
            match_vals[e.name] = (e, m)
    return match_vals
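
# Illustrative note (an addition, not part of the original module): for a
# file named like 'sub-01_task-rest_bold.nii.gz', _extract_entities returns
# a mapping shaped roughly like the hypothetical example below, pairing each
# matched Entity object with the value it extracted:
#
#     {'subject': (<Entity subject>, '01'),
#      'task': (<Entity task>, 'rest'),
#      'suffix': (<Entity suffix>, 'bold')}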


def _check_path_matches_patterns(path, patterns):
    """Check if the path matches at least one of the provided patterns. """
    if not patterns:
        return False
    path = os.path.abspath(path)
    for patt in patterns:
        if isinstance(patt, str):
            if path == patt:
                return True
        elif patt.search(path):
            return True
    return False
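
# Illustrative sketch (an addition, not part of the original module):
# patterns may be literal absolute-path strings or compiled regular
# expressions, so both of these hypothetical calls return True:
#
#     import re
#     _check_path_matches_patterns('/data/derivatives', ['/data/derivatives'])
#     _check_path_matches_patterns('/data/derivatives/x.nii',
#                                  [re.compile('derivatives')])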


class BIDSLayoutIndexer(object):
    """Indexer class for BIDSLayout.

    Args:
        layout (BIDSLayout): The BIDSLayout to index.
    """
    def __init__(self, layout):
        self.layout = layout
        self.session = layout.session
        self.validate = layout.validate
        self.root = layout.root
        self.config_filename = layout.config_filename
        self.validator = BIDSValidator(index_associated=True)
        # Create copies of list attributes we'll modify during indexing
        self.config = list(layout.config.values())
        self.include_patterns = list(layout.force_index)
        self.exclude_patterns = list(layout.ignore)
    def _validate_dir(self, d, default=None):
        if _check_path_matches_patterns(d, self.include_patterns):
            return True
        if _check_path_matches_patterns(d, self.exclude_patterns):
            return False
        return default

    def _validate_file(self, f, default=None):
        # Inclusion takes priority over exclusion
        if _check_path_matches_patterns(f, self.include_patterns):
            return True
        if _check_path_matches_patterns(f, self.exclude_patterns):
            return False

        # If inclusion/exclusion is inherited from a parent directory, that
        # takes precedence over the remaining file-level rules
        if default is not None:
            return default

        # Derivatives are currently not validated.
        # TODO: raise warning the first time in a session this is encountered
        if not self.validate or 'derivatives' in self.layout.config:
            return True

        # The BIDS validator expects absolute paths, but really these are
        # relative to the BIDS project root.
        to_check = os.path.relpath(f, self.root)
        to_check = os.path.join(os.path.sep, to_check)
        return self.validator.is_bids(to_check)

    def _index_dir(self, path, config, default_action=None):

        abs_path = os.path.join(self.root, path)

        # Derivative directories must always be added separately and passed
        # as their own root, so terminate if passed.
        if abs_path.startswith(os.path.join(self.root, 'derivatives')):
            return

        config = list(config)  # Shallow copy

        # Check for an additional config file in the directory
        layout_file = self.config_filename
        config_file = os.path.join(abs_path, layout_file)
        if os.path.exists(config_file):
            cfg = Config.load(config_file, session=self.session)
            config.append(cfg)

        # Track which entities are valid in filenames for this directory
        config_entities = {}
        for c in config:
            config_entities.update(c.entities)

        for (dirpath, dirnames, filenames) in os.walk(path):

            # Set the default inclusion/exclusion directive
            default = self._validate_dir(dirpath, default=default_action)

            # If a layout configuration file exists, exclude it from indexing
            if self.config_filename in filenames:
                filenames.remove(self.config_filename)

            for f in filenames:
                bf = self._index_file(f, dirpath, config_entities,
                                      default_action=default)
                if bf is None:
                    continue

            self.session.commit()

            # Recursively index subdirectories
            for d in dirnames:
                d = os.path.join(dirpath, d)
                self._index_dir(d, list(config), default_action=default)

            # Prevent subdirectory traversal by os.walk itself
            break

    def _index_file(self, f, dirpath, entities, default_action=None):
        """Create DB record for file and its tags. """
        abs_fn = os.path.join(dirpath, f)

        # Skip files that fail validation, unless forcibly indexing
        if not self._validate_file(abs_fn, default=default_action):
            return None

        bf = make_bidsfile(abs_fn)
        self.session.add(bf)

        # Extract entity values using the module-level helper
        match_vals = _extract_entities(bf, entities)

        # Create Entity <=> BIDSFile mappings
        if match_vals:
            for _, (ent, val) in match_vals.items():
                tag = Tag(bf, ent, str(val), ent._dtype)
                self.session.add(tag)

        return bf
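    # Illustrative note (an addition, not part of the original module): the
    # relpath/join step in _validate_file above turns a dataset-internal path
    # into the pseudo-absolute form the validator expects. With hypothetical
    # paths:
    #
    #     os.path.relpath('/data/bids/sub-01/anat/sub-01_T1w.nii.gz',
    #                     '/data/bids')   # 'sub-01/anat/sub-01_T1w.nii.gz'
    #     os.path.join(os.path.sep, ...)  # '/sub-01/anat/sub-01_T1w.nii.gz'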
    def index_files(self):
        """Index all files in the BIDS dataset. """
        self._index_dir(self.root, self.config)
    def index_metadata(self):
        """Index metadata for all files in the BIDS dataset. """
        # Process JSON files first if we're indexing metadata
        all_files = self.layout.get(absolute_paths=True)

        # Track ALL entities we've seen in filenames or metadata
        all_entities = {}
        for c in self.config:
            all_entities.update(c.entities)

        # If key/value pairs in JSON files duplicate ones extracted from
        # filenames, we can end up with Tag collisions in the DB. To prevent
        # this, we store all filename/entity pairs and the value, and then
        # check against that before adding each new Tag.
        all_tags = {}
        for t in self.session.query(Tag).all():
            key = '{}_{}'.format(t.file_path, t.entity_name)
            all_tags[key] = str(t.value)

        # We build up a store of all file data as we iterate files. It looks
        # like: {extension/suffix: {dirname: [(entities, payload, path)]}}.
        # The payload is left empty for non-JSON files.
        file_data = {}

        for bf in all_files:
            file_ents = bf.entities.copy()
            suffix = file_ents.pop('suffix', None)
            ext = file_ents.pop('extension', None)

            if suffix is not None and ext is not None:
                key = "{}/{}".format(ext, suffix)
                if key not in file_data:
                    file_data[key] = defaultdict(list)

                if ext == 'json':
                    with open(bf.path, 'r') as handle:
                        try:
                            payload = json.load(handle)
                        except json.JSONDecodeError as e:
                            msg = ("Error occurred while trying to decode "
                                   "JSON from file '{}'.".format(bf.path))
                            raise IOError(msg) from e
                else:
                    payload = None

                to_store = (file_ents, payload, bf.path)
                file_data[key][bf.dirname].append(to_store)

        # To avoid integrity errors, track primary keys we've seen
        seen_assocs = set()

        def create_association_pair(src, dst, kind, kind2=None):
            kind2 = kind2 or kind
            pk1 = '#'.join([src, dst, kind])
            if pk1 not in seen_assocs:
                self.session.add(FileAssociation(src=src, dst=dst, kind=kind))
                seen_assocs.add(pk1)
            pk2 = '#'.join([dst, src, kind2])
            if pk2 not in seen_assocs:
                self.session.add(FileAssociation(src=dst, dst=src,
                                                 kind=kind2))
                seen_assocs.add(pk2)

        # TODO: Efficiency of everything in this loop could be improved
        filenames = [bf for bf in all_files
                     if not bf.path.endswith('.json')]

        for bf in filenames:
            file_ents = bf.entities.copy()
            suffix = file_ents.pop('suffix', None)
            ext = file_ents.pop('extension', None)
            file_ent_keys = set(file_ents.keys())

            if suffix is None or ext is None:
                continue

            # Extract metadata associated with the file. The idea is that we
            # loop over parent directories, and if we find payloads in the
            # file_data store (indexing by directory and current file
            # suffix), we check to see if the candidate JSON file's entities
            # are entirely consumed by the current file. If so, it's a valid
            # candidate, and we add the payload to the stack. Finally, we
            # invert the stack and merge the payloads in order.
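            # Worked example (an addition, not from the original source):
            # payloads are collected nearest-directory-first, so the reversed
            # merge below lets values closer to the file win. With
            # hypothetical sidecar contents:
            #
            #     payloads = [({'RepetitionTime': 2.5}, 'sub-01/...'),
            #                 ({'RepetitionTime': 2.0, 'TaskName': 'rest'},
            #                  'task-rest_bold.json')]
            #     file_md = {}
            #     for pl, _ in payloads[::-1]:
            #         file_md.update(pl)
            #     # file_md == {'RepetitionTime': 2.5, 'TaskName': 'rest'}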
ext_key = "{}/{}".format(ext, suffix) json_key = "json/{}".format(suffix) dirname = bf.dirname payloads = [] ancestors = [] while True: # Get JSON payloads json_data = file_data.get(json_key, {}).get(dirname, []) for js_ents, js_md, js_path in json_data: js_keys = set(js_ents.keys()) if js_keys - file_ent_keys: continue matches = [js_ents[name] == file_ents[name] for name in js_keys] if all(matches): payloads.append((js_md, js_path)) # Get all files this file inherits from candidates = file_data.get(ext_key, {}).get(dirname, []) for ents, _, path in candidates: keys = set(ents.keys()) if keys - file_ent_keys: continue matches = [ents[name] == file_ents[name] for name in keys] if all(matches): ancestors.append(path) parent = os.path.dirname(dirname) if parent == dirname: break dirname = parent if not payloads: continue # Create DB records for metadata associations js_file = payloads[-1][1] create_association_pair(js_file, bf.path, 'Metadata') # Consolidate metadata by looping over inherited JSON files file_md = {} for pl, js_file in payloads[::-1]: file_md.update(pl) # Create FileAssociation records for JSON inheritance n_pl = len(payloads) for i, (pl, js_file) in enumerate(payloads): if (i + 1) < n_pl: other = payloads[i + 1][1] create_association_pair(js_file, other, 'Child', 'Parent') # Inheritance for current file n_pl = len(ancestors) for i, src in enumerate(ancestors): if (i + 1) < n_pl: dst = ancestors[i + 1] create_association_pair(src, dst, 'Child', 'Parent') # Files with IntendedFor field always get mapped to targets intended = listify(file_md.get('IntendedFor', [])) for target in intended: # Per spec, IntendedFor paths are relative to sub dir. target = os.path.join( self.root, 'sub-{}'.format(bf.entities['subject']), target) create_association_pair(bf.path, target, 'IntendedFor', 'InformedBy') # Link files to BOLD runs if suffix in ['physio', 'stim', 'events', 'sbref']: images = self.layout.get( extension=['nii', 'nii.gz'], suffix='bold', return_type='filename', **file_ents) for img in images: create_association_pair(bf.path, img, 'IntendedFor', 'InformedBy') # Link files to DWI runs if suffix == 'sbref' or ext in ['bvec', 'bval']: images = self.layout.get( extension=['nii', 'nii.gz'], suffix='dwi', return_type='filename', **file_ents) for img in images: create_association_pair(bf.path, img, 'IntendedFor', 'InformedBy') # Create Tag <-> Entity mappings, and any newly discovered Entities for md_key, md_val in file_md.items(): tag_string = '{}_{}'.format(bf.path, md_key) # Skip pairs that were already found in the filenames if tag_string in all_tags: file_val = all_tags[tag_string] if str(md_val) != file_val: msg = ( "Conflicting values found for entity '{}' in " "filename {} (value='{}') versus its JSON sidecar " "(value='{}'). Please reconcile this discrepancy." ) raise ValueError(msg.format(md_key, bf.path, file_val, md_val)) continue if md_key not in all_entities: all_entities[md_key] = Entity(md_key, is_metadata=True) self.session.add(all_entities[md_key]) tag = Tag(bf, all_entities[md_key], md_val) self.session.add(tag) if len(self.session.new) >= 1000: self.session.commit() self.session.commit()