# Source code for bids.layout.writing

"""
Contains helper functions that involve writing operations.
"""

import warnings
import os
import re
import sys
from string import Formatter
from itertools import product
from ..utils import splitext, listify
from os.path import join, dirname, exists, islink, isabs, isdir

__all__ = ['build_path', 'write_contents_to_file']

# Matches one entity placeholder inside a path pattern and captures four groups:
#   1. the whole placeholder, braces included (e.g. ``{suffix<T1w|T2w>|T1w}``);
#   2. the entity name (word characters only);
#   3. the optional text found between ``<`` and ``>`` (the ``|``-separated
#      valid values);
#   4. the optional default that follows ``|`` (word characters, each of which
#      may be preceded by a dot, e.g. ``nii.gz``).
# Groups that are absent from a placeholder come back as empty strings when
# the pattern is used with ``findall``.
_PATTERN_FIND = re.compile(r'({([\w\d]*?)(?:<([^>]+)>)?(?:\|((?:\.?[\w])+))?\})')


def build_path(entities, path_patterns, strict=False):
    """
    Build a path from a set of entities and candidate filename patterns.

    The patterns are tried in order and the first one that can be fully
    resolved with the given entities is used.

    Parameters
    ----------
    entities : :obj:`dict`
        Mapping of entity names to entity values. Entities whose value is
        ``None`` or an empty string are discarded (zeros are kept). Values
        are rendered with ``str.format``, so any specific formatting (e.g.,
        zero-padded integers) must already be applied by the caller. A list
        of values yields one path per combination of values.
    path_patterns : :obj:`str` or :obj:`list`
        One or more filename patterns. An entity is written as its name in
        curly braces (``{subject}``). Optional portions of a pattern are
        enclosed in square brackets. Accepted values for an entity can be
        listed inside angle brackets, separated by pipes
        (``{suffix<T1w|T2w>}``), and a default value can be given after a
        pipe that follows the name or the angle brackets
        (``{extension<nii|nii.gz>|nii.gz}``); the default is used when the
        entity is not provided.
    strict : :obj:`bool`
        If True, a pattern is only considered when every provided entity
        appears in it. If False, entities that the pattern does not mention
        are ignored.

    Returns
    -------
    A single path (:obj:`str`) when exactly one path was built, a
    :obj:`list` of paths when list-valued entities expanded to several, or
    ``None`` if no pattern could be resolved.

    Examples
    --------
    >>> entities = {
    ...     'extension': 'nii',
    ...     'space': 'MNI',
    ...     'subject': '001',
    ...     'suffix': 'inplaneT2',
    ... }
    >>> patterns = ['sub-{subject}[/ses-{session}]/anat/sub-{subject}[_ses-{session}]'
    ...             '[_acq-{acquisition}][_ce-{ceagent}][_rec-{reconstruction}]_'
    ...             '{suffix<T[12]w|T1rho|T[12]map|T2star|FLAIR|FLASH|PDmap|PD|PDT2|'
    ...             'inplaneT[12]|angio>}.{extension<nii|nii.gz|json>|nii.gz}',
    ...             'sub-{subject}[/ses-{session}]/anat/sub-{subject}[_ses-{session}]'
    ...             '[_acq-{acquisition}][_ce-{ceagent}][_rec-{reconstruction}]'
    ...             '[_space-{space}][_desc-{desc}]_{suffix<T1w|T2w|T1rho|T1map|T2map|'
    ...             'T2star|FLAIR|FLASH|PDmap|PD|PDT2|inplaneT[12]|angio>}.'
    ...             '{extension<nii|nii.gz|json>|nii.gz}']
    >>> build_path(entities, patterns)
    'sub-001/anat/sub-001_inplaneT2.nii'

    >>> build_path(entities, patterns, strict=True)
    'sub-001/anat/sub-001_space-MNI_inplaneT2.nii'

    >>> entities['space'] = None
    >>> build_path(entities, patterns, strict=True)
    'sub-001/anat/sub-001_inplaneT2.nii'

    >>> # If some entity is set to None, they are dropped
    >>> entities['extension'] = None
    >>> build_path(entities, patterns, strict=True)
    'sub-001/anat/sub-001_inplaneT2.nii.gz'

    >>> # If some entity is set to empty-string, they are dropped
    >>> entities['extension'] = ''
    >>> build_path(entities, patterns, strict=True)
    'sub-001/anat/sub-001_inplaneT2.nii.gz'

    >>> # If some selector is not in the pattern, skip it...
    >>> entities['datatype'] = 'anat'
    >>> build_path(entities, patterns)
    'sub-001/anat/sub-001_inplaneT2.nii.gz'

    >>> # ... unless the pattern should be strictly matched
    >>> entities['datatype'] = 'anat'
    >>> build_path(entities, patterns, strict=True) is None
    True

    >>> # If the value of an entity is not valid, do not match the pattern
    >>> entities['suffix'] = 'bold'
    >>> build_path(entities, patterns) is None
    True

    >>> entities = {
    ...     'extension': 'bvec',
    ...     'subject': '001',
    ... }
    >>> patterns = (
    ...     "sub-{subject}[/ses-{session}]/{datatype|dwi}/sub-{subject}[_ses-{session}]"
    ...     "[_acq-{acquisition}]_{suffix|dwi}.{extension<bval|bvec|json|nii.gz|nii>|nii.gz}"
    ... )
    >>> build_path(entities, patterns, strict=True)
    'sub-001/dwi/sub-001_dwi.bvec'

    >>> # Lists of entities are expanded
    >>> entities = {
    ...     'extension': 'bvec',
    ...     'subject': ['%02d' % i for i in range(1, 4)],
    ... }
    >>> build_path(entities, patterns, strict=True)
    ['sub-01/dwi/sub-01_dwi.bvec', 'sub-02/dwi/sub-02_dwi.bvec', 'sub-03/dwi/sub-03_dwi.bvec']

    """
    path_patterns = listify(path_patterns)

    # Keep only meaningful values (zeros count, None/'' do not), as lists
    entities = {
        key: listify(value)
        for key, value in entities.items()
        if value or value == 0
    }

    # Extensions are stored without their leading dot(s)
    if 'extension' in entities:
        entities['extension'] = [
            ext.lstrip('.') for ext in entities['extension']
        ]

    # Try each pattern in turn; the first fully resolvable one wins
    for pattern in path_patterns:
        placeholders = _PATTERN_FIND.findall(pattern)

        # Strict mode: every provided entity must be named by the pattern
        if strict and not set(entities).issubset(ph[1] for ph in placeholders):
            continue

        candidate = pattern
        resolved = dict(entities)  # working copy; the query stays untouched
        for placeholder, name, valid, default in placeholders:
            # Flatten "a|b[12]" style constraints into the explicit value list
            allowed = []
            for option in valid.split('|'):
                if option:
                    allowed.extend(_expand_options(option))

            if allowed and default and default not in allowed:
                warnings.warn(
                    'Pattern "%s" is inconsistent as it defines an invalid default value.'
                    % placeholder
                )

            # A provided value outside the allowed set leaves this placeholder
            # un-simplified, so the field check further down cannot resolve it
            if (
                allowed
                and name in entities
                and not set(entities[name]).issubset(allowed)
            ):
                continue

            if default and name not in resolved:
                resolved[name] = [default]

            # Constraints and default are handled: reduce to a plain field
            candidate = candidate.replace(placeholder, '{%s}' % name)

        # Optional sections become mandatory when they mention a provided
        # entity; any other optional section is dropped
        for optional in re.findall(r'(\[.*?\])', candidate):
            for ent_name, ent_values in entities.items():
                if ent_values is not None and ('{%s}' % ent_name) in optional:
                    candidate = candidate.replace(optional, optional[1:-1])
            candidate = candidate.replace(optional, '')

        # Collect the named fields that still have to be filled in
        fields = set()
        for _, field_name, _, _ in Formatter().parse(candidate):
            if field_name and not field_name.isdigit():
                fields.add(field_name)

        if not fields.issubset(resolved):
            continue

        resolved = {
            key: values for key, values in resolved.items() if key in fields
        }
        built = [
            candidate.format(**combination)
            for combination in _expand_entities(resolved)
        ]

        if built:
            return built[0] if len(built) == 1 else built

    return None


