source: rtems-central/rtemsspec/items.py @ 3ab0b6d

Last change on this file since 3ab0b6d was 3ab0b6d, checked in by Sebastian Huber <sebastian.huber@…>, on 12/14/20 at 15:55:36

items: Improve substitution error messages

  • Property mode set to 100644
File size: 23.2 KB
Line 
1# SPDX-License-Identifier: BSD-2-Clause
2""" This module provides specification items and an item cache. """
3
4# Copyright (C) 2019, 2020 embedded brains GmbH (http://www.embedded-brains.de)
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions
8# are met:
9# 1. Redistributions of source code must retain the above copyright
10#    notice, this list of conditions and the following disclaimer.
11# 2. Redistributions in binary form must reproduce the above copyright
12#    notice, this list of conditions and the following disclaimer in the
13#    documentation and/or other materials provided with the distribution.
14#
15# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25# POSSIBILITY OF SUCH DAMAGE.
26
27from contextlib import contextmanager
28import hashlib
29import os
30import pickle
31import string
32import stat
33from typing import Any, Callable, Dict, Iterable, Iterator, List, NamedTuple, \
34    Optional, Tuple, Union
35import yaml
36
37
class ItemGetValueContext(NamedTuple):
    """
    Bundles the information handed to a get value function for one element
    of a key path.
    """
    # The item whose data is traversed.
    item: "Item"
    # The key path made up of the elements traversed so far.
    path: str
    # The container in which the key is looked up.
    value: Any
    # The name of the key path element without an index suffix.
    key: str
    # The index given in brackets after the key, negative if there is none.
    index: Any  # should be int, but this triggers a mypy error
45
46
# Maps an item UID to the corresponding item.
ItemMap = Dict[str, "Item"]

# A function which produces the value for one key path element.
ItemGetValue = Callable[[ItemGetValueContext], Any]

# Maps the name of a key path element to its get value function and the map
# which is used for the next key path element.
ItemGetValueMap = Dict[str, Tuple[ItemGetValue, Any]]
50
51
52def _is_enabled_op_and(enabled: List[str], enabled_by: Any) -> bool:
53    for next_enabled_by in enabled_by:
54        if not is_enabled(enabled, next_enabled_by):
55            return False
56    return True
57
58
def _is_enabled_op_not(enabled: List[str], enabled_by: Any) -> bool:
    """ Returns true if the operand expression is not enabled. """
    if is_enabled(enabled, enabled_by):
        return False
    return True
61
62
63def _is_enabled_op_or(enabled: List[str], enabled_by: Any) -> bool:
64    for next_enabled_by in enabled_by:
65        if is_enabled(enabled, next_enabled_by):
66            return True
67    return False
68
69
# Maps the operator name used as the key of a dictionary expression to the
# function which evaluates the operator.
_IS_ENABLED_OP = {
    "and": _is_enabled_op_and,
    "not": _is_enabled_op_not,
    "or": _is_enabled_op_or,
}
75
76
def is_enabled(enabled: List[str], enabled_by: Any) -> bool:
    """
    Evaluates the enabled-by expression with respect to the enabled set.

    A boolean expression is its own result.  A list is enabled if one of its
    elements is enabled.  For a dictionary, the first key names the operator
    ("and", "not", or "or") which is applied to the associated value.  Every
    other expression is enabled if it is contained in the enabled set.
    """
    if isinstance(enabled_by, bool):
        return enabled_by
    if isinstance(enabled_by, dict):
        operator, operand = next(iter(enabled_by.items()))
        return _IS_ENABLED_OP[operator](enabled, operand)
    if isinstance(enabled_by, list):
        return _is_enabled_op_or(enabled, enabled_by)
    return enabled_by in enabled
