source: rtems-central/rtemsqual/items.py @ 05246b3

Last change on this file since 05246b3 was 05246b3, checked in by Sebastian Huber <sebastian.huber@…>, on May 11, 2020 at 7:00:49 AM

items: Add and use Link.role

  • Property mode set to 100644
File size: 13.5 KB
Line 
1# SPDX-License-Identifier: BSD-2-Clause
2""" This module provides specification items and an item cache. """
3
4# Copyright (C) 2019, 2020 embedded brains GmbH (http://www.embedded-brains.de)
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions
8# are met:
9# 1. Redistributions of source code must retain the above copyright
10#    notice, this list of conditions and the following disclaimer.
11# 2. Redistributions in binary form must reproduce the above copyright
12#    notice, this list of conditions and the following disclaimer in the
13#    documentation and/or other materials provided with the distribution.
14#
15# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25# POSSIBILITY OF SUCH DAMAGE.
26
27from contextlib import contextmanager
28import os
29import pickle
30import string
31import stat
32from typing import Any, Callable, Dict, Iterator, List, Mapping, Optional, \
33    Tuple
34import yaml
35
36ItemMap = Dict[str, "Item"]
37ItemGetValue = Callable[["Item", str, Any, str, Optional[int]], Any]
38
39
40def _is_enabled_op_and(enabled: List[str], enabled_by: Any) -> bool:
41    for next_enabled_by in enabled_by:
42        if not _is_enabled(enabled, next_enabled_by):
43            return False
44    return True
45
46
47def _is_enabled_op_false(_enabled: List[str], _enabled_by: Any) -> bool:
48    return False
49
50
51def _is_enabled_op_not(enabled: List[str], enabled_by: Any) -> bool:
52    return not _is_enabled(enabled, enabled_by)
53
54
55def _is_enabled_op_or(enabled: List[str], enabled_by: Any) -> bool:
56    for next_enabled_by in enabled_by:
57        if _is_enabled(enabled, next_enabled_by):
58            return True
59    return False
60
61
62_IS_ENABLED_OP = {
63    "and": _is_enabled_op_and,
64    "not": _is_enabled_op_not,
65    "or": _is_enabled_op_or
66}
67
68
69def _is_enabled(enabled: List[str], enabled_by: Any) -> bool:
70    if enabled_by:
71        if isinstance(enabled_by, list):
72            return _is_enabled_op_or(enabled, enabled_by)
73        if isinstance(enabled_by, dict):
74            if len(enabled_by) == 1:
75                key = next(iter(enabled_by))
76                return _IS_ENABLED_OP.get(key, _is_enabled_op_false)(
77                    enabled, enabled_by[key])
78            return False
79        return enabled_by in enabled
80    return True
81
82
83def _str_representer(dumper, data):
84    return dumper.represent_scalar("tag:yaml.org,2002:str",
85                                   data,
86                                   style="|" if "\n" in data else "")
87
88
89yaml.add_representer(str, _str_representer)
90
91
92class Link:
93    """ A link to an item. """
94    def __init__(self, item: "Item", data: Any):
95        self._item = item
96        self._data = data
97
98    @classmethod
99    def create(cls, link: "Link", item: "Item") -> "Link":
100        """ Creates a link using an existing link with a new target item. """
101        return cls(item, link._data)  # pylint: disable=protected-access
102
103    def __getitem__(self, name: str) -> Any:
104        return self._data[name]
105
106    @property
107    def item(self) -> "Item":
108        """ The item referenced by this link. """
109        return self._item
110
111    @property
112    def role(self) -> str:
113        """ The link role. """
114        return self._data["role"]
115
116
117def _get_value(_item: "Item", _path: str, value: Any, key: str,
118               index: Optional[int]) -> str:
119    value = value[key]
120    if index is not None:
121        value = value[index]
122    return value
123
124
125class Item:
126    """ Objects of this class represent a specification item. """
127    def __init__(self, item_cache: "ItemCache", uid: str, data: Any):
128        self._item_cache = item_cache
129        self._uid = uid
130        self._data = data
131        self._links_to_parents = []  # type: List[Link]
132        self._links_to_children = []  # type: List[Link]
133
134    def __contains__(self, key: str) -> bool:
135        return key in self._data
136
137    def __getitem__(self, key: str) -> Any:
138        return self._data[key]
139
140    def get(self, key: str, default: Any) -> Any:
141        """
142        Gets the attribute value if the attribute exists, otherwise the
143        specified default value is returned.
144        """
145        return self._data.get(key, default)
146
147    def get_by_key_path(self,
148                        key_path: str,
149                        prefix: str = "",
150                        get_value: ItemGetValue = _get_value) -> Any:
151        """ Gets the attribute value corresponding to the key path. """
152        if not os.path.isabs(key_path):
153            key_path = os.path.join(prefix, key_path)
154        key_path = os.path.normpath(key_path)
155        path = "/"
156        value = self._data
157        for key in key_path.strip("/").split("/"):
158            parts = key.split("[")
159            try:
160                index = int(parts[1].split("]")[0])  # type: Optional[int]
161            except IndexError:
162                index = None
163            try:
164                value = get_value(self, path, value, parts[0], index)
165            except KeyError:
166                value = _get_value(self, path, value, parts[0], index)
167            path = os.path.join(path, key)
168        return value
169
170    @property
171    def uid(self) -> str:
172        """ Returns the UID of the item. """
173        return self._uid
174
175    def to_abs_uid(self, abs_or_rel_uid: str) -> str:
176        """
177        Returns the absolute UID of an absolute UID or an UID relative to this
178        item.
179        """
180        if abs_or_rel_uid == ".":
181            return self._uid
182        if os.path.isabs(abs_or_rel_uid):
183            return abs_or_rel_uid
184        return os.path.normpath(
185            os.path.join(os.path.dirname(self.uid), abs_or_rel_uid))
186
187    def map(self, abs_or_rel_uid: str) -> "Item":
188        """
189        Maps the absolute UID or the UID relative to this item to the
190        corresponding item.
191        """
192        return self._item_cache[self.to_abs_uid(abs_or_rel_uid)]
193
194    def links_to_parents(self) -> Iterator[Link]:
195        """ Yields the links to the parents of this items. """
196        yield from self._links_to_parents
197
198    def parents(self) -> Iterator["Item"]:
199        """ Yields the parents of this items. """
200        for link in self._links_to_parents:
201            yield link.item
202
203    def links_to_children(self) -> Iterator[Link]:
204        """ Yields the links to the children of this items. """
205        yield from self._links_to_children
206
207    def children(self) -> Iterator["Item"]:
208        """ Yields the children of this items. """
209        for link in self._links_to_children:
210            yield link.item
211
212    def init_parents(self, item_cache: "ItemCache"):
213        """ Initializes the list of links to parents of this items. """
214        for data in self._data["links"]:
215            link = Link(item_cache[self.to_abs_uid(data["uid"])], data)
216            self._links_to_parents.append(link)
217
218    def add_link_to_child(self, link: Link):
219        """ Adds a link to a child item of this items. """
220        self._links_to_children.append(link)
221
222    def is_enabled(self, enabled: List[str]):
223        """ Returns true if the item is enabled by the specified enables. """
224        return _is_enabled(enabled, self["enabled-by"])
225
226    @property
227    def file(self) -> str:
228        """ Returns the file of the item. """
229        return self._data["_file"]
230
231    @file.setter
232    def file(self, value: str):
233        """ Sets the file of the item. """
234        self._data["_file"] = value
235
236    def save(self):
237        """ Saves the item to the corresponding file. """
238        with open(self.file, "w") as dst:
239            data = self._data.copy()
240            del data["_file"]
241            dst.write(
242                yaml.dump(data, default_flow_style=False, allow_unicode=True))
243
244    def load(self):
245        """ Loads the item from the corresponding file. """
246        filename = self.file
247        with open(filename, "r") as src:
248            self._data = yaml.safe_load(src.read())
249            self._data["_file"] = filename
250
251
252class ItemTemplate(string.Template):
253    """ String template for item mapper identifiers. """
254    idpattern = "[a-zA-Z0-9._/-]+:[][a-zA-Z0-9._/-]+(|[a-zA-Z0-9_]+)*"
255
256
257class ItemMapper(Mapping[str, object]):
258    """ Maps identifiers to items and attribute values. """
259    def __init__(self, item: "Item"):
260        self._item = item
261        self._prefix = [""]
262
263    def push_prefix(self, prefix: str) -> None:
264        """ Pushes a key path prefix. """
265        self._prefix.append(prefix)
266
267    def pop_prefix(self) -> None:
268        """ Pops a key path prefix. """
269        self._prefix.pop()
270
271    @contextmanager
272    def prefix(self, prefix: str) -> Iterator[None]:
273        """ Opens a key path prefix context. """
274        self.push_prefix(prefix)
275        yield
276        self.pop_prefix()
277
278    def map(self, identifier: str) -> Tuple[Item, Any]:
279        """
280        Maps an identifier to the corresponding item and attribute value.
281        """
282        parts = identifier.split("|")
283        uid, key_path = parts[0].split(":")
284        if uid == ".":
285            item = self._item
286            prefix = "/".join(self._prefix)
287        else:
288            item = self._item.map(uid)
289            prefix = ""
290        value = item.get_by_key_path(key_path, prefix, self.get_value)
291        for func in parts[1:]:
292            value = getattr(self, func)(value)
293        return item, value
294
295    def __getitem__(self, identifier):
296        return self.map(identifier)[1]
297
298    def __iter__(self):
299        raise StopIteration
300
301    def __len__(self):
302        raise AttributeError
303
304    def substitute(self, text: str) -> str:
305        """ Performs a variable substitution using the item mapper. """
306        return ItemTemplate(text).substitute(self)
307
308    def get_value(self, _item: Item, _path: str, _value: Any, _key: str,
309                  _index: Optional[int]) -> Any:
310        """ Gets a value by key and optional index. """
311        # pylint: disable=no-self-use
312        raise KeyError
313
314
315class ItemCache:
316    """ This class provides a cache of specification items. """
317    def __init__(self, config: Any):
318        self._items = {}  # type: ItemMap
319        self._top_level = {}  # type: ItemMap
320        self._load_items(config)
321
322    def __getitem__(self, uid: str) -> Item:
323        return self._items[uid]
324
325    @property
326    def all(self) -> ItemMap:
327        """ Returns the map of all specification items. """
328        return self._items
329
330    @property
331    def top_level(self) -> ItemMap:
332        """ Returns the map of top-level specification items. """
333        return self._top_level
334
335    def _load_items_in_dir(self, base: str, path: str, cache_file: str,
336                           update_cache: bool) -> None:
337        data_by_uid = {}  # type: Dict[str, Any]
338        if update_cache:
339            for name in os.listdir(path):
340                path2 = os.path.join(path, name)
341                if name.endswith(".yml") and not name.startswith("."):
342                    uid = "/" + os.path.relpath(path2, base).replace(
343                        ".yml", "")
344                    with open(path2, "r") as yaml_src:
345                        data = yaml.safe_load(yaml_src.read())
346                        data["_file"] = os.path.abspath(path2)
347                        data_by_uid[uid] = data
348            os.makedirs(os.path.dirname(cache_file), exist_ok=True)
349            with open(cache_file, "wb") as out:
350                pickle.dump(data_by_uid, out)
351        else:
352            with open(cache_file, "rb") as pickle_src:
353                data_by_uid = pickle.load(pickle_src)
354        for uid, data in data_by_uid.items():
355            item = Item(self, uid, data)
356            self._items[uid] = item
357            if not item["links"]:
358                self._top_level[uid] = item
359
360    def _load_items_recursive(self, base: str, path: str,
361                              cache_dir: str) -> None:
362        mid = os.path.abspath(path)
363        mid = mid.replace(os.path.commonprefix([cache_dir, mid]), "")
364        cache_file = os.path.join(cache_dir, mid, "spec.pickle")
365        try:
366            mtime = os.path.getmtime(cache_file)
367            update_cache = False
368        except FileNotFoundError:
369            update_cache = True
370        for name in os.listdir(path):
371            path2 = os.path.join(path, name)
372            if name.endswith(".yml") and not name.startswith("."):
373                update_cache = update_cache or mtime <= os.path.getmtime(path2)
374            else:
375                if stat.S_ISDIR(os.lstat(path2).st_mode):
376                    self._load_items_recursive(base, path2, cache_dir)
377        self._load_items_in_dir(base, path, cache_file, update_cache)
378
379    def _init_parents(self) -> None:
380        for item in self._items.values():
381            item.init_parents(self)
382
383    def _init_children(self) -> None:
384        for item in self._items.values():
385            for link in item.links_to_parents():
386                link.item.add_link_to_child(Link.create(link, item))
387
388    def _load_items(self, config: Any) -> None:
389        cache_dir = os.path.abspath(config["cache-directory"])
390        for path in config["paths"]:
391            self._load_items_recursive(path, path, cache_dir)
392        self._init_parents()
393        self._init_children()
Note: See TracBrowser for help on using the repository browser.