def write_contents_to_file(path, contents=None, link_to=None,
                           content_mode='text', root=None, conflicts='fail'):
    """
    Write ``contents`` to ``path``, or create a symbolic link there.

    Parameters
    ----------
    path : str
        Destination path of the desired contents.
    contents : str or bytes
        Text or binary data to write to the new path. An empty string (or
        empty bytes) is accepted and produces an empty file.
    link_to : str
        Optional target for a symbolic link created at ``path``. Takes
        priority over ``contents`` when both are given.
    content_mode : {'text', 'binary'}
        Writing mode for the new file: ``'binary'`` opens it with ``'wb'``,
        anything else with ``'w'``. Only relevant when ``contents`` is
        written.
    root : str
        Optional root directory that ``path`` is joined to. When ``None``
        and ``path`` is relative, the current working directory is used.
    conflicts : {'fail', 'skip', 'overwrite', 'append'}
        Action taken when the output path already exists. 'fail' raises an
        exception; 'skip' warns and does nothing; 'overwrite' removes the
        existing file first (directories are never removed: a warning is
        issued and nothing is written); 'append' writes to the first
        non-existing path obtained by adding ``_1``, ``_2``, ... to the
        name before its extension(s). Default is 'fail'.

    Raises
    ------
    ValueError
        If neither ``contents`` nor ``link_to`` is provided, if the path
        exists and ``conflicts`` is 'fail', or if the path exists and
        ``conflicts`` is not one of the supported values.

    """
    # Validate the payload before touching the filesystem: otherwise an
    # 'overwrite' call without a payload would delete the existing file and
    # only then fail.
    if not link_to and contents is None:
        raise ValueError('One of contents or link_to must be provided.')

    if root is None and not isabs(path):
        root = os.getcwd()

    if root:
        path = join(root, path)

    # islink() also catches dangling symlinks, for which exists() is False
    if exists(path) or islink(path):
        if conflicts == 'fail':
            msg = 'A file at path {} already exists.'
            raise ValueError(msg.format(path))
        elif conflicts == 'skip':
            msg = 'A file at path {} already exists, skipping writing file.'
            warnings.warn(msg.format(path))
            return
        elif conflicts == 'overwrite':
            if isdir(path):
                warnings.warn('New path is a directory, not going to '
                              'overwrite it, skipping instead.')
                return
            os.remove(path)
        elif conflicts == 'append':
            # NOTE(review): assumes splitext() returns the extension-less
            # path followed by each extension, as a sequence -- confirm
            # against the helper in ..utils.
            path_splits = splitext(path)
            stem, extensions = path_splits[0], list(path_splits[1:])
            for i in range(1, sys.maxsize):
                appended_filename = os.extsep.join(
                    [stem + '_%d' % i] + extensions)
                if not exists(appended_filename) and \
                        not islink(appended_filename):
                    path = appended_filename
                    break
        else:
            raise ValueError('Did not provide a valid conflicts parameter')

    # dirname() is empty for a bare filename (possible when root=''), and
    # exist_ok guards against another process creating the directory between
    # the check and the call.
    parent = dirname(path)
    if parent and not exists(parent):
        os.makedirs(parent, exist_ok=True)

    if link_to:
        os.symlink(link_to, path)
    else:
        mode = 'wb' if content_mode == 'binary' else 'w'
        with open(path, mode) as f:
            f.write(contents)