87
88
89def _str_representer(dumper, data):
90    return dumper.represent_scalar("tag:yaml.org,2002:str",
91                                   data,
92                                   style="|" if "\n" in data else "")
93
94
95yaml.add_representer(str, _str_representer)
96
97
class Link:
    """
    Associates a target item with the link attributes, for example the role,
    which are accessible like dictionary entries.
    """
    def __init__(self, item: "Item", data: Any):
        self._data = data
        self._item = item

    @classmethod
    def create(cls, link: "Link", item: "Item") -> "Link":
        """
        Creates a link to the item which shares the attribute data with the
        existing link.
        """
        # pylint: disable=protected-access
        shared_data = link._data
        return cls(item, shared_data)

    def __getitem__(self, name: str) -> Any:
        return self._data[name]

    def __setitem__(self, key: str, value: Any) -> None:
        self._data[key] = value

    @property
    def item(self) -> "Item":
        """ The target item of this link. """
        return self._item

    @property
    def role(self) -> str:
        """ The value of the role attribute of this link. """
        return self._data["role"]
124
125
def _get_value(ctx: ItemGetValueContext) -> Any:
    """
    Looks up the key in the context value and additionally applies the index
    if it is not negative.
    """
    element = ctx.value[ctx.key]
    return element[ctx.index] if ctx.index >= 0 else element
131
132
def normalize_key_path(key_path: str, prefix: str = "") -> str:
    """
    Returns the normalized key path.

    A relative key path is joined with the prefix before it is normalized,
    an absolute key path ignores the prefix.
    """
    if os.path.isabs(key_path):
        return os.path.normpath(key_path)
    return os.path.normpath(os.path.join(prefix, key_path))
