source: rtems-central/rtemsqual/items.py @ fa46b668

Last change on this file since fa46b668 was fa46b668, checked in by Sebastian Huber <sebastian.huber@…>, on 05/06/20 at 09:40:24

Move license and copyright registration of items

This avoids a future cyclic dependency between the items and content
modules.

  • Property mode set to 100644
File size: 13.2 KB
Line 
1# SPDX-License-Identifier: BSD-2-Clause
2""" This module provides specification items and an item cache. """
3
4# Copyright (C) 2019, 2020 embedded brains GmbH (http://www.embedded-brains.de)
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions
8# are met:
9# 1. Redistributions of source code must retain the above copyright
10#    notice, this list of conditions and the following disclaimer.
11# 2. Redistributions in binary form must reproduce the above copyright
12#    notice, this list of conditions and the following disclaimer in the
13#    documentation and/or other materials provided with the distribution.
14#
15# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25# POSSIBILITY OF SUCH DAMAGE.
26
27from contextlib import contextmanager
28import os
29import pickle
30import string
31import stat
32from typing import Any, Callable, Dict, Iterator, List, Mapping, Optional, \
33    Tuple
34import yaml
35
36ItemMap = Dict[str, "Item"]
37ItemGetValue = Callable[["Item", str, Any, str, Optional[int]], Any]
38
39
40def _is_enabled_op_and(enabled: List[str], enabled_by: Any) -> bool:
41    for next_enabled_by in enabled_by:
42        if not _is_enabled(enabled, next_enabled_by):
43            return False
44    return True
45
46
47def _is_enabled_op_false(_enabled: List[str], _enabled_by: Any) -> bool:
48    return False
49
50
51def _is_enabled_op_not(enabled: List[str], enabled_by: Any) -> bool:
52    return not _is_enabled(enabled, enabled_by)
53
54
55def _is_enabled_op_or(enabled: List[str], enabled_by: Any) -> bool:
56    for next_enabled_by in enabled_by:
57        if _is_enabled(enabled, next_enabled_by):
58            return True
59    return False
60
61
62_IS_ENABLED_OP = {
63    "and": _is_enabled_op_and,
64    "not": _is_enabled_op_not,
65    "or": _is_enabled_op_or
66}
67
68
69def _is_enabled(enabled: List[str], enabled_by: Any) -> bool:
70    if enabled_by:
71        if isinstance(enabled_by, list):
72            return _is_enabled_op_or(enabled, enabled_by)
73        if isinstance(enabled_by, dict):
74            if len(enabled_by) == 1:
75                key = next(iter(enabled_by))
76                return _IS_ENABLED_OP.get(key, _is_enabled_op_false)(
77                    enabled, enabled_by[key])
78            return False
79        return enabled_by in enabled
80    return True
81
82
83def _str_representer(dumper, data):
84    return dumper.represent_scalar("tag:yaml.org,2002:str",
85                                   data,
86                                   style="|" if "\n" in data else "")
87
88
89yaml.add_representer(str, _str_representer)
90
91
92class Link:
93    """ A link to an item. """
94    def __init__(self, item: "Item", data: Any):
95        self._item = item
96        self._data = data
97
98    @classmethod
99    def create(cls, link: "Link", item: "Item") -> "Link":
100        """ Creates a link using an existing link with a new target item. """
101        return cls(item, link._data)  # pylint: disable=protected-access
102
103    def __getitem__(self, name: str) -> Any:
104        return self._data[name]
105
106    @property
107    def item(self) -> "Item":
108        """ The item referenced by this link. """
109        return self._item
110
111
112def _get_value(_item: "Item", _path: str, value: Any, key: str,
113               index: Optional[int]) -> str:
114    value = value[key]
115    if index is not None:
116        value = value[index]
117    return value
118
119
120class Item:
121    """ Objects of this class represent a specification item. """
122    def __init__(self, item_cache: "ItemCache", uid: str, data: Any):
123        self._item_cache = item_cache
124        self._uid = uid
125        self._data = data
126        self._links_to_parents = []  # type: List[Link]
127        self._links_to_children = []  # type: List[Link]
128
129    def __contains__(self, key: str) -> bool:
130        return key in self._data
131
132    def __getitem__(self, name: str) -> Any:
133        return self._data[name]
134
135    def get_by_key_path(self,
136                        key_path: str,
137                        prefix: str = "",
138                        get_value: ItemGetValue = _get_value) -> Any:
139        """ Gets the attribute value corresponding to the key path. """
140        if not os.path.isabs(key_path):
141            key_path = os.path.join(prefix, key_path)
142        key_path = os.path.normpath(key_path)
143        path = "/"
144        value = self._data
145        for key in key_path.strip("/").split("/"):
146            parts = key.split("[")
147            try:
148                index = int(parts[1].split("]")[0])  # type: Optional[int]
149            except IndexError:
150                index = None
151            try:
152                value = get_value(self, path, value, parts[0], index)
153            except KeyError:
154                value = _get_value(self, path, value, parts[0], index)
155            path = os.path.join(path, key)
156        return value
157
158    @property
159    def uid(self) -> str:
160        """ Returns the UID of the item. """
161        return self._uid
162
163    def to_abs_uid(self, abs_or_rel_uid: str) -> str:
164        """
165        Returns the absolute UID of an absolute UID or an UID relative to this
166        item.
167        """
168        if abs_or_rel_uid == ".":
169            return self._uid
170        if os.path.isabs(abs_or_rel_uid):
171            return abs_or_rel_uid
172        return os.path.normpath(
173            os.path.join(os.path.dirname(self.uid), abs_or_rel_uid))
174
175    def map(self, abs_or_rel_uid: str) -> "Item":
176        """
177        Maps the absolute UID or the UID relative to this item to the
178        corresponding item.
179        """
180        return self._item_cache[self.to_abs_uid(abs_or_rel_uid)]
181
182    def links_to_parents(self) -> Iterator[Link]:
183        """ Yields the links to the parents of this items. """
184        yield from self._links_to_parents
185
186    def parents(self) -> Iterator["Item"]:
187        """ Yields the parents of this items. """
188        for link in self._links_to_parents:
189            yield link.item
190
191    def links_to_children(self) -> Iterator[Link]:
192        """ Yields the links to the children of this items. """
193        yield from self._links_to_children
194
195    def children(self) -> Iterator["Item"]:
196        """ Yields the children of this items. """
197        for link in self._links_to_children:
198            yield link.item
199
200    def init_parents(self, item_cache: "ItemCache"):
201        """ Initializes the list of links to parents of this items. """
202        for data in self._data["links"]:
203            link = Link(item_cache[self.to_abs_uid(data["uid"])], data)
204            self._links_to_parents.append(link)
205
206    def add_link_to_child(self, link: Link):
207        """ Adds a link to a child item of this items. """
208        self._links_to_children.append(link)
209
210    def is_enabled(self, enabled: List[str]):
211        """ Returns true if the item is enabled by the specified enables. """
212        return _is_enabled(enabled, self["enabled-by"])
213
214    @property
215    def file(self) -> str:
216        """ Returns the file of the item. """
217        return self._data["_file"]
218
219    @file.setter
220    def file(self, value: str):
221        """ Sets the file of the item. """
222        self._data["_file"] = value
223
224    def save(self):
225        """ Saves the item to the corresponding file. """
226        with open(self.file, "w") as dst:
227            data = self._data.copy()
228            del data["_file"]
229            dst.write(
230                yaml.dump(data, default_flow_style=False, allow_unicode=True))
231
232    def load(self):
233        """ Loads the item from the corresponding file. """
234        filename = self.file
235        with open(filename, "r") as src:
236            self._data = yaml.safe_load(src.read())
237            self._data["_file"] = filename
238
239
240class ItemTemplate(string.Template):
241    """ String template for item mapper identifiers. """
242    idpattern = "[a-zA-Z0-9._/-]+:[][a-zA-Z0-9._/-]+(|[a-zA-Z0-9_]+)*"
243
244
245class ItemMapper(Mapping[str, object]):
246    """ Maps identifiers to items and attribute values. """
247    def __init__(self, item: "Item"):
248        self._item = item
249        self._prefix = [""]
250
251    def push_prefix(self, prefix: str) -> None:
252        """ Pushes a key path prefix. """
253        self._prefix.append(prefix)
254
255    def pop_prefix(self) -> None:
256        """ Pops a key path prefix. """
257        self._prefix.pop()
258
259    @contextmanager
260    def prefix(self, prefix: str) -> Iterator[None]:
261        """ Opens a key path prefix context. """
262        self.push_prefix(prefix)
263        yield
264        self.pop_prefix()
265
266    def map(self, identifier: str) -> Tuple[Item, Any]:
267        """
268        Maps an identifier to the corresponding item and attribute value.
269        """
270        parts = identifier.split("|")
271        uid, key_path = parts[0].split(":")
272        if uid == ".":
273            item = self._item
274            prefix = "/".join(self._prefix)
275        else:
276            item = self._item.map(uid)
277            prefix = ""
278        value = item.get_by_key_path(key_path, prefix, self.get_value)
279        for func in parts[1:]:
280            value = getattr(self, func)(value)
281        return item, value
282
283    def __getitem__(self, identifier):
284        return self.map(identifier)[1]
285
286    def __iter__(self):
287        raise StopIteration
288
289    def __len__(self):
290        raise AttributeError
291
292    def substitute(self, text: str) -> str:
293        """ Performs a variable substitution using the item mapper. """
294        return ItemTemplate(text).substitute(self)
295
296    def get_value(self, _item: Item, _path: str, _value: Any, _key: str,
297                  _index: Optional[int]) -> Any:
298        """ Gets a value by key and optional index. """
299        # pylint: disable=no-self-use
300        raise KeyError
301
302
303class ItemCache:
304    """ This class provides a cache of specification items. """
305    def __init__(self, config: Any):
306        self._items = {}  # type: ItemMap
307        self._top_level = {}  # type: ItemMap
308        self._load_items(config)
309
310    def __getitem__(self, uid: str) -> Item:
311        return self._items[uid]
312
313    @property
314    def all(self) -> ItemMap:
315        """ Returns the map of all specification items. """
316        return self._items
317
318    @property
319    def top_level(self) -> ItemMap:
320        """ Returns the map of top-level specification items. """
321        return self._top_level
322
323    def _load_items_in_dir(self, base: str, path: str, cache_file: str,
324                           update_cache: bool) -> None:
325        data_by_uid = {}  # type: Dict[str, Any]
326        if update_cache:
327            for name in os.listdir(path):
328                path2 = os.path.join(path, name)
329                if name.endswith(".yml") and not name.startswith("."):
330                    uid = "/" + os.path.relpath(path2, base).replace(
331                        ".yml", "")
332                    with open(path2, "r") as yaml_src:
333                        data = yaml.safe_load(yaml_src.read())
334                        data["_file"] = os.path.abspath(path2)
335                        data_by_uid[uid] = data
336            os.makedirs(os.path.dirname(cache_file), exist_ok=True)
337            with open(cache_file, "wb") as out:
338                pickle.dump(data_by_uid, out)
339        else:
340            with open(cache_file, "rb") as pickle_src:
341                data_by_uid = pickle.load(pickle_src)
342        for uid, data in data_by_uid.items():
343            item = Item(self, uid, data)
344            self._items[uid] = item
345            if not item["links"]:
346                self._top_level[uid] = item
347
348    def _load_items_recursive(self, base: str, path: str,
349                              cache_dir: str) -> None:
350        mid = os.path.abspath(path)
351        mid = mid.replace(os.path.commonprefix([cache_dir, mid]), "")
352        cache_file = os.path.join(cache_dir, mid, "spec.pickle")
353        try:
354            mtime = os.path.getmtime(cache_file)
355            update_cache = False
356        except FileNotFoundError:
357            update_cache = True
358        for name in os.listdir(path):
359            path2 = os.path.join(path, name)
360            if name.endswith(".yml") and not name.startswith("."):
361                update_cache = update_cache or mtime <= os.path.getmtime(path2)
362            else:
363                if stat.S_ISDIR(os.lstat(path2).st_mode):
364                    self._load_items_recursive(base, path2, cache_dir)
365        self._load_items_in_dir(base, path, cache_file, update_cache)
366
367    def _init_parents(self) -> None:
368        for item in self._items.values():
369            item.init_parents(self)
370
371    def _init_children(self) -> None:
372        for item in self._items.values():
373            for link in item.links_to_parents():
374                link.item.add_link_to_child(Link.create(link, item))
375
376    def _load_items(self, config: Any) -> None:
377        cache_dir = os.path.abspath(config["cache-directory"])
378        for path in config["paths"]:
379            self._load_items_recursive(path, path, cache_dir)
380        self._init_parents()
381        self._init_children()
Note: See TracBrowser for help on using the repository browser.