# Source code for ycsettings.settings

# Public names exported by a star-import of this module.
__all__ = ['Settings', 'parse_n_jobs', 'MissingSettingException']


from collections import OrderedDict
from collections.abc import Mapping
import configparser
import importlib
import importlib.util
from io import TextIOWrapper
from itertools import chain
import json
import logging
from multiprocessing import cpu_count
import os
import pickle
import re
from tempfile import NamedTemporaryFile
from urllib.parse import ParseResult, urlparse
import warnings

from uriutils import uri_open

try:
    import yaml
except ImportError:
    # PyYAML is optional; YAML support is disabled when it is not installed.
    yaml = None


# Module-level logger, named after this module; used for the debug messages emitted while loading sources.
logger = logging.getLogger(__name__)


class Settings(Mapping):
    """
    This class manages the lookup and prioritizing of setting variables in multiple different sources.

    Supported sources include:

    * Environment (``env``): The OS environment, i.e., :attr:`os.environ`
    * URI/files: Handles different file types including: JSON, YAML, and INI
    * Python modules: Python modules similar to Django settings module; it can be a ``.py`` file or module path
    * Dictionary-like objects: Objects with the ``items`` attribute
    * Arbitrary objects: All ``__dict__`` entries not starting with ``__`` are used as settings

    If ``settings_uri`` is found in the environment (``SETTINGS_URI``), dictionary, or arbitrary object, it will also load the corresponding settings URI in addition to the object itself.
    This way, it can autoload from ``SETTINGS_URI`` in the environment and :class:`argparse.Namespace`.
    For example,

    .. code-block:: python

        parser = ArgumentParser(description='Hello World!')
        parser.add_argument('settings_uri', type=str, metavar='<config_file>', help='Positional option')
        A = parser.parse_args()

        settings = Settings(A, warn_missing=False)

    The file specified in ``A.settings_uri`` will be loaded.
    """

    # Sentinel that distinguishes "setting not found" from a setting whose value is ``None``.
    _MISSING = object()

    def __init__(self, *sources, search_first=('env', 'env_settings_uri'), case_sensitive=False, raise_exception=False, warn_missing=False, env_settings_uri_keys=('SETTINGS_URI',), dict_settings_uri_keys=('settings', 'settings_uri'), object_settings_uri_keys=('settings', 'settings_uri')):
        """
        Initializes the :class:`Settings` object.

        :param list sources: list of sources to search for settings; falsy sources are ignored
        :param list search_first: list of sources which will be searched first before any other sources specified in ``sources``.
        :param bool case_sensitive: whether to make case sensitive comparisons for settings key
        :param bool raise_exception: whether to raise a :exc:`MissingSettingException` exception when the setting is not found
        :param bool warn_missing: whether to display a warning when the setting is not found
        :param list env_settings_uri_keys: keys to find settings in the environment; if multiple keys are found, they'll all be used
        :param list dict_settings_uri_keys: keys to find settings in a :func:`dict`-like object; if multiple keys are found, they'll all be used
        :param list object_settings_uri_keys: keys to find settings in an arbitrary object; if multiple keys are found, they'll all be used
        """
        self.case_sensitive = case_sensitive
        self.raise_exception = raise_exception
        self.warn_missing = warn_missing

        # Copy the key lists so instances never share (or mutate) the signature defaults.
        self.env_settings_uri_keys = list(env_settings_uri_keys)
        self.dict_settings_uri_keys = list(dict_settings_uri_keys)
        self.object_settings_uri_keys = list(object_settings_uri_keys)

        self._cache = {}
        self._settings = OrderedDict()  # source name -> settings mapping, in priority order
        self._union_keys = None  # lazily built by ``__iter__``/``__len__``

        for source in chain(search_first, filter(None, sources)):
            for name, settings in self._load_settings_from_source(source):
                if not settings:
                    continue
                if name in self._settings:
                    warnings.warn('{} appeared more than once in the settings priority list.'.format(name))
                self._settings[name] = settings

    def _load_settings_from_source(self, source):
        """
        Loads the relevant settings from the specified ``source``.

        This is a generator because one source may expand into several (e.g. a dict that
        also points at a settings URI).

        :returns: yields ``(name, settings)`` pairs, where ``settings`` is a mapping of the settings from the source
        :rtype: generator
        """
        if not source:
            return

        if source == 'env_settings_uri':
            for env_settings_uri_key in self.env_settings_uri_keys:
                env_settings_uri = self._search_environ(env_settings_uri_key)
                if env_settings_uri:
                    logger.debug('Found {} in the environment.'.format(env_settings_uri_key))
                    yield env_settings_uri, self._load_settings_from_uri(env_settings_uri)

        elif source == 'env':
            logger.debug('Loaded {} settings from the environment.'.format(len(os.environ)))
            yield source, dict(os.environ)

        elif isinstance(source, ParseResult):
            yield source, self._load_settings_from_uri(source)

        elif isinstance(source, str):
            # A string is first tried as an importable module path and then as a URI/file path.
            # ``find_spec`` raises ImportError/ValueError (depending on the Python version) for
            # names that merely look like paths, e.g. ``./settings.json``.
            try:
                spec = importlib.util.find_spec(source)
            except (AttributeError, ImportError, ValueError):
                spec = None

            settings = self._load_settings_from_spec(spec, name=source)
            if settings is None:
                _, ext = os.path.splitext(source)
                # Load fully before yielding so the file is never left open across a ``yield``.
                with uri_open(source, 'rb') as f:
                    settings = self._load_settings_from_file(f, ext=ext)
            yield source, settings

        elif hasattr(source, 'read'):
            yield source.name, self._load_settings_from_file(source)

        elif hasattr(source, 'items'):
            source_type = type(source).__name__
            for dict_settings_uri_key in self.dict_settings_uri_keys:
                if dict_settings_uri_key and dict_settings_uri_key in source and source[dict_settings_uri_key]:
                    logger.debug('Found {} in the dict-like object <{}>.'.format(dict_settings_uri_key, source_type))
                    yield from self._load_settings_from_source(source[dict_settings_uri_key])

            logger.debug('Loaded {} settings from dict-like object <{}>.'.format(len(source), source_type))
            yield self._get_unique_name(source_type), source

        else:
            source_type = type(source).__name__
            for object_settings_uri_key in self.object_settings_uri_keys:
                if object_settings_uri_key and hasattr(source, object_settings_uri_key) and getattr(source, object_settings_uri_key):
                    logger.debug('Found {} in the object <{}>.'.format(object_settings_uri_key, source_type))
                    yield from self._load_settings_from_source(getattr(source, object_settings_uri_key))

            settings = {k: v for k, v in source.__dict__.items() if not k.startswith('__')}
            logger.debug('Loaded {} settings from object <{}>.'.format(len(settings), source_type))
            yield self._get_unique_name(source_type), settings

    def _get_unique_name(self, prefix):
        """Returns the first ``<prefix>_<i>`` name (``i`` = 0, 1, ...) not yet used as a source name."""
        i = 0
        name = '{}_{}'.format(prefix, i)
        while name in self._settings:
            i += 1
            name = '{}_{}'.format(prefix, i)
        return name

    def _load_settings_from_spec(self, spec, name=None):
        """
        Executes the module described by ``spec`` and returns its non-dunder globals.

        :returns: a :func:`dict` of settings, or ``None`` when ``spec`` is ``None``
        """
        if spec is None:
            return None

        mod = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(mod)
        settings = {k: v for k, v in mod.__dict__.items() if not k.startswith('__')}
        if name:
            logger.debug('Loaded {} settings from Python module <{}>.'.format(len(settings), name))
        return settings

    def _load_settings_from_uri(self, uri):
        """
        Opens ``uri`` and loads its settings; the format is chosen from the file extension.

        :param uri: a URI/path string or a :class:`urllib.parse.ParseResult`
        """
        if isinstance(uri, ParseResult):
            # ``os.path.splitext`` cannot take a ParseResult, so take the extension from its
            # path and open the equivalent URL string.
            path, location = uri.path, uri.geturl()
        else:
            path, location = uri, uri

        _, ext = os.path.splitext(path)
        # Binary mode is what the pickle/INI/``.py`` loaders below expect.
        with uri_open(location, 'rb') as f:
            settings = self._load_settings_from_file(f, ext=ext)
        logger.debug('Loaded {} settings from URI <{}>.'.format(len(settings), location))
        return settings

    def _load_settings_from_file(self, f, ext=None):
        """
        Loads settings from the open file object ``f``.

        :param f: file-like object (binary preferred; text is tolerated for INI and ``.py``)
        :param str ext: file extension including the dot; when ``None`` or ``.gz`` it is derived from ``f.name``
        :raises ValueError: if the format is unknown or cannot be determined
        :raises ImportError: if a YAML file is given but PyYAML is not installed
        """
        fname = getattr(f, 'name', None)
        if ext is None or ext == '.gz':
            if not isinstance(fname, str):
                raise ValueError('Unable to determine the settings file format: the file object has no usable name.')
            stem = fname[:-3] if fname.endswith('.gz') else fname
            _, ext = os.path.splitext(stem)

        ext = ext.lower()
        ext_type = ext[1:].upper()

        if ext in ('.json', '.js'):
            d = json.load(f)

        elif ext in ('.yaml', '.yml'):
            if yaml is None:
                raise ImportError('PyYAML is required to load {} settings files.'.format(ext))
            # ``yaml.load`` without a Loader is rejected by recent PyYAML; ``safe_load`` builds plain data only.
            d = yaml.safe_load(f)

        elif ext in ('.pkl', '.pickle'):
            # NOTE(security): unpickling executes arbitrary code; only load pickles from trusted locations.
            d = pickle.load(f)

        elif ext == '.ini':
            config = configparser.ConfigParser()
            # Wrap binary streams for decoding; a stream that is already text is read as-is.
            stream = f if isinstance(f.read(0), str) else TextIOWrapper(f)
            config.read_file(stream)
            d = {option: value for section in config.sections() for option, value in config.items(section)}

        elif ext == '.py':
            # The source may be remote, so copy it to a local temp file that importlib can execute.
            ext_type = 'Python module'
            data = f.read()
            if isinstance(data, str):
                data = data.encode('utf-8')

            temp_fname = None
            try:
                with NamedTemporaryFile(mode='wb', suffix='.py', delete=False) as g:
                    temp_fname = g.name  # recorded before writing so a failed write is still cleaned up
                    g.write(data)
                d = self._load_settings_from_spec(importlib.util.spec_from_file_location('settings_module', os.path.abspath(temp_fname)))
            finally:
                if temp_fname is not None:
                    os.remove(temp_fname)

        else:
            raise ValueError('Unknown settings file format: {}'.format(ext))

        if d is None:
            d = {}

        logger.debug('Loaded {} {} settings from <{}>.'.format(len(d), ext_type, fname))
        return d

    def get(self, key, *, default=None, cast_func=None, case_sensitive=None, raise_exception=None, warn_missing=None, use_cache=True, additional_sources=()):
        """
        Gets the setting specified by ``key``. For efficiency, we cache the retrieval of settings to avoid multiple searches through the sources list.

        :param str key: settings key to retrieve
        :param str default: use this as default value when the setting key is not found (returned as-is, without ``cast_func``)
        :param func cast_func: cast the value of the settings using this function
        :param bool case_sensitive: whether to make case sensitive comparisons for settings key
        :param bool raise_exception: whether to raise a :exc:`MissingSettingException` exception when the setting is not found
        :param bool warn_missing: whether to display a warning when the setting is not found
        :param bool use_cache: whether to read from and write to the lookup cache
        :param list additional_sources: additional sources to search for the key; note that the values obtained here could be cached in a future call
        :returns: the setting value
        :rtype: str
        """
        case_sensitive = self.case_sensitive if case_sensitive is None else case_sensitive
        raise_exception = self.raise_exception if raise_exception is None else raise_exception
        warn_missing = self.warn_missing if warn_missing is None else warn_missing

        if not case_sensitive:
            key = key.lower()

        if use_cache and key in self._cache:
            cached = self._cache[key]
            return cast_func(cached) if cast_func else cached

        # Each additional source is a generator of (name, settings) pairs, so flatten them.
        extra = chain.from_iterable(map(self._load_settings_from_source, additional_sources))

        found, value = False, None
        for source, settings in chain(self._settings.items(), extra):
            if case_sensitive:
                if key not in settings:
                    continue
                value = settings[key]
            else:
                possible_keys = [k for k in settings.keys() if isinstance(k, str) and k.lower() == key]
                if not possible_keys:
                    continue
                if len(possible_keys) > 1:
                    warnings.warn('There are more than one possible value for "{}" in <{}> settings due to case insensitivity.'.format(key, source))
                value = settings[possible_keys[0]]
            found = True
            break

        if not found:
            if raise_exception:
                raise MissingSettingException('The "{}" setting is missing.'.format(key))
            if warn_missing:
                warnings.warn('The "{}" setting is missing.'.format(key))
            return default

        if use_cache:
            self._cache[key] = value  # the raw value is cached; casting happens per call

        return cast_func(value) if cast_func else value

    def getbool(self, key, **kwargs):
        """
        Gets the setting value as a :func:`bool` by cleverly recognizing true values.

        :raises ValueError: if the value is a string that is not a recognised boolean
        :rtype: bool
        """
        def _string_to_bool(s):
            if not isinstance(s, str):
                return bool(s)
            token = s.strip().lower()
            if token in ('true', 't', '1'):
                return True
            # Compared after lowercasing, so the literal must be 'none', not 'None'.
            if token in ('false', 'f', '0', 'none', 'null', ''):
                return False
            raise ValueError('Unable to get boolean value of "{}".'.format(s))

        return self.get(key, cast_func=_string_to_bool, **kwargs)

    def getint(self, key, **kwargs):
        """
        Gets the setting value as a :obj:`int`.

        :rtype: int
        """
        return self.get(key, cast_func=int, **kwargs)

    def getfloat(self, key, **kwargs):
        """
        Gets the setting value as a :obj:`float`.

        :rtype: float
        """
        return self.get(key, cast_func=float, **kwargs)

    def getdict(self, key, **kwargs):
        """
        Gets the setting value as a :obj:`dict`.

        :rtype: dict
        """
        return self.getserialized(key, **kwargs)

    def getserialized(self, key, decoder_func=None, **kwargs):
        """
        Gets the setting value as a :obj:`dict` or :obj:`list` trying :meth:`json.loads`, followed by YAML.

        :param func decoder_func: when given, used instead of the JSON/YAML decoders
        :raises ValueError: if the value cannot be parsed
        :rtype: dict, list
        """
        value = self.get(key, cast_func=None, **kwargs)
        return self._deserialize(key, value, decoder_func=decoder_func)

    def _deserialize(self, key, value, decoder_func=None):
        """Decodes an already-retrieved ``value``; containers and ``None`` are passed through unchanged."""
        if isinstance(value, (dict, list, tuple)) or value is None:
            return value
        if decoder_func:
            return decoder_func(value)

        try:
            return json.loads(value)
        except json.JSONDecodeError:
            pass

        if yaml is not None:
            try:
                return yaml.safe_load(value)
            except yaml.YAMLError:
                pass

        raise ValueError('Unable to parse {} setting using JSON or YAML.'.format(key))

    def geturi(self, key, **kwargs):
        """
        Gets the setting value as a :class:`urllib.parse.ParseResult`.

        :rtype: urllib.parse.ParseResult
        """
        return self.get(key, cast_func=urlparse, **kwargs)

    def getlist(self, key, delimiter=',', **kwargs):
        """
        Gets the setting value as a :class:`list`; it splits the string using ``delimiter``.

        A string of the form ``[...]`` is decoded as a serialized list instead of being split.

        :param str delimiter: split the value using this delimiter
        :rtype: list
        """
        value = self.get(key, **kwargs)
        if value is None:
            return value

        if isinstance(value, str):
            value = value.strip()
            if value.startswith('[') and value.endswith(']'):
                # Decode the value already retrieved so the caller's lookup options are honoured.
                return self._deserialize(key, value)
            return [p.strip(' ') for p in value.split(delimiter)]

        return list(value)

    def getnjobs(self, key, **kwargs):
        """
        Gets the setting value as an integer relative to the number of CPU.
        See :func:`ycsettings.settings.parse_n_jobs` for parsing rules.

        :rtype: int
        """
        return self.get(key, cast_func=parse_n_jobs, **kwargs)

    def _search_environ(self, key, default=None):
        """Case-insensitive lookup of ``key`` in :attr:`os.environ`."""
        key = key.lower()
        for k, v in os.environ.items():
            if k.lower() == key:
                return v
        return default

    def __getitem__(self, key):
        # Delegates to :meth:`get`; a missing key is handled by the instance's
        # ``raise_exception``/``warn_missing`` flags rather than by raising KeyError.
        return self.get(key)

    def __contains__(self, key):
        # ``Mapping.__contains__`` relies on KeyError from ``__getitem__``, which this class
        # never raises, so membership has to be answered explicitly.
        if not isinstance(key, str):
            return any(key in settings for settings in self._settings.values())
        return self.get(key, default=self._MISSING, raise_exception=False, warn_missing=False) is not self._MISSING

    def _build_union_keys(self):
        """Returns the de-duplicated keys of all sources, in priority order."""
        seen = set()
        ordered = []
        for settings in self._settings.values():
            for k, _ in settings.items():
                if not self.case_sensitive and isinstance(k, str):
                    k = k.lower()
                if k in seen:
                    continue
                seen.add(k)
                ordered.append(k)
        return ordered

    def __iter__(self):
        # Built eagerly so an abandoned iteration can never leave a partial key list cached.
        if self._union_keys is None:
            self._union_keys = self._build_union_keys()
        return iter(self._union_keys)

    def __len__(self):
        if self._union_keys is None:
            self._union_keys = self._build_union_keys()
        return len(self._union_keys)