138
139
140_TYPES = {
141    type(True): "B".encode("utf-8"),
142    type(1.0): "F".encode("utf-8"),
143    type(1): "I".encode("utf-8"),
144    type(None): "N".encode("utf-8"),
145    type(""): "S".encode("utf-8"),
146}
147
148
149def _hash_data(data, state) -> None:
150    if isinstance(data, list):
151        for value in data:
152            _hash_data(value, state)
153    elif isinstance(data, dict):
154        for key, value in sorted(data.items()):
155            if not key.startswith("_"):
156                state.update(key.encode("utf-8"))
157                _hash_data(value, state)
158    else:
159        state.update(_TYPES[type(data)])
160        state.update(str(data).encode("utf-8"))
161
162
class Item:
    """
    Objects of this class represent a specification item.

    An item has a UID, attribute data accessible like dictionary entries, and
    links to parent and child items.  Attributes with a key starting with "_"
    are internal: they are neither saved nor part of the digest.
    """

    # pylint: disable=too-many-public-methods
    def __init__(self, item_cache: "ItemCache", uid: str, data: Any):
        self._cache = item_cache
        self._uid = uid
        self._data = data
        self._links_to_parents = []  # type: List[Link]
        self._links_to_children = []  # type: List[Link]

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, Item):
            return NotImplemented
        return self._uid == other._uid  # pylint: disable=protected-access

    def __lt__(self, other: Any) -> bool:
        if not isinstance(other, Item):
            return NotImplemented
        return self._uid < other._uid  # pylint: disable=protected-access

    def __hash__(self) -> int:
        return hash(self._uid)

    def __contains__(self, key: str) -> bool:
        return key in self._data

    def __getitem__(self, key: str) -> Any:
        return self._data[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._data[key] = value

    @property
    def cache(self) -> "ItemCache":
        """ Returns the cache of the items. """
        return self._cache

    @property
    def digest(self) -> str:
        """ Returns the SHA-512 hex digest of the item data. """
        state = hashlib.sha512()
        _hash_data(self._data, state)
        return state.hexdigest()

    def get(self, key: str, default: Any) -> Any:
        """
        Gets the attribute value if the attribute exists, otherwise the
        specified default value is returned.
        """
        return self._data.get(key, default)

    def get_by_normalized_key_path(
            self, normalized_key_path: str,
            get_value_map: "ItemGetValueMap") -> Any:
        """
        Gets the attribute value corresponding to the normalized key path.

        Each path element may carry an index in brackets, for example
        "params[2]".  The get value map provides the function used to get the
        value of an element and the map for the following element; elements
        without an entry use the default dictionary and list lookup.
        """
        path = "/"
        value = self._data
        for key in normalized_key_path.strip("/").split("/"):
            parts = key.split("[")
            try:
                index = int(parts[1].split("]")[0])
            except IndexError:
                # There is no "[" in the element, so no index was given
                index = -1
            ctx = ItemGetValueContext(self, path, value, parts[0], index)
            get_value, get_value_map = get_value_map.get(
                parts[0], (_get_value, {}))
            value = get_value(ctx)
            path = os.path.join(path, key)
        return value

    def get_by_key_path(self, key_path: str, prefix: str = "") -> Any:
        """ Gets the attribute value corresponding to the key path. """
        return self.get_by_normalized_key_path(
            normalize_key_path(key_path, prefix), {})

    @property
    def uid(self) -> str:
        """ Returns the UID of the item. """
        return self._uid

    @property
    def spec(self) -> str:
        """ Returns the UID of the item with an URL-like format. """
        return f"spec:{self._uid}"

    def to_abs_uid(self, abs_or_rel_uid: str) -> str:
        """
        Returns the absolute UID of an absolute UID or an UID relative to this
        item.  The UID "." refers to this item.
        """
        if abs_or_rel_uid == ".":
            return self._uid
        if os.path.isabs(abs_or_rel_uid):
            return abs_or_rel_uid
        return os.path.normpath(
            os.path.join(os.path.dirname(self.uid), abs_or_rel_uid))

    def map(self, abs_or_rel_uid: str) -> "Item":
        """
        Maps the absolute UID or the UID relative to this item to the
        corresponding item.
        """
        return self._cache[self.to_abs_uid(abs_or_rel_uid)]

    @staticmethod
    def _linked_items(
            links: List["Link"],
            role: Optional[Union[str, Iterable[str]]]) -> Iterator["Item"]:
        """
        Yields the items of the links which match the role.  No role matches
        every link, a string matches links with exactly this role, and
        any other value matches links with a role contained in it.
        """
        if role is None:
            for link in links:
                yield link.item
        elif isinstance(role, str):
            for link in links:
                if link.role == role:
                    yield link.item
        else:
            for link in links:
                if link.role in role:
                    yield link.item

    def links_to_parents(self) -> Iterator["Link"]:
        """ Yields the links to the parents of this item. """
        yield from self._links_to_parents

    def parents(
            self,
            role: Optional[Union[str,
                                 Iterable[str]]] = None) -> Iterator["Item"]:
        """ Yields the parents of this item with a matching link role. """
        yield from self._linked_items(self._links_to_parents, role)

    def parent(self,
               role: Optional[Union[str, Iterable[str]]] = None,
               index: Optional[int] = 0) -> "Item":
        """
        Returns the parent with the specified role and index.

        Raises IndexError if there is no such parent.
        """
        for item_index, item in enumerate(self.parents(role)):
            if item_index == index:
                return item
        raise IndexError

    def links_to_children(self) -> Iterator["Link"]:
        """ Yields the links to the children of this item. """
        yield from self._links_to_children

    def children(
            self,
            role: Optional[Union[str,
                                 Iterable[str]]] = None) -> Iterator["Item"]:
        """ Yields the children of this item with a matching link role. """
        yield from self._linked_items(self._links_to_children, role)

    def child(self,
              role: Optional[Union[str, Iterable[str]]] = None,
              index: Optional[int] = 0) -> "Item":
        """
        Returns the child with the specified role and index.

        Raises IndexError if there is no such child.
        """
        for item_index, item in enumerate(self.children(role)):
            if item_index == index:
                return item
        raise IndexError

    def init_parents(self, item_cache: "ItemCache") -> None:
        """
        Initializes the list of links to parents of this item.

        Raises KeyError if a link refers to an item which is not in the item
        cache.
        """
        for data in self._data["links"]:
            try:
                link = Link(item_cache[self.to_abs_uid(data["uid"])], data)
                self._links_to_parents.append(link)
            except KeyError as err:
                msg = (f"item '{self.uid}' links "
                       f"to non-existing item '{data['uid']}'")
                raise KeyError(msg) from err

    def init_children(self) -> None:
        """
        Initializes the list of links to children: for each link to a parent,
        a link back to this item is added to the parent.
        """
        for link in self.links_to_parents():
            link.item.add_link_to_child(Link.create(link, self))

    def add_link_to_child(self, link: "Link"):
        """ Adds a link to a child item of this item. """
        self._links_to_children.append(link)

    def is_enabled(self, enabled: List[str]) -> bool:
        """ Returns true if the item is enabled by the specified enables. """
        return is_enabled(enabled, self["enabled-by"])

    @property
    def data(self) -> Any:
        """ The item data. """
        return self._data

    @property
    def file(self) -> str:
        """ Returns the file of the item. """
        return self._data["_file"]

    @file.setter
    def file(self, value: str):
        """ Sets the file of the item. """
        self._data["_file"] = value

    @property
    def type(self) -> str:
        """ Returns the type of the item. """
        return self._data["_type"]

    def save(self):
        """
        Saves the item to the corresponding file.

        Attributes with a key starting with "_" are not saved.
        """
        data = {
            key: value
            for key, value in self._data.items() if not key.startswith("_")
        }
        # Produce the complete text before the file is opened for writing,
        # so that a failing dump does not leave a truncated file behind.
        text = yaml.dump(data, default_flow_style=False, allow_unicode=True)
        # The dump may contain non-ASCII characters (allow_unicode), so do
        # not depend on the locale encoding.
        with open(self.file, "w", encoding="utf-8") as dst:
            dst.write(text)

    def load(self):
        """
        Loads the item from the corresponding file.

        The file and UID attributes are carried over to the loaded data.
        """
        filename = self.file
        with open(filename, "r", encoding="utf-8") as src:
            data = yaml.safe_load(src.read())
        data["_file"] = filename
        data["_uid"] = self._uid
        # NOTE(review): the "_type" attribute is not carried over, since the
        # type may depend on the loaded attributes.
        # Replace the item data only after the load succeeded.
        self._data = data
390
class ItemTemplate(string.Template):
    """
    String template for item mapper identifiers.

    An identifier consists of an UID, an optional key path separated by a
    colon, and in the braced form, ${...}, an optional sequence of value
    transformation names each introduced by a "|".
    """
    # Unbraced placeholders consist of the UID and the optional key path.
    # The former trailing group "(|[a-zA-Z0-9_]+)*" was dropped here: its
    # unescaped "|" is a regular expression alternation, so the group could
    # only match characters already covered by the preceding character
    # classes and never a literal "|".
    idpattern = "[a-zA-Z0-9._/-]+(:[][a-zA-Z0-9._/-]+)?"

    # Braced placeholders additionally accept the "|name" suffixes.  The
    # pipe is escaped so that it matches literally.  Restricting this to
    # the braced form keeps text such as "$uid|word" substituting only
    # "$uid" as before.
    braceidpattern = idpattern + r"(\|[a-zA-Z0-9_]+)*"
394
395
class _ItemMapperContext(dict):
    """
    Dictionary-like context which resolves the identifiers of a template
    through the item mapper with a fixed item and prefix.
    """
    def __init__(self, mapper: "ItemMapper", item: Optional[Item],
                 prefix: Optional[str], recursive: bool):
        super().__init__()
        self._recursive = recursive
        self._prefix = prefix
        self._item = item
        self._mapper = mapper

    def __getitem__(self, identifier):
        mapped_item, key_path, value = self._mapper.map(
            identifier, self._item, self._prefix)
        if not self._recursive:
            return value
        # Substitute the value itself relative to the place it came from
        return self._mapper.substitute(value, mapped_item,
                                       os.path.dirname(key_path))
413
414
class ItemMapper:
    """
    Maps identifiers to items and attribute values.

    An identifier has the form "uid:key/path|func|...".  The UID is absolute
    or relative to the item of the mapper, the key path selects an attribute
    value of the item, and each name after a "|" is a method of the mapper
    which is applied to the value.
    """
    def __init__(self, item: "Item", recursive: bool = False):
        self._item = item
        self._recursive = recursive
        self._prefix = [""]
        self._get_value_map = {}  # type: Dict[str, ItemGetValueMap]

    @property
    def item(self) -> "Item":
        """ The item of the mapper. """
        return self._item

    @item.setter
    def item(self, item: "Item") -> None:
        """ Sets the item of the mapper. """
        self._item = item

    def add_get_value(self, type_path_key: str,
                      get_value: "ItemGetValue") -> None:
        """
        Adds a get value for the specified type and key path.

        The type and the key path are separated by a colon, for example
        "type/name:/path/to/key".
        """
        type_name, path_key = type_path_key.split(":")
        keys = path_key.strip("/").split("/")
        get_value_map = self._get_value_map.setdefault(type_name, {})
        # Walk down to the map of the last key, intermediate keys without an
        # entry get the default get value function
        for key in keys[:-1]:
            _, get_value_map = get_value_map.setdefault(key, (_get_value, {}))
        get_value_map[keys[-1]] = (get_value, {})

    def push_prefix(self, prefix: str) -> None:
        """ Pushes a key path prefix. """
        self._prefix.append(prefix)

    def pop_prefix(self) -> None:
        """ Pops a key path prefix. """
        self._prefix.pop()

    @contextmanager
    def prefix(self, prefix: str) -> Iterator[None]:
        """
        Opens a key path prefix context.

        The prefix is popped when the context is left, also if it is left
        through an exception.
        """
        self.push_prefix(prefix)
        try:
            yield
        finally:
            self.pop_prefix()

    def get_value_map(self, item: "Item") -> "ItemGetValueMap":
        """ Returns the get value map for the item. """
        return self._get_value_map.get(item.type, {})

    def map(self,
            identifier: str,
            item: Optional["Item"] = None,
            prefix: Optional[str] = None) -> Tuple["Item", str, Any]:
        """
        Maps an identifier with item and prefix to the corresponding item and
        attribute value.

        Returns the item, the normalized key path, and the value.  Without a
        colon in the identifier, the key path is "/_uid".  The item and
        prefix parameters are only used for the UID "."; otherwise the UID
        is mapped through the item of the mapper with an empty prefix.
        """
        uid_key_path, *pipes = identifier.split("|")
        colon = uid_key_path.find(":")
        if colon >= 0:
            uid, key_path = uid_key_path[:colon], uid_key_path[colon + 1:]
        else:
            uid, key_path = uid_key_path, "/_uid"
        if uid == ".":
            if item is None:
                item = self._item
            if prefix is None:
                prefix = "/".join(self._prefix)
        else:
            item = self._item.map(uid)
            prefix = ""
        key_path = normalize_key_path(key_path, prefix)
        value = item.get_by_normalized_key_path(key_path,
                                                self.get_value_map(item))
        for func in pipes:
            value = getattr(self, func)(value)
        return item, key_path, value

    def __getitem__(self, identifier):
        item, key_path, value = self.map(identifier)
        if self._recursive:
            return self.substitute(value, item, os.path.dirname(key_path))
        return value

    def substitute(self,
                   text: Optional[str],
                   item: Optional["Item"] = None,
                   prefix: Optional[str] = None) -> str:
        """
        Performs a variable substitution using the item mapper with the item
        and prefix.

        An empty or missing text yields the empty string.  Raises ValueError
        naming the item, prefix, and text if the substitution fails.
        """
        if not text:
            return ""
        try:
            context = _ItemMapperContext(self, item, prefix, self._recursive)
            return ItemTemplate(text).substitute(context)
        except Exception as err:
            spec = self._item.spec if item is None else item.spec
            if prefix is None:
                prefix = "/".join(self._prefix)
            msg = (f"substitution for {spec} using prefix '{prefix}' "
                   f"failed for text: {text}")
            raise ValueError(msg) from err
519
520
521class _SpecType(NamedTuple):
522    key: str
523    refinements: Dict[str, Any]
524
525
def _gather_spec_refinements(item: Item) -> Optional[_SpecType]:
    """
    Builds the type refinement tree from the children of the item which are
    linked with the "spec-refinement" role.  Returns None if there are no
    such children.
    """
    spec_type = None  # type: Optional[_SpecType]
    refinement_links = (link for link in item.links_to_children()
                        if link.role == "spec-refinement")
    for link in refinement_links:
        key = link["spec-key"]
        if spec_type is None:
            spec_type = _SpecType(key, {})
        # All refinements of one item have to use the same key
        assert spec_type.key == key
        refined_type = _gather_spec_refinements(link.item)
        spec_type.refinements[link["spec-value"]] = refined_type
    return spec_type
537
538
def _load_item(path: str, uid: str) -> Any:
    """
    Loads the item data from the YAML file and returns it.

    The absolute path of the file and the UID are added to the data as the
    "_file" and "_uid" attributes.

    Raises IOError if the file content is not valid YAML or is not a mapping
    (for example if the file is empty).
    """
    # Read the file with a fixed encoding instead of the locale encoding
    with open(path, "r", encoding="utf-8") as src:
        try:
            data = yaml.safe_load(src.read())
        except yaml.YAMLError as err:
            msg = ("YAML error while loading specification item file "
                   f"'{path}': {str(err)}")
            raise IOError(msg) from err
    if not isinstance(data, dict):
        # Report the offending file instead of failing with a TypeError in
        # the attribute assignments below
        msg = ("specification item file "
               f"'{path}' does not contain a YAML mapping")
        raise IOError(msg)
    data["_file"] = os.path.abspath(path)
    data["_uid"] = uid
    return data
550
551
class ItemCache:
    """
    This class provides a cache of specification items.

    The items are loaded from the YAML files found recursively in the
    configured paths.  For each directory, the loaded item data is stored in
    a pickle file in the cache directory and reused as long as the cache file
    is newer than the directory and its YAML files.
    """
    def __init__(self,
                 config: Any,
                 post_process_load: Optional[Callable[["ItemMap"],
                                                      None]] = None):
        """
        Loads the items according to the configuration.

        The configuration provides the "cache-directory", the list of
        "paths" to load, and the "spec-type-root-uid" (may be None).  The
        optional post process function is called with the map of all items
        after the load and before the links are initialized.
        """
        self._items = {}  # type: ItemMap
        self._updates = 0
        cache_dir = os.path.abspath(config["cache-directory"])
        for index, path in enumerate(config["paths"]):
            self._load_items_recursive(str(index), path, path, cache_dir)
        if post_process_load:
            post_process_load(self._items)
        self._init_parents()
        self._init_children()
        spec_root = config["spec-type-root-uid"]
        if spec_root:
            self._root_type = _gather_spec_refinements(self[spec_root])
        else:
            self._root_type = None
        for item in self._items.values():
            self._set_type(item)

    def __getitem__(self, uid: str) -> "Item":
        return self._items[uid]

    @property
    def updates(self) -> bool:
        """
        Returns true if the item cache updates occurred due to new, modified,
        or removed files.
        """
        return self._updates > 0

    @property
    def all(self) -> "ItemMap":
        """ Returns the map of all specification items. """
        return self._items

    def add_volatile_item(self, path: str, uid: str) -> "Item":
        """
        Adds an item stored in the specified file to the cache and returns it.

        The item is not added to the persistent cache storage.
        """
        data = _load_item(path, uid)
        item = self._add_item(uid, data)
        item.init_parents(self)
        item.init_children()
        self._set_type(item)
        return item

    def _add_item(self, uid: str, data: Any) -> "Item":
        item = Item(self, uid, data)
        self._items[uid] = item
        return item

    @staticmethod
    def _cache_subdir(cache_dir: str, path: str) -> str:
        """
        Returns the directory of the cache file for the path relative to the
        index directory in the cache directory.

        This is the absolute path without the leading part it has in common
        with the cache directory.
        """
        abs_path = os.path.abspath(path)
        common = os.path.commonpath([cache_dir, abs_path])
        # Remove the common part only at the start.  Replacing every
        # occurrence would for a common part of "/" remove all separators
        # and map different directories to the same cache file.
        return abs_path[len(common):].strip("/")

    def _load_items_in_dir(self, base: str, path: str, cache_file: str,
                           update_cache: bool) -> None:
        """
        Adds the items of the directory, either freshly loaded from the YAML
        files (which also rewrites the cache file) or taken from the cache
        file.
        """
        data_by_uid = {}  # type: Dict[str, Any]
        if update_cache:
            self._updates += 1
            for name in os.listdir(path):
                if name.endswith(".yml") and not name.startswith("."):
                    path2 = os.path.join(path, name)
                    # The UID is the path relative to the base without the
                    # file extension; strip only the suffix
                    rel_path = os.path.relpath(path2, base)
                    uid = "/" + rel_path[:-len(".yml")]
                    data_by_uid[uid] = _load_item(path2, uid)
            os.makedirs(os.path.dirname(cache_file), exist_ok=True)
            with open(cache_file, "wb") as out:
                pickle.dump(data_by_uid, out)
        else:
            # NOTE: Unpickling can execute arbitrary code, the cache
            # directory must not be writable by untrusted parties.
            with open(cache_file, "rb") as pickle_src:
                data_by_uid = pickle.load(pickle_src)
        for uid, data in data_by_uid.items():
            self._add_item(uid, data)

    def _load_items_recursive(self, index: str, base: str, path: str,
                              cache_dir: str) -> None:
        """
        Loads the items of the directory and its subdirectories.  The cache
        file of a directory is updated if it does not exist or is not newer
        than the directory and each of its YAML files.
        """
        cache_file = os.path.join(cache_dir, index,
                                  self._cache_subdir(cache_dir, path),
                                  "spec.pickle")
        try:
            mtime = os.path.getmtime(cache_file)
        except FileNotFoundError:
            # Without a cache file, mtime is never used below
            update_cache = True
        else:
            update_cache = mtime <= os.path.getmtime(path)
        for name in os.listdir(path):
            path2 = os.path.join(path, name)
            if name.endswith(".yml") and not name.startswith("."):
                if not update_cache:
                    update_cache = mtime <= os.path.getmtime(path2)
            else:
                if stat.S_ISDIR(os.lstat(path2).st_mode):
                    self._load_items_recursive(index, base, path2, cache_dir)
        self._load_items_in_dir(base, path, cache_file, update_cache)

    def _init_parents(self) -> None:
        for item in self._items.values():
            item.init_parents(self)

    def _init_children(self) -> None:
        # Use the sorted UIDs to get a reproducible order of the child links
        for uid in sorted(self._items):
            self._items[uid].init_children()

    def _set_type(self, item: "Item") -> None:
        """
        Sets the "_type" attribute of the item to the path of attribute
        values selected while walking down the type refinement tree.
        """
        spec_type = self._root_type
        value = item.data
        path = []  # type: List[str]
        while spec_type is not None:
            type_name = value[spec_type.key]
            path.append(type_name)
            spec_type = spec_type.refinements[type_name]
        item["_type"] = "/".join(path)
668
669
class EmptyItemCache(ItemCache):
    """ An item cache without paths to load and thus without items. """
    def __init__(self):
        config = {
            "cache-directory": ".",
            "paths": [],
            "spec-type-root-uid": None,
        }
        super().__init__(config)
678
679
class EmptyItem(Item):
    """ An item with an empty UID and no data in an empty item cache. """
    def __init__(self):
        empty_cache = EmptyItemCache()
        super().__init__(empty_cache, "", {})
Note: See TracBrowser for help on using the repository browser.