source: rtems-central/rtemsspec/items.py @ 702a61c

Last change on this file since 702a61c was 702a61c, checked in by Sebastian Huber <sebastian.huber@…>, on 12/15/20 at 07:24:09

items: Add Item parent_link() and child_link()

  • Property mode set to 100644
File size: 24.2 KB
Line 
1# SPDX-License-Identifier: BSD-2-Clause
2""" This module provides specification items and an item cache. """
3
4# Copyright (C) 2019, 2020 embedded brains GmbH (http://www.embedded-brains.de)
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions
8# are met:
9# 1. Redistributions of source code must retain the above copyright
10#    notice, this list of conditions and the following disclaimer.
11# 2. Redistributions in binary form must reproduce the above copyright
12#    notice, this list of conditions and the following disclaimer in the
13#    documentation and/or other materials provided with the distribution.
14#
15# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25# POSSIBILITY OF SUCH DAMAGE.
26
27from contextlib import contextmanager
28import hashlib
29import os
30import pickle
31import string
32import stat
33from typing import Any, Callable, Dict, Iterable, Iterator, List, NamedTuple, \
34    Optional, Tuple, Union
35import yaml
36
37
class ItemGetValueContext(NamedTuple):
    """
    Context handed to a get value function for one element of a key path.
    """
    # The item whose data is traversed
    item: "Item"
    # The key path of the elements which precede the current element
    path: str
    # The value in which the key of the current element is looked up
    value: Any
    # The key of the current element without an index suffix
    key: str
    # The index of the current element, -1 if the element has no index
    index: Any  # should be int, but this triggers a mypy error
45
46
# Maps an item UID to the corresponding item
ItemMap = Dict[str, "Item"]
# A function which produces the value for one element of a key path
ItemGetValue = Callable[[ItemGetValueContext], Any]
# Maps a key to its get value function and the map used for the keys below it
ItemGetValueMap = Dict[str, Tuple[ItemGetValue, Any]]
50
51
52def _is_enabled_op_and(enabled: List[str], enabled_by: Any) -> bool:
53    for next_enabled_by in enabled_by:
54        if not is_enabled(enabled, next_enabled_by):
55            return False
56    return True
57
58
def _is_enabled_op_not(enabled: List[str], enabled_by: Any) -> bool:
    """ Returns true if the enabled-by expression is not enabled. """
    operand_is_enabled = is_enabled(enabled, enabled_by)
    return not operand_is_enabled
61
62
63def _is_enabled_op_or(enabled: List[str], enabled_by: Any) -> bool:
64    for next_enabled_by in enabled_by:
65        if is_enabled(enabled, next_enabled_by):
66            return True
67    return False
68
69
# Maps the operator name used as the key of an enabled-by dictionary
# expression to the function which evaluates this operator
_IS_ENABLED_OP = {
    "and": _is_enabled_op_and,
    "not": _is_enabled_op_not,
    "or": _is_enabled_op_or
}
75
76
def is_enabled(enabled: List[str], enabled_by: Any) -> bool:
    """
    Verifies if the given parameter is enabled by specific enables.

    A boolean enabled-by expression is returned as is.  A dictionary
    selects the operator through its first key and applies it to the
    associated value.  A list is enabled if one of its expressions is
    enabled.  Everything else is enabled if it is contained in the enables.
    """
    if isinstance(enabled_by, bool):
        return enabled_by
    if isinstance(enabled_by, dict):
        # Only the first key-value pair of the dictionary is evaluated
        operator, operand = next(iter(enabled_by.items()))
        evaluate = _IS_ENABLED_OP[operator]
        return evaluate(enabled, operand)
    if isinstance(enabled_by, list):
        return _is_enabled_op_or(enabled, enabled_by)
    return enabled_by in enabled