class MissingSettingException(Exception):
    """Raised when a setting is not found and ``raise_exception`` is enabled."""
def parse_n_jobs(s):
    """
    This function parses a "math"-like string as a function of CPU count.
    It is useful for specifying the number of jobs.

    For example, on an 8-core machine::

        assert parse_n_jobs('0.5 * n') == 4
        assert parse_n_jobs('2n') == 16
        assert parse_n_jobs('n') == 8
        assert parse_n_jobs('4') == 4

    A bare number below one (``'0.5'`` or ``0.5``) is treated as a fraction of the CPU count.
    Results that are zero or negative trigger a warning and are replaced by ``1``.

    :param str s: string (or :obj:`int`/:obj:`float`) to parse for number of CPUs
    :raises ValueError: if a string cannot be parsed
    :raises TypeError: if ``s`` is not a :obj:`str`, :obj:`int`, or :obj:`float`
    :rtype: int
    """
    N = cpu_count()

    if isinstance(s, int):
        n_jobs = s

    elif isinstance(s, float):
        # Mirror the string form: a fraction below one is a share of the CPUs.
        n_jobs = s * N if 0 < s < 1 else s

    elif isinstance(s, str):
        # Optional number (at least one digit required) followed by an optional "n" or "* n".
        m = re.match(r'(\d+(?:\.\d*)?|\.\d+)?(\s*\*?\s*n)?$', s.strip())
        if m is None:
            raise ValueError('Unable to parse n_jobs="{}"'.format(s))

        k = float(m.group(1)) if m.group(1) else 1
        n_jobs = k * N if (m.group(2) or k < 1) else k

    else:
        raise TypeError('n_jobs argument must be of type str, int, or float.')

    n_jobs = int(n_jobs)
    if n_jobs <= 0:
        warnings.warn('n_jobs={} is invalid. Setting n_jobs=1.'.format(n_jobs))
        n_jobs = 1

    return n_jobs