source: rtems-central/rtemsqual/items.py @ 833470c

Last change on this file since 833470c was 833470c, checked in by Sebastian Huber <sebastian.huber@…>, on May 6, 2020 at 10:36:53 AM

items: Add Item.get()

  • Property mode set to 100644
File size: 13.4 KB
Line 
1# SPDX-License-Identifier: BSD-2-Clause
2""" This module provides specification items and an item cache. """
3
4# Copyright (C) 2019, 2020 embedded brains GmbH (http://www.embedded-brains.de)
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions
8# are met:
9# 1. Redistributions of source code must retain the above copyright
10#    notice, this list of conditions and the following disclaimer.
11# 2. Redistributions in binary form must reproduce the above copyright
12#    notice, this list of conditions and the following disclaimer in the
13#    documentation and/or other materials provided with the distribution.
14#
15# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25# POSSIBILITY OF SUCH DAMAGE.
26
27from contextlib import contextmanager
28import os
29import pickle
30import string
31import stat
32from typing import Any, Callable, Dict, Iterator, List, Mapping, Optional, \
33    Tuple
34import yaml
35
36ItemMap = Dict[str, "Item"]
37ItemGetValue = Callable[["Item", str, Any, str, Optional[int]], Any]
38
39
40def _is_enabled_op_and(enabled: List[str], enabled_by: Any) -> bool:
41    for next_enabled_by in enabled_by:
42        if not _is_enabled(enabled, next_enabled_by):
43            return False
44    return True
45
46
47def _is_enabled_op_false(_enabled: List[str], _enabled_by: Any) -> bool:
48    return False
49
50
51def _is_enabled_op_not(enabled: List[str], enabled_by: Any) -> bool:
52    return not _is_enabled(enabled, enabled_by)
53
54
55def _is_enabled_op_or(enabled: List[str], enabled_by: Any) -> bool:
56    for next_enabled_by in enabled_by:
57        if _is_enabled(enabled, next_enabled_by):
58            return True
59    return False
60
61
62_IS_ENABLED_OP = {
63    "and": _is_enabled_op_and,
64    "not": _is_enabled_op_not,
65    "or": _is_enabled_op_or
66}
67
68
69def _is_enabled(enabled: List[str], enabled_by: Any) -> bool:
70    if enabled_by:
71        if isinstance(enabled_by, list):
72            return _is_enabled_op_or(enabled, enabled_by)
73        if isinstance(enabled_by, dict):
74            if len(enabled_by) == 1:
75                key = next(iter(enabled_by))
76                return _IS_ENABLED_OP.get(key, _is_enabled_op_false)(
77                    enabled, enabled_by[key])
78            return False
79        return enabled_by in enabled
80    return True
81
82
83def _str_representer(dumper, data):
84    return dumper.represent_scalar("tag:yaml.org,2002:str",
85                                   data,
86                                   style="|" if "\n" in data else "")
87
88
89yaml.add_representer(str, _str_representer)
90
91
92class Link:
93    """ A link to an item. """
94    def __init__(self, item: "Item", data: Any):
95        self._item = item
96        self._data = data
97
98    @classmethod
99    def create(cls, link: "Link", item: "Item") -> "Link":
100        """ Creates a link using an existing link with a new target item. """
101        return cls(item, link._data)  # pylint: disable=protected-access
102
103    def __getitem__(self, name: str) -> Any:
104        return self._data[name]
105
106    @property
107    def item(self) -> "Item":
108        """ The item referenced by this link. """
109        return self._item
110
111
112def _get_value(_item: "Item", _path: str, value: Any, key: str,
113               index: Optional[int]) -> str:
114    value = value[key]
115    if index is not None:
116        value = value[index]
117    return value
118
119
120class Item:
121    """ Objects of this class represent a specification item. """
122    def __init__(self, item_cache: "ItemCache", uid: str, data: Any):
123        self._item_cache = item_cache
124        self._uid = uid
125        self._data = data
126        self._links_to_parents = []  # type: List[Link]
127        self._links_to_children = []  # type: List[Link]
128
129    def __contains__(self, key: str) -> bool:
130        return key in self._data
131
132    def __getitem__(self, key: str) -> Any:
133        return self._data[key]
134
135    def get(self, key: str, default: Any) -> Any:
136        """
137        Gets the attribute value if the attribute exists, otherwise the
138        specified default value is returned.
139        """
140        return self._data.get(key, default)
141
142    def get_by_key_path(self,
143                        key_path: str,
144                        prefix: str = "",
145                        get_value: ItemGetValue = _get_value) -> Any:
146        """ Gets the attribute value corresponding to the key path. """
147        if not os.path.isabs(key_path):
148            key_path = os.path.join(prefix, key_path)
149        key_path = os.path.normpath(key_path)
150        path = "/"
151        value = self._data
152        for key in key_path.strip("/").split("/"):
153            parts = key.split("[")
154            try:
155                index = int(parts[1].split("]")[0])  # type: Optional[int]
156            except IndexError:
157                index = None
158            try:
159                value = get_value(self, path, value, parts[0], index)
160            except KeyError:
161                value = _get_value(self, path, value, parts[0], index)
162            path = os.path.join(path, key)
163        return value
164
165    @property
166    def uid(self) -> str:
167        """ Returns the UID of the item. """
168        return self._uid
169
170    def to_abs_uid(self, abs_or_rel_uid: str) -> str:
171        """
172        Returns the absolute UID of an absolute UID or an UID relative to this
173        item.
174        """
175        if abs_or_rel_uid == ".":
176            return self._uid
177        if os.path.isabs(abs_or_rel_uid):
178            return abs_or_rel_uid
179        return os.path.normpath(
180            os.path.join(os.path.dirname(self.uid), abs_or_rel_uid))
181
182    def map(self, abs_or_rel_uid: str) -> "Item":
183        """
184        Maps the absolute UID or the UID relative to this item to the
185        corresponding item.
186        """
187        return self._item_cache[self.to_abs_uid(abs_or_rel_uid)]
188
189    def links_to_parents(self) -> Iterator[Link]:
190        """ Yields the links to the parents of this items. """
191        yield from self._links_to_parents
192
193    def parents(self) -> Iterator["Item"]:
194        """ Yields the parents of this items. """
195        for link in self._links_to_parents:
196            yield link.item
197
198    def links_to_children(self) -> Iterator[Link]:
199        """ Yields the links to the children of this items. """
200        yield from self._links_to_children
201
202    def children(self) -> Iterator["Item"]:
203        """ Yields the children of this items. """
204        for link in self._links_to_children:
205            yield link.item
206
207    def init_parents(self, item_cache: "ItemCache"):
208        """ Initializes the list of links to parents of this items. """
209        for data in self._data["links"]:
210            link = Link(item_cache[self.to_abs_uid(data["uid"])], data)
211            self._links_to_parents.append(link)
212
213    def add_link_to_child(self, link: Link):
214        """ Adds a link to a child item of this items. """
215        self._links_to_children.append(link)
216
217    def is_enabled(self, enabled: List[str]):
218        """ Returns true if the item is enabled by the specified enables. """
219        return _is_enabled(enabled, self["enabled-by"])
220
221    @property
222    def file(self) -> str:
223        """ Returns the file of the item. """
224        return self._data["_file"]
225
226    @file.setter
227    def file(self, value: str):
228        """ Sets the file of the item. """
229        self._data["_file"] = value
230
231    def save(self):
232        """ Saves the item to the corresponding file. """
233        with open(self.file, "w") as dst:
234            data = self._data.copy()
235            del data["_file"]
236            dst.write(
237                yaml.dump(data, default_flow_style=False, allow_unicode=True))
238
239    def load(self):
240        """ Loads the item from the corresponding file. """
241        filename = self.file
242        with open(filename, "r") as src:
243            self._data = yaml.safe_load(src.read())
244            self._data["_file"] = filename
245
246
247class ItemTemplate(string.Template):
248    """ String template for item mapper identifiers. """
249    idpattern = "[a-zA-Z0-9._/-]+:[][a-zA-Z0-9._/-]+(|[a-zA-Z0-9_]+)*"
250
251
252class ItemMapper(Mapping[str, object]):
253    """ Maps identifiers to items and attribute values. """
254    def __init__(self, item: "Item"):
255        self._item = item
256        self._prefix = [""]
257
258    def push_prefix(self, prefix: str) -> None:
259        """ Pushes a key path prefix. """
260        self._prefix.append(prefix)
261
262    def pop_prefix(self) -> None:
263        """ Pops a key path prefix. """
264        self._prefix.pop()
265
266    @contextmanager
267    def prefix(self, prefix: str) -> Iterator[None]:
268        """ Opens a key path prefix context. """
269        self.push_prefix(prefix)
270        yield
271        self.pop_prefix()
272
273    def map(self, identifier: str) -> Tuple[Item, Any]:
274        """
275        Maps an identifier to the corresponding item and attribute value.
276        """
277        parts = identifier.split("|")
278        uid, key_path = parts[0].split(":")
279        if uid == ".":
280            item = self._item
281            prefix = "/".join(self._prefix)
282        else:
283            item = self._item.map(uid)
284            prefix = ""
285        value = item.get_by_key_path(key_path, prefix, self.get_value)
286        for func in parts[1:]:
287            value = getattr(self, func)(value)
288        return item, value
289
290    def __getitem__(self, identifier):
291        return self.map(identifier)[1]
292
293    def __iter__(self):
294        raise StopIteration
295
296    def __len__(self):
297        raise AttributeError
298
299    def substitute(self, text: str) -> str:
300        """ Performs a variable substitution using the item mapper. """
301        return ItemTemplate(text).substitute(self)
302
303    def get_value(self, _item: Item, _path: str, _value: Any, _key: str,
304                  _index: Optional[int]) -> Any:
305        """ Gets a value by key and optional index. """
306        # pylint: disable=no-self-use
307        raise KeyError
308
309
310class ItemCache:
311    """ This class provides a cache of specification items. """
312    def __init__(self, config: Any):
313        self._items = {}  # type: ItemMap
314        self._top_level = {}  # type: ItemMap
315        self._load_items(config)
316
317    def __getitem__(self, uid: str) -> Item:
318        return self._items[uid]
319
320    @property
321    def all(self) -> ItemMap:
322        """ Returns the map of all specification items. """
323        return self._items
324
325    @property
326    def top_level(self) -> ItemMap:
327        """ Returns the map of top-level specification items. """
328        return self._top_level
329
330    def _load_items_in_dir(self, base: str, path: str, cache_file: str,
331                           update_cache: bool) -> None:
332        data_by_uid = {}  # type: Dict[str, Any]
333        if update_cache:
334            for name in os.listdir(path):
335                path2 = os.path.join(path, name)
336                if name.endswith(".yml") and not name.startswith("."):
337                    uid = "/" + os.path.relpath(path2, base).replace(
338                        ".yml", "")
339                    with open(path2, "r") as yaml_src:
340                        data = yaml.safe_load(yaml_src.read())
341                        data["_file"] = os.path.abspath(path2)
342                        data_by_uid[uid] = data
343            os.makedirs(os.path.dirname(cache_file), exist_ok=True)
344            with open(cache_file, "wb") as out:
345                pickle.dump(data_by_uid, out)
346        else:
347            with open(cache_file, "rb") as pickle_src:
348                data_by_uid = pickle.load(pickle_src)
349        for uid, data in data_by_uid.items():
350            item = Item(self, uid, data)
351            self._items[uid] = item
352            if not item["links"]:
353                self._top_level[uid] = item
354
355    def _load_items_recursive(self, base: str, path: str,
356                              cache_dir: str) -> None:
357        mid = os.path.abspath(path)
358        mid = mid.replace(os.path.commonprefix([cache_dir, mid]), "")
359        cache_file = os.path.join(cache_dir, mid, "spec.pickle")
360        try:
361            mtime = os.path.getmtime(cache_file)
362            update_cache = False
363        except FileNotFoundError:
364            update_cache = True
365        for name in os.listdir(path):
366            path2 = os.path.join(path, name)
367            if name.endswith(".yml") and not name.startswith("."):
368                update_cache = update_cache or mtime <= os.path.getmtime(path2)
369            else:
370                if stat.S_ISDIR(os.lstat(path2).st_mode):
371                    self._load_items_recursive(base, path2, cache_dir)
372        self._load_items_in_dir(base, path, cache_file, update_cache)
373
374    def _init_parents(self) -> None:
375        for item in self._items.values():
376            item.init_parents(self)
377
378    def _init_children(self) -> None:
379        for item in self._items.values():
380            for link in item.links_to_parents():
381                link.item.add_link_to_child(Link.create(link, item))
382
383    def _load_items(self, config: Any) -> None:
384        cache_dir = os.path.abspath(config["cache-directory"])
385        for path in config["paths"]:
386            self._load_items_recursive(path, path, cache_dir)
387        self._init_parents()
388        self._init_children()
Note: See TracBrowser for help on using the repository browser.