def _expand_options(value):
    """
    Expand optional substrings of valid entity values.

    Every ``[...]`` group in ``value`` stands for "any one of the characters
    inside the brackets"; the result lists all combinations, in
    ``itertools.product`` order. Text outside the brackets is copied
    verbatim, so characters such as ``%`` need no escaping. A value without
    bracket groups is returned unchanged as a one-element list, and an empty
    group (``[]``) yields no expansions at all.

    Examples
    --------
    >>> _expand_options('[Jj]son[12]')
    ['Json1', 'Json2', 'json1', 'json2']

    >>> _expand_options('json')
    ['json']

    >>> _expand_options('50%[ab]')
    ['50%a', '50%b']

    """
    choices = re.findall(r'\[(.*?)\]', value)
    if not choices:
        return [value]

    # Literal text surrounding the groups: always len(choices) + 1 pieces.
    # Joining pieces (rather than %-formatting a template) keeps any literal
    # '%' in the value from being interpreted as a format directive.
    literals = re.split(r'\[.*?\]', value)

    expanded = []
    for picks in product(*choices):
        pieces = [literals[0]]
        for pick, literal in zip(picks, literals[1:]):
            pieces.append(pick)
            pieces.append(literal)
        expanded.append(''.join(pieces))
    return expanded


def _expand_entities(entities):
    """
    Generate multiple replacement queries based on all combinations of values.

    ``entities`` maps each name to an iterable of values; the result is a
    list with one dictionary per element of the Cartesian product of those
    values, each mapping every name to a single value.

    Examples
    --------
    >>> entities = {'subject': ['01', '02'], 'session': ['1', '2'], 'task': ['rest', 'finger']}
    >>> out = _expand_entities(entities)
    >>> len(out)
    8

    >>> {'subject': '01', 'session': '1', 'task': 'rest'} in out
    True

    >>> {'subject': '02', 'session': '1', 'task': 'rest'} in out
    True

    >>> {'subject': '01', 'session': '2', 'task': 'rest'} in out
    True

    >>> {'subject': '02', 'session': '2', 'task': 'rest'} in out
    True

    >>> {'subject': '01', 'session': '1', 'task': 'finger'} in out
    True

    >>> {'subject': '02', 'session': '1', 'task': 'finger'} in out
    True

    >>> {'subject': '01', 'session': '2', 'task': 'finger'} in out
    True

    >>> {'subject': '02', 'session': '2', 'task': 'finger'} in out
    True

    """
    keys = list(entities)
    return [
        dict(zip(keys, combination))
        for combination in product(*(entities[key] for key in keys))
    ]