87
88
89def _str_representer(dumper, data):
90    return dumper.represent_scalar("tag:yaml.org,2002:str",
91                                   data,
92                                   style="|" if "\n" in data else "")
93
94
# Register the string representer at module import time, so that strings
# dumped through yaml.dump(), for example by Item.save(), are represented by
# _str_representer()
yaml.add_representer(str, _str_representer)
96
97
class Link:
    """
    A link to an item.

    The link consists of the referenced item and the link attributes.  The
    attributes are accessible through the subscript operator.
    """
    def __init__(self, item: "Item", data: Any):
        self._data = data
        self._item = item

    @classmethod
    def create(cls, link: "Link", item: "Item") -> "Link":
        """
        Creates a link to the item which shares the attributes of the
        existing link.
        """
        # pylint: disable=protected-access
        shared_data = link._data
        return cls(item, shared_data)

    def __getitem__(self, name: str) -> Any:
        return self._data[name]

    def __setitem__(self, key: str, value: Any) -> None:
        self._data[key] = value

    @property
    def item(self) -> "Item":
        """ The item referenced by this link. """
        return self._item

    @property
    def role(self) -> str:
        """ The link role, this is the "role" link attribute. """
        return self._data["role"]
124
125
def _get_value(ctx: ItemGetValueContext) -> Any:
    """
    Gets the value of the context key.  If the context index is not
    negative, then the corresponding element of this value is returned.
    """
    attribute = ctx.value[ctx.key]
    return attribute[ctx.index] if ctx.index >= 0 else attribute
131
132
def normalize_key_path(key_path: str, prefix: str = "") -> str:
    """
    Normalizes the key path with an optional prefix path.

    The prefix is only used for key paths which are not absolute.
    """
    if os.path.isabs(key_path):
        return os.path.normpath(key_path)
    return os.path.normpath(os.path.join(prefix, key_path))
138
139
140_TYPES = {
141    type(True): "B".encode("utf-8"),
142    type(1.0): "F".encode("utf-8"),
143    type(1): "I".encode("utf-8"),
144    type(None): "N".encode("utf-8"),
145    type(""): "S".encode("utf-8"),
146}
147
148
149def _hash_data(data, state) -> None:
150    if isinstance(data, list):
151        for value in data:
152            _hash_data(value, state)
153    elif isinstance(data, dict):
154        for key, value in sorted(data.items()):
155            if not key.startswith("_"):
156                state.update(key.encode("utf-8"))
157                _hash_data(value, state)
158    else:
159        state.update(_TYPES[type(data)])
160        state.update(str(data).encode("utf-8"))
161
162
class Item:
    """
    Objects of this class represent a specification item.

    An item consists of the item cache it belongs to, an UID, and the
    attribute data.  Attributes with a name starting with an underscore, for
    example "_file", "_type", and "_uid", are not part of the digest and are
    not written to the item file by save().
    """

    # pylint: disable=too-many-public-methods
    def __init__(self, item_cache: "ItemCache", uid: str, data: Any):
        self._cache = item_cache
        self._uid = uid
        self._data = data
        self._links_to_parents = []  # type: List[Link]
        self._links_to_children = []  # type: List[Link]

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, Item):
            return NotImplemented
        return self._uid == other._uid  # pylint: disable=protected-access

    def __lt__(self, other: Any) -> bool:
        if not isinstance(other, Item):
            return NotImplemented
        return self._uid < other._uid  # pylint: disable=protected-access

    def __hash__(self) -> int:
        return hash(self._uid)

    def __contains__(self, key: str) -> bool:
        return key in self._data

    def __getitem__(self, key: str) -> Any:
        return self._data[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._data[key] = value

    @property
    def cache(self) -> "ItemCache":
        """ Returns the cache of the items. """
        return self._cache

    @property
    def digest(self) -> str:
        """ Returns the SHA-512 digest of the item data as a hex string. """
        state = hashlib.sha512()
        _hash_data(self._data, state)
        return state.hexdigest()

    def get(self, key: str, default: Any) -> Any:
        """
        Gets the attribute value if the attribute exists, otherwise the
        specified default value is returned.
        """
        return self._data.get(key, default)

    def get_by_normalized_key_path(
            self, normalized_key_path: str,
            get_value_map: "ItemGetValueMap") -> Any:
        """
        Gets the attribute value corresponding to the normalized key path.

        Each path element may carry an index suffix, for example "key[2]".
        The value of an element is produced by the get value function
        registered for the element key in the get value map.  Elements
        without a registered function use the default attribute lookup.
        """
        path = "/"
        value = self._data
        for key in normalized_key_path.strip("/").split("/"):
            parts = key.split("[")
            try:
                index = int(parts[1].split("]")[0])
            except IndexError:
                # The path element has no index suffix
                index = -1
            ctx = ItemGetValueContext(self, path, value, parts[0], index)
            # Descend into the map for the keys below this path element
            get_value, get_value_map = get_value_map.get(
                parts[0], (_get_value, {}))
            value = get_value(ctx)
            path = os.path.join(path, key)
        return value

    def get_by_key_path(self, key_path: str, prefix: str = "") -> Any:
        """ Gets the attribute value corresponding to the key path. """
        return self.get_by_normalized_key_path(
            normalize_key_path(key_path, prefix), {})

    @property
    def uid(self) -> str:
        """ Returns the UID of the item. """
        return self._uid

    @property
    def spec(self) -> str:
        """ Returns the UID of the item with an URL-like format. """
        return f"spec:{self._uid}"

    def to_abs_uid(self, abs_or_rel_uid: str) -> str:
        """
        Returns the absolute UID of an absolute UID or an UID relative to this
        item.
        """
        if abs_or_rel_uid == ".":
            return self._uid
        if os.path.isabs(abs_or_rel_uid):
            return abs_or_rel_uid
        return os.path.normpath(
            os.path.join(os.path.dirname(self.uid), abs_or_rel_uid))

    def map(self, abs_or_rel_uid: str) -> "Item":
        """
        Maps the absolute UID or the UID relative to this item to the
        corresponding item.
        """
        return self._cache[self.to_abs_uid(abs_or_rel_uid)]

    @staticmethod
    def _select_links(
            links: List["Link"],
            role: Optional[Union[str, Iterable[str]]]) -> Iterator["Link"]:
        """
        Yields the links with the role.  If the role is None, then all links
        are yielded.  If the role is not a string, then links with a role
        contained in the role are yielded.
        """
        if role is None:
            yield from links
        elif isinstance(role, str):
            for link in links:
                if link.role == role:
                    yield link
        else:
            for link in links:
                if link.role in role:
                    yield link

    def _element_by_index(self, elements: Iterator[Any],
                          index: Optional[int], kind: str) -> Any:
        """
        Returns the element with the index, otherwise raises an IndexError
        which names the item, the kind of element, and the index.
        """
        for element_index, element in enumerate(elements):
            if element_index == index:
                return element
        raise IndexError(
            f"item '{self._uid}' has no {kind} with index {index}")

    def links_to_parents(
            self,
            role: Optional[Union[str,
                                 Iterable[str]]] = None) -> Iterator["Link"]:
        """ Yields the links to the parents of this items. """
        return self._select_links(self._links_to_parents, role)

    def parents(
            self,
            role: Optional[Union[str,
                                 Iterable[str]]] = None) -> Iterator["Item"]:
        """ Yields the parents of this items. """
        for link in self.links_to_parents(role):
            yield link.item

    def parent(self,
               role: Optional[Union[str, Iterable[str]]] = None,
               index: Optional[int] = 0) -> "Item":
        """
        Returns the parent with the specified role and index.

        Raises an IndexError if there is no such parent.
        """
        return self._element_by_index(self.parents(role), index, "parent")

    def parent_link(self,
                    role: Optional[Union[str, Iterable[str]]] = None,
                    index: Optional[int] = 0) -> "Link":
        """
        Returns the parent link with the specified role and index.

        Raises an IndexError if there is no such parent link.
        """
        return self._element_by_index(self.links_to_parents(role), index,
                                      "parent link")

    def links_to_children(
            self,
            role: Optional[Union[str,
                                 Iterable[str]]] = None) -> Iterator["Link"]:
        """ Yields the links to the children of this items. """
        return self._select_links(self._links_to_children, role)

    def children(
            self,
            role: Optional[Union[str,
                                 Iterable[str]]] = None) -> Iterator["Item"]:
        """ Yields the children of this items. """
        for link in self.links_to_children(role):
            yield link.item

    def child(self,
              role: Optional[Union[str, Iterable[str]]] = None,
              index: Optional[int] = 0) -> "Item":
        """
        Returns the child with the specified role and index.

        Raises an IndexError if there is no such child.
        """
        return self._element_by_index(self.children(role), index, "child")

    def child_link(self,
                   role: Optional[Union[str, Iterable[str]]] = None,
                   index: Optional[int] = 0) -> "Link":
        """
        Returns the child link with the specified role and index.

        Raises an IndexError if there is no such child link.
        """
        return self._element_by_index(self.links_to_children(role), index,
                                      "child link")

    def init_parents(self, item_cache: "ItemCache") -> None:
        """
        Initializes the list of links to parents of this items.

        Raises a KeyError if a link refers to an item which is not in the
        item cache.
        """
        for data in self._data["links"]:
            try:
                link = Link(item_cache[self.to_abs_uid(data["uid"])], data)
                self._links_to_parents.append(link)
            except KeyError as err:
                msg = (f"item '{self.uid}' links "
                       f"to non-existing item '{data['uid']}'")
                raise KeyError(msg) from err

    def init_children(self) -> None:
        """ Initializes the list of links to children of this items. """
        for link in self.links_to_parents():
            link.item.add_link_to_child(Link.create(link, self))

    def add_link_to_child(self, link: "Link") -> None:
        """ Adds a link to a child item of this items. """
        self._links_to_children.append(link)

    def is_enabled(self, enabled: List[str]) -> bool:
        """ Returns true if the item is enabled by the specified enables. """
        return is_enabled(enabled, self["enabled-by"])

    @property
    def data(self) -> Any:
        """ The item data. """
        return self._data

    @property
    def file(self) -> str:
        """ Returns the file of the item. """
        return self._data["_file"]

    @file.setter
    def file(self, value: str):
        """ Sets the file of the item. """
        self._data["_file"] = value

    @property
    def type(self) -> str:
        """ Returns the type of the item. """
        return self._data["_type"]

    def save(self):
        """
        Saves the item to the corresponding file.

        Attributes with a name starting with an underscore are not saved.
        """
        data = {
            key: value
            for key, value in self._data.items() if not key.startswith("_")
        }
        # The data is dumped with allow_unicode=True, so the file encoding
        # must not depend on the locale
        with open(self.file, "w", encoding="utf-8") as dst:
            dst.write(
                yaml.dump(data, default_flow_style=False, allow_unicode=True))

    def load(self):
        """
        Loads the item from the corresponding file.

        Attributes of the current data with a name starting with an
        underscore, which are not defined by the file, are carried over to
        the loaded data.  Raises a TypeError if the file does not contain a
        mapping.
        """
        filename = self.file
        with open(filename, "r", encoding="utf-8") as src:
            data = yaml.safe_load(src.read())
        if not isinstance(data, dict):
            raise TypeError(
                f"item file '{filename}' does not contain a mapping")
        # Keep attributes such as "_uid" and "_type", otherwise the type
        # property and the "/_uid" key path no longer work after a load
        for key, value in self._data.items():
            if key.startswith("_"):
                data.setdefault(key, value)
        data["_file"] = filename
        self._data = data
415
416
class ItemTemplate(string.Template):
    """
    String template for item mapper identifiers.

    An identifier consists of an UID, an optional key path separated by a
    colon from the UID, and optional pipe function names each introduced by
    a pipe character, for example "${uid:/key/path[0]|function}".
    """
    # The pipe character has to be escaped, otherwise it is a regular
    # expression alternation and identifiers with pipes never match
    idpattern = (r"[a-zA-Z0-9._/-]+"
                 r"(:[][a-zA-Z0-9._/-]+)?"
                 r"(\|[a-zA-Z0-9_]+)*")
420
421
class _ItemMapperContext(dict):
    """
    Context to map identifiers to items and attribute values.

    The context is a dictionary which stays empty.  Subscript lookups are
    delegated to the item mapper using the item and prefix of the context.
    """
    def __init__(self, mapper: "ItemMapper", item: Optional[Item],
                 prefix: Optional[str], recursive: bool):
        super().__init__()
        self._recursive = recursive
        self._prefix = prefix
        self._item = item
        self._mapper = mapper

    def __getitem__(self, identifier):
        mapper = self._mapper
        item, key_path, value = mapper.map(identifier, self._item,
                                           self._prefix)
        if not self._recursive:
            return value
        # Substitute the value in the context of the mapped item and the
        # directory of the mapped key path
        return mapper.substitute(value, item, os.path.dirname(key_path))
439
440
class ItemMapper:
    """
    Maps identifiers to items and attribute values.

    An identifier has the form "uid:key/path|pipe".  The key path and the
    pipes are optional.  Key paths which are not absolute are relative to
    the current key path prefix of the mapper.
    """
    def __init__(self, item: "Item", recursive: bool = False):
        self._item = item
        self._recursive = recursive
        self._prefix = [""]
        self._get_value_map = {}  # type: Dict[str, ItemGetValueMap]

    @property
    def item(self) -> "Item":
        """ The item of the mapper. """
        return self._item

    @item.setter
    def item(self, item: "Item") -> None:
        """ Sets the item of the mapper. """
        self._item = item

    def add_get_value(self, type_path_key: str,
                      get_value: "ItemGetValue") -> None:
        """
        Adds a get value for the specified type and key path.

        The type and key path are separated by a colon, for example
        "type:/key/path".
        """
        type_name, path_key = type_path_key.split(":")
        keys = path_key.strip("/").split("/")
        get_value_map = self._get_value_map.setdefault(type_name, {})
        # Keys in front of the last key use the default get value unless a
        # specific one was already registered for them
        for key in keys[:-1]:
            _, get_value_map = get_value_map.setdefault(key, (_get_value, {}))
        get_value_map[keys[-1]] = (get_value, {})

    def push_prefix(self, prefix: str) -> None:
        """ Pushes a key path prefix. """
        self._prefix.append(prefix)

    def pop_prefix(self) -> None:
        """ Pops a key path prefix. """
        self._prefix.pop()

    @contextmanager
    def prefix(self, prefix: str) -> Iterator[None]:
        """
        Opens a key path prefix context.

        The prefix is popped when the context is left, this includes the
        case that the context is left by an exception.
        """
        self.push_prefix(prefix)
        try:
            yield
        finally:
            self.pop_prefix()

    def get_value_map(self, item: "Item") -> "ItemGetValueMap":
        """ Returns the get value map for the item. """
        return self._get_value_map.get(item.type, {})

    def map(self,
            identifier: str,
            item: Optional["Item"] = None,
            prefix: Optional[str] = None) -> Tuple["Item", str, Any]:
        """
        Maps an identifier with item and prefix to the corresponding item and
        attribute value.

        Returns the mapped item, the normalized key path, and the value.
        The item and prefix parameters are only used if the UID of the
        identifier is ".".  If they are None, then the item and the prefix
        of the mapper are used.  The pipes of the identifier name methods of
        the mapper which are applied to the value in the specified order.
        """
        uid_key_path, *pipes = identifier.split("|")
        colon = uid_key_path.find(":")
        if colon >= 0:
            uid, key_path = uid_key_path[:colon], uid_key_path[colon + 1:]
        else:
            # Without a key path, the identifier maps to the item UID
            uid, key_path = uid_key_path, "/_uid"
        if uid == ".":
            if item is None:
                item = self._item
            if prefix is None:
                prefix = "/".join(self._prefix)
        else:
            item = self._item.map(uid)
            prefix = ""
        key_path = normalize_key_path(key_path, prefix)
        value = item.get_by_normalized_key_path(key_path,
                                                self.get_value_map(item))
        for func in pipes:
            value = getattr(self, func)(value)
        return item, key_path, value

    def __getitem__(self, identifier):
        item, key_path, value = self.map(identifier)
        if self._recursive:
            return self.substitute(value, item, os.path.dirname(key_path))
        return value

    def substitute(self,
                   text: Optional[str],
                   item: Optional["Item"] = None,
                   prefix: Optional[str] = None) -> str:
        """
        Performs a variable substitution using the item mapper with the item
        and prefix.

        An empty text or None yields the empty string.  Raises a ValueError
        which names the item, prefix, and text if the substitution fails
        for whatever reason.
        """
        if not text:
            return ""
        try:
            context = _ItemMapperContext(self, item, prefix, self._recursive)
            return ItemTemplate(text).substitute(context)
        except Exception as err:
            spec = self._item.spec if item is None else item.spec
            if prefix is None:
                prefix = "/".join(self._prefix)
            msg = (f"substitution for {spec} using prefix '{prefix}' "
                   f"failed for text: {text}")
            raise ValueError(msg) from err
545
546
547class _SpecType(NamedTuple):
548    key: str
549    refinements: Dict[str, Any]
550
551
def _gather_spec_refinements(item: Item) -> Optional[_SpecType]:
    """
    Gathers recursively the type refinements defined by the child links of
    the item with the "spec-refinement" role.  Returns None if the item has
    no such links.
    """
    new_type = None  # type: Optional[_SpecType]
    for link in item.links_to_children("spec-refinement"):
        key = link["spec-key"]
        if new_type is None:
            new_type = _SpecType(key, {})
        # All refinements of an item have to use the same key
        assert new_type.key == key
        refined_type = _gather_spec_refinements(link.item)
        new_type.refinements[link["spec-value"]] = refined_type
    return new_type
563
564
def _load_item(path: str, uid: str) -> Any:
    """
    Loads the item data from the YAML file and adds the "_file" attribute
    with the absolute file path and the "_uid" attribute with the UID.

    Raises an IOError if the file is not valid YAML or does not contain a
    mapping of attributes, for example if it is empty.
    """
    # Read the file as UTF-8 independent of the locale
    with open(path, "r", encoding="utf-8") as src:
        try:
            data = yaml.safe_load(src.read())
        except yaml.YAMLError as err:
            msg = ("YAML error while loading specification item file "
                   f"'{path}': {str(err)}")
            raise IOError(msg) from err
    if not isinstance(data, dict):
        msg = (f"specification item file '{path}' "
               "does not contain a mapping of attributes")
        raise IOError(msg)
    data["_file"] = os.path.abspath(path)
    data["_uid"] = uid
    return data
576
577
class ItemCache:
    """
    This class provides a cache of specification items.

    The items are loaded from the YAML files below the configured paths.
    The item data of each directory is stored in a pickle file in the cache
    directory, which is used instead of the YAML files as long as it is
    newer than the directory and the YAML files in it.
    """
    def __init__(self,
                 config: Any,
                 post_process_load: Optional[Callable[["ItemMap"],
                                                      None]] = None):
        """
        Loads the items.

        The configuration provides the "cache-directory", the "paths" to
        load the items from, and the "spec-type-root-uid".  The optional
        post process load function is called with the map of all items
        before the links between the items are initialized.
        """
        self._items = {}  # type: ItemMap
        self._updates = 0
        cache_dir = os.path.abspath(config["cache-directory"])
        for index, path in enumerate(config["paths"]):
            self._load_items_recursive(str(index), path, path, cache_dir)
        if post_process_load:
            post_process_load(self._items)
        self._init_parents()
        self._init_children()
        spec_root = config["spec-type-root-uid"]
        if spec_root:
            self._root_type = _gather_spec_refinements(self[spec_root])
        else:
            self._root_type = None
        for item in self._items.values():
            self._set_type(item)

    def __getitem__(self, uid: str) -> "Item":
        return self._items[uid]

    @property
    def updates(self) -> bool:
        """
        Returns true if the item cache updates occurred due to new, modified,
        or removed files.
        """
        return self._updates > 0

    @property
    def all(self) -> "ItemMap":
        """ Returns the map of all specification items. """
        return self._items

    def add_volatile_item(self, path: str, uid: str) -> "Item":
        """
        Adds an item stored in the specified file to the cache and returns it.

        The item is not added to the persistent cache storage.
        """
        data = _load_item(path, uid)
        item = self._add_item(uid, data)
        item.init_parents(self)
        item.init_children()
        self._set_type(item)
        return item

    def _add_item(self, uid: str, data: Any) -> "Item":
        item = Item(self, uid, data)
        self._items[uid] = item
        return item

    @staticmethod
    def _uid_from_path(base: str, path: str) -> str:
        """
        Returns the UID of the item stored in the YAML file with the path.
        The path has to end with ".yml".
        """
        relative = os.path.relpath(path, base)
        # Remove only the file extension, ".yml" may occur elsewhere in the
        # relative path, for example in a directory name
        return "/" + relative[:-len(".yml")]

    @staticmethod
    def _cache_file(cache_dir: str, index: str, path: str) -> str:
        """
        Returns the cache file for the items of the directory with the path.
        The cache directory has to be an absolute path.
        """
        mid = os.path.abspath(path)
        common = os.path.commonpath([cache_dir, mid])
        # Remove the common path only at the start.  Replacing all its
        # occurrences would remove every separator if it is just "/".
        mid = mid[len(common):].strip("/")
        return os.path.join(cache_dir, index, mid, "spec.pickle")

    def _load_items_in_dir(self, base: str, path: str, cache_file: str,
                           update_cache: bool) -> None:
        """
        Adds the items of the directory.  The item data is loaded from the
        YAML files and written to the cache file if the cache has to be
        updated, otherwise it is loaded from the cache file.
        """
        data_by_uid = {}  # type: Dict[str, Any]
        if update_cache:
            self._updates += 1
            for name in os.listdir(path):
                path2 = os.path.join(path, name)
                if name.endswith(".yml") and not name.startswith("."):
                    uid = self._uid_from_path(base, path2)
                    data_by_uid[uid] = _load_item(path2, uid)
            os.makedirs(os.path.dirname(cache_file), exist_ok=True)
            with open(cache_file, "wb") as out:
                pickle.dump(data_by_uid, out)
        else:
            # NOTE(review): pickle.load() can execute arbitrary code, the
            # cache directory must only be writable by trusted users
            with open(cache_file, "rb") as pickle_src:
                data_by_uid = pickle.load(pickle_src)
        for uid, data in data_by_uid.items():
            self._add_item(uid, data)

    def _load_items_recursive(self, index: str, base: str, path: str,
                              cache_dir: str) -> None:
        """
        Loads the items of the directory and all its subdirectories.  The
        subdirectories are loaded before the directory itself.
        """
        cache_file = self._cache_file(cache_dir, index, path)
        try:
            mtime = os.path.getmtime(cache_file)
        except FileNotFoundError:
            # Not used for comparisons, since the cache is updated anyway
            mtime = 0.0
            update_cache = True
        else:
            update_cache = mtime <= os.path.getmtime(path)
        for name in os.listdir(path):
            path2 = os.path.join(path, name)
            if name.endswith(".yml") and not name.startswith("."):
                if not update_cache:
                    update_cache = mtime <= os.path.getmtime(path2)
            elif stat.S_ISDIR(os.lstat(path2).st_mode):
                self._load_items_recursive(index, base, path2, cache_dir)
        self._load_items_in_dir(base, path, cache_file, update_cache)

    def _init_parents(self) -> None:
        for item in self._items.values():
            item.init_parents(self)

    def _init_children(self) -> None:
        # Use the sorted UIDs to get a defined order of the child links
        for uid in sorted(self._items):
            self._items[uid].init_children()

    def _set_type(self, item: "Item") -> None:
        """
        Sets the "_type" attribute of the item to the path of attribute
        values which select the refinements starting at the root type.
        """
        spec_type = self._root_type
        value = item.data
        path = []  # type: List[str]
        while spec_type is not None:
            type_name = value[spec_type.key]
            path.append(type_name)
            spec_type = spec_type.refinements[type_name]
        item["_type"] = "/".join(path)
694
695
class EmptyItemCache(ItemCache):
    """
    This class provides a empty cache of specification items.

    The cache uses a configuration without paths to load items from and
    without a specification type root item.
    """
    def __init__(self):
        empty_config = {
            "cache-directory": ".",
            "paths": [],
            "spec-type-root-uid": None
        }
        super().__init__(empty_config)
704
705
class EmptyItem(Item):
    """
    Objects of this class represent empty items.

    An empty item has an empty UID, no attributes, and belongs to its own
    empty item cache.
    """
    def __init__(self):
        empty_cache = EmptyItemCache()
        super().__init__(empty_cache, "", {})
Note: See TracBrowser for help on using the repository browser.