source: rtems-central/rtemsspec/items.py @ bb633e2

Last change on this file since bb633e2 was bb633e2, checked in by Sebastian Huber <sebastian.huber@…>, on 07/28/22 at 09:53:19

items: Simplify code

  • Property mode set to 100644
File size: 28.6 KB
Line 
1# SPDX-License-Identifier: BSD-2-Clause
2""" This module provides specification items and an item cache. """
3
4# Copyright (C) 2019, 2022 embedded brains GmbH (http://www.embedded-brains.de)
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions
8# are met:
9# 1. Redistributions of source code must retain the above copyright
10#    notice, this list of conditions and the following disclaimer.
11# 2. Redistributions in binary form must reproduce the above copyright
12#    notice, this list of conditions and the following disclaimer in the
13#    documentation and/or other materials provided with the distribution.
14#
15# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25# POSSIBILITY OF SUCH DAMAGE.
26
27from contextlib import contextmanager
28import hashlib
29import os
30import pickle
31import string
32import stat
33from typing import Any, Callable, Dict, Iterable, Iterator, List, NamedTuple, \
34    Optional, Set, TextIO, Tuple, Union
35import json
36import yaml
37
38
class ItemGetValueContext(NamedTuple):
    """
    Context used to get an item value.

    A context is created for each key of a key path and handed over to the
    get value function responsible for the key.
    """
    # The item providing the value
    item: "Item"
    # The key path made up of the keys processed before this key
    path: str
    # The value which contains the attribute selected by the key
    value: Any
    # The key name without an optional index
    key: str
    # The index which followed the key name, negative if there was none
    index: Any  # should be int, but this triggers a mypy error
    # The optional arguments in the form "name=value,other=value"
    args: Optional[str]

    def arg(self, name: str, value: Optional[str] = None) -> str:
        """
        Get argument value by name.

        The arguments are a comma separated list of "name=value" pairs.  Only
        the first "=" of a pair separates the name from the value, so that
        values may contain "=" characters.  No arguments at all (None or an
        empty string) are treated as an empty argument set.

        If a default value is specified (this includes the empty string), then
        it is returned for absent arguments, otherwise a KeyError is raised.
        A ValueError is raised for pairs without an "=".
        """
        args = dict(
            kv.split("=", 1)  # type: ignore
            for kv in (self.args or "").split(",") if kv)
        # Test against None, an empty string is a valid default value
        if value is not None:
            return args.get(name, value)
        return args[name]
56
57
# Maps item UIDs to the corresponding items
ItemMap = Dict[str, "Item"]
# A function which produces the value for a get value context
ItemGetValue = Callable[[ItemGetValueContext], Any]
# Maps a key to the get value function of the key and the get value map used
# for the next key of the key path
ItemGetValueMap = Dict[str, Tuple[ItemGetValue, Any]]
61
62
def _is_enabled_op_and(enabled: List[str], enabled_by: Any) -> bool:
    """ Returns true if all expressions of enabled_by are enabled. """
    return all(
        is_enabled(enabled, expression) for expression in enabled_by)
68
69
def _is_enabled_op_not(enabled: List[str], enabled_by: Any) -> bool:
    """ Returns true if the enabled_by expression is not enabled. """
    operand_is_enabled = is_enabled(enabled, enabled_by)
    return not operand_is_enabled
72
73
def _is_enabled_op_or(enabled: List[str], enabled_by: Any) -> bool:
    """ Returns true if at least one expression of enabled_by is enabled. """
    return any(
        is_enabled(enabled, expression) for expression in enabled_by)
79
80
# Maps the operator name, which is the key of a dictionary enabled-by
# expression, to the function evaluating the operand of the operator
_IS_ENABLED_OP = {
    "and": _is_enabled_op_and,
    "not": _is_enabled_op_not,
    "or": _is_enabled_op_or
}
86
87
def is_enabled(enabled: List[str], enabled_by: Any) -> bool:
    """
    Evaluates the enabled-by expression with respect to the enabled set.

    A dictionary is an operator expression, its first key selects the
    operator and the associated value is the operand.  A list is enabled, if
    at least one of its elements is enabled.  A boolean is its own result.
    Everything else is enabled, if it is contained in the enabled set.
    """
    if isinstance(enabled_by, dict):
        operator, operand = next(iter(enabled_by.items()))
        return _IS_ENABLED_OP[operator](enabled, operand)
    if isinstance(enabled_by, list):
        return _is_enabled_op_or(enabled, enabled_by)
    if isinstance(enabled_by, bool):
        return enabled_by
    return enabled_by in enabled
98
99
def _str_representer(dumper, data):
    """
    Represents the string as a YAML scalar using the literal block style, if
    the string contains a line break, otherwise the default style.
    """
    if "\n" in data:
        style = "|"
    else:
        style = ""
    return dumper.represent_scalar("tag:yaml.org,2002:str", data, style=style)
104
105
# Let the default YAML dumper represent all strings through
# _str_representer() (module-level side effect at import time)
yaml.add_representer(str, _str_representer)
107
108
class Link:
    """
    A link to an item.

    A link consists of the referenced item and the link data.  The link data
    is accessible through the subscript operator.
    """
    def __init__(self, item: "Item", data: Any):
        self._data = data
        self._item = item

    @property
    def item(self) -> "Item":
        """ The item referenced by this link. """
        return self._item

    @property
    def role(self) -> str:
        """ The link role. """
        return self._data["role"]

    def __getitem__(self, name: str) -> Any:
        return self._data[name]

    def __setitem__(self, key: str, value: Any) -> None:
        self._data[key] = value

    @classmethod
    def create(cls, link: "Link", item: "Item") -> "Link":
        """
        Creates a link to the item which shares the link data with the
        existing link.
        """
        shared_data = link._data  # pylint: disable=protected-access
        return cls(item, shared_data)
135
136
def _get_value(ctx: ItemGetValueContext) -> Any:
    """
    Gets the value selected by the key of the context.  If the context has a
    non-negative index, then the element at this index is returned.
    """
    selected = ctx.value[ctx.key]
    return selected[ctx.index] if ctx.index >= 0 else selected
142
143
def normalize_key_path(key_path: str, prefix: str = "") -> str:
    """
    Returns the normalized key path.

    A relative key path is joined with the prefix path before it is
    normalized.  An absolute key path ignores the prefix path.
    """
    if os.path.isabs(key_path):
        full_path = key_path
    else:
        full_path = os.path.join(prefix, key_path)
    return os.path.normpath(full_path)
149
150
# Maps the types of scalar item data values to the type tag which is added to
# the digest right before the value itself
_TYPES = {
    bool: b"B",
    float: b"F",
    int: b"I",
    type(None): b"N",
    str: b"S",
}
158
159
def _hash_data(data, state) -> None:
    """
    Updates the hash state with the data.

    Lists contribute their elements in order.  Dictionaries contribute their
    keys and values sorted by key, keys starting with an underscore are
    skipped.  All other values contribute a type tag followed by their string
    representation.
    """
    if isinstance(data, dict):
        for key in sorted(data):
            if key.startswith("_"):
                continue
            state.update(key.encode("utf-8"))
            _hash_data(data[key], state)
        return
    if isinstance(data, list):
        for element in data:
            _hash_data(element, state)
        return
    state.update(_TYPES[type(data)])
    state.update(str(data).encode("utf-8"))
172
173
class Item:
    """
    Objects of this class represent a specification item.

    An item has an UID, the item data, and links to parent and child items.
    Items are compared, ordered, and hashed by their UID.
    """

    # pylint: disable=too-many-public-methods
    def __init__(self, item_cache: "ItemCache", uid: str, data: Any):
        self._cache = item_cache
        self._uid = uid
        self._data = data
        self._links_to_parents = []  # type: List[Link]
        self._links_to_children = []  # type: List[Link]

    def __eq__(self, other: Any) -> bool:
        if isinstance(other, Item):
            return self._uid == other._uid  # pylint: disable=protected-access
        return NotImplemented

    def __lt__(self, other: Any) -> bool:
        if isinstance(other, Item):
            return self._uid < other._uid  # pylint: disable=protected-access
        return NotImplemented

    def __hash__(self) -> int:
        return hash(self._uid)

    def __contains__(self, key: str) -> bool:
        return key in self._data

    def __getitem__(self, key: str) -> Any:
        return self._data[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._data[key] = value

    @staticmethod
    def _filter_links(
            links: List[Link],
            role: Optional[Union[str, Iterable[str]]]) -> Iterator[Link]:
        """
        Yields the links with a matching role.  No role matches all links, a
        string matches exactly this role, and everything else matches the
        roles it contains.
        """
        if role is None:
            yield from links
        elif isinstance(role, str):
            yield from (link for link in links if link.role == role)
        else:
            yield from (link for link in links if link.role in role)

    @property
    def cache(self) -> "ItemCache":
        """ The item cache of this item. """
        return self._cache

    @property
    def digest(self) -> str:
        """ The SHA-512 digest of the item data as a hexadecimal string. """
        sha512 = hashlib.sha512()
        _hash_data(self._data, sha512)
        return sha512.hexdigest()

    def get(self, key: str, default: Any) -> Any:
        """
        Returns the value of the attribute, or the default value if the item
        has no such attribute.
        """
        return self._data.get(key, default)

    def get_by_normalized_key_path(self, normalized_key_path: str,
                                   args: Optional[str],
                                   get_value_map: ItemGetValueMap) -> Any:
        """
        Returns the value found by following the normalized key path.

        Each key of the path may carry an index, for example "key[1]".  The
        value of a key is produced by the get value function registered for
        the key in the get value map.  Keys without a registered function use
        the default get value function.
        """
        path = "/"
        value = self._data
        level_map = get_value_map
        for key in normalized_key_path.strip("/").split("/"):
            name, *index_parts = key.split("[")
            if index_parts:
                index = int(index_parts[0].split("]")[0])
            else:
                # A negative index indicates that the key has no index
                index = -1
            ctx = ItemGetValueContext(self, path, value, name, index, args)
            # The lookup also yields the map used for the next key
            get_value, level_map = level_map.get(name, (_get_value, {}))
            value = get_value(ctx)
            path = os.path.join(path, key)
        return value

    def get_by_key_path(self,
                        key_path: str,
                        prefix: str = "",
                        args: Optional[str] = None) -> Any:
        """
        Returns the value found by following the key path which is normalized
        using the prefix.  An empty get value map is used.
        """
        normalized_key_path = normalize_key_path(key_path, prefix)
        return self.get_by_normalized_key_path(normalized_key_path, args, {})

    @property
    def uid(self) -> str:
        """ The UID of the item. """
        return self._uid

    @property
    def spec(self) -> str:
        """ The UID of the item prefixed by "spec:". """
        return f"spec:{self._uid}"

    def to_abs_uid(self, abs_or_rel_uid: str) -> str:
        """
        Converts the UID to an absolute UID.  A relative UID is interpreted
        relative to this item, the UID "." refers to this item.
        """
        if abs_or_rel_uid == ".":
            return self._uid
        if os.path.isabs(abs_or_rel_uid):
            return abs_or_rel_uid
        directory = os.path.dirname(self.uid)
        return os.path.normpath(os.path.join(directory, abs_or_rel_uid))

    def map(self, abs_or_rel_uid: str) -> "Item":
        """
        Returns the item of the item cache identified by the absolute UID or
        the UID relative to this item.
        """
        abs_uid = self.to_abs_uid(abs_or_rel_uid)
        return self._cache[abs_uid]

    def links_to_parents(
            self,
            role: Optional[Union[str,
                                 Iterable[str]]] = None) -> Iterator[Link]:
        """ Yields the links to the parents of this item. """
        yield from self._filter_links(self._links_to_parents, role)

    def parents(
            self,
            role: Optional[Union[str,
                                 Iterable[str]]] = None) -> Iterator["Item"]:
        """ Yields the parents of this item. """
        for parent_link in self.links_to_parents(role):
            yield parent_link.item

    def parent(self,
               role: Optional[Union[str, Iterable[str]]] = None,
               index: Optional[int] = 0) -> "Item":
        """
        Returns the parent with the specified role and index.  Raises an
        IndexError if there is no such parent.
        """
        for position, candidate in enumerate(self.parents(role)):
            if position == index:
                return candidate
        raise IndexError

    def parent_link(self,
                    role: Optional[Union[str, Iterable[str]]] = None,
                    index: Optional[int] = 0) -> Link:
        """
        Returns the parent link with the specified role and index.  Raises an
        IndexError if there is no such parent link.
        """
        for position, candidate in enumerate(self.links_to_parents(role)):
            if position == index:
                return candidate
        raise IndexError

    def links_to_children(
            self,
            role: Optional[Union[str,
                                 Iterable[str]]] = None) -> Iterator[Link]:
        """ Yields the links to the children of this item. """
        yield from self._filter_links(self._links_to_children, role)

    def children(
            self,
            role: Optional[Union[str,
                                 Iterable[str]]] = None) -> Iterator["Item"]:
        """ Yields the children of this item. """
        for child_link in self.links_to_children(role):
            yield child_link.item

    def child(self,
              role: Optional[Union[str, Iterable[str]]] = None,
              index: Optional[int] = 0) -> "Item":
        """
        Returns the child with the specified role and index.  Raises an
        IndexError if there is no such child.
        """
        for position, candidate in enumerate(self.children(role)):
            if position == index:
                return candidate
        raise IndexError

    def child_link(self,
                   role: Optional[Union[str, Iterable[str]]] = None,
                   index: Optional[int] = 0) -> Link:
        """
        Returns the child link with the specified role and index.  Raises an
        IndexError if there is no such child link.
        """
        for position, candidate in enumerate(self.links_to_children(role)):
            if position == index:
                return candidate
        raise IndexError

    def init_parents(self, item_cache: "ItemCache") -> None:
        """
        Creates the links to the parents using the "links" attribute of the
        item data.  Raises a KeyError if a link refers to an item which is not
        in the item cache.
        """
        for data in self._data["links"]:
            try:
                parent = item_cache[self.to_abs_uid(data["uid"])]
            except KeyError as err:
                msg = (f"item '{self.uid}' links "
                       f"to non-existing item '{data['uid']}'")
                raise KeyError(msg) from err
            self._links_to_parents.append(Link(parent, data))

    def init_children(self) -> None:
        """
        Adds a link to this item to the children of each parent of this item.
        """
        for parent_link in self.links_to_parents():
            child_link = Link.create(parent_link, self)
            parent_link.item.add_link_to_child(child_link)

    def add_link_to_parent(self, link: Link):
        """ Appends the link to the links to the parents of this item. """
        self._links_to_parents.append(link)

    def add_link_to_child(self, link: Link):
        """ Appends the link to the links to the children of this item. """
        self._links_to_children.append(link)

    def is_enabled(self, enabled: List[str]):
        """
        Returns true if the "enabled-by" attribute of the item is enabled by
        the specified enables.
        """
        enabled_by = self["enabled-by"]
        return is_enabled(enabled, enabled_by)

    @property
    def data(self) -> Any:
        """ The item data. """
        return self._data

    @property
    def file(self) -> str:
        """ The file of the item stored in the "_file" attribute. """
        return self._data["_file"]

    @file.setter
    def file(self, value: str):
        """ Sets the file of the item. """
        self._data["_file"] = value

    @property
    def type(self) -> str:
        """ The type of the item stored in the "_type" attribute. """
        return self._data["_type"]

    def save(self):
        """ Saves the item data to the file of the item. """
        self._cache.save_data(self.file, self._data)

    def load(self):
        """ Replaces the item data by the data loaded from the item file. """
        self._data = self._cache.load_data(self.file, self._uid)
425
426
class ItemTemplate(string.Template):
    """
    String template for item mapper identifiers.

    An identifier consists of up to three parts separated by colons.  The item
    mapper interprets them as the item UID, the key path, and the arguments.
    """
    # The first part and the optional second part consist of letters, digits,
    # ".", "_", "/", and "-".  The optional last part accepts all characters
    # except "$", "{", and "}".
    idpattern = "[a-zA-Z0-9._/-]+(:[a-zA-Z0-9._/-]+)?(:[^${}]*)?"
430
431
class _ItemMapperContext(dict):
    """
    Context to map identifiers to items and attribute values.

    The context is the mapping used for template substitutions.  Lookups are
    carried out by the item mapper using the item and prefix of the context.
    """
    def __init__(self, mapper: "ItemMapper", item: Optional[Item],
                 prefix: Optional[str], recursive: bool):
        super().__init__()
        self._mapper = mapper
        self._item = item
        self._prefix = prefix
        self._recursive = recursive

    def __getitem__(self, identifier):
        mapper = self._mapper
        item, key_path, value = mapper.map(identifier, self._item,
                                           self._prefix)
        if not self._recursive:
            return value
        # Substitute the value using the directory of its key path as prefix
        return mapper.substitute(value, item, os.path.dirname(key_path))
449
450
class _GetValueDictionary(dict):
    """
    Get value map which provides the same get value function for all keys.
    """
    def __init__(self, get_value: ItemGetValue):
        super().__init__()
        self._get_value = get_value

    def get(self, _key, _default):
        """
        Returns the get value function together with an empty get value map
        regardless of the key and default value.
        """
        get_value = self._get_value
        return (get_value, {})
458
459
class ItemMapper:
    """
    Maps identifiers to items and attribute values.

    An identifier has the form "uid", "uid:key_path", or "uid:key_path:args".
    The UID "." refers to the current item.  Get value functions may be
    registered to customize how the value of a key is produced for items of a
    specific type.
    """
    def __init__(self, item: Item, recursive: bool = False):
        self._item = item
        self._recursive = recursive
        # Stack of key path prefixes, joined by "/" to form the current prefix
        self._prefix = [""]
        # Maps item types to the get value map of the type
        self._get_value_map = {}  # type: Dict[str, ItemGetValueMap]

    @property
    def item(self) -> Item:
        """ The item of the mapper. """
        return self._item

    @item.setter
    def item(self, item: Item) -> None:
        """ Sets the item of the mapper. """
        self._item = item

    def _add_get_value_map(
            self, type_path_key: str, new_get_value_map: Tuple[ItemGetValue,
                                                               Dict]) -> None:
        # The type path key has the form "type:path/to/key"
        type_name, path_key = type_path_key.split(":")
        keys = path_key.strip("/").split("/")
        get_value_map = self._get_value_map.setdefault(type_name, {})
        # Walk down to the map of the last key, create missing levels which
        # use the default get value function
        for key in keys[:-1]:
            _, get_value_map = get_value_map.setdefault(key, (_get_value, {}))
        get_value_map[keys[-1]] = new_get_value_map

    def add_get_value(self, type_path_key: str,
                      get_value: ItemGetValue) -> None:
        """
        Adds a get value for the specified type and key path.

        The type and key path are specified in the form "type:path/to/key".
        """
        self._add_get_value_map(type_path_key, (get_value, {}))

    def add_get_value_dictionary(self, type_path_key: str,
                                 get_value: ItemGetValue) -> None:
        """
        Adds a get value dictionary for the specified type and key path.

        The value of the key itself is produced by the default get value
        function.  The values of all keys directly below the key are produced
        by the specified get value function.
        """
        self._add_get_value_map(type_path_key,
                                (_get_value, _GetValueDictionary(get_value)))

    def push_prefix(self, prefix: str) -> None:
        """ Pushes a key path prefix. """
        self._prefix.append(prefix)

    def pop_prefix(self) -> None:
        """ Pops a key path prefix. """
        self._prefix.pop()

    @contextmanager
    def prefix(self, prefix: str) -> Iterator[None]:
        """
        Opens a key path prefix context.

        The prefix is pushed on entry and popped on exit of the context.  It
        is also popped if the body of the context raises an exception.
        """
        self.push_prefix(prefix)
        try:
            yield
        finally:
            # Without this an exception would leave a stale prefix behind
            self.pop_prefix()

    def get_value_map(self, item: Item) -> ItemGetValueMap:
        """
        Returns the get value map for the type of the item.  An empty map is
        returned if nothing was registered for the type.
        """
        return self._get_value_map.get(item.type, {})

    def map(self,
            identifier: str,
            item: Optional[Item] = None,
            prefix: Optional[str] = None) -> Tuple[Item, str, Any]:
        """
        Maps an identifier with item and prefix to the corresponding item and
        attribute value.

        Returns a tuple consisting of the item, the normalized key path, and
        the value.  If no item is specified, then the item of the mapper is
        used.  If no prefix is specified, then the current prefix of the
        mapper is used.  The prefix is only relevant for the UID ".".  Raises
        a ValueError if the item does not exist or the value cannot be
        obtained.
        """
        colon = identifier.find(":")
        if colon >= 0:
            uid = identifier[:colon]
            more = identifier[colon + 1:]
            colon = more.find(":")
            if colon < 0:
                key_path = more
                args = None
            else:
                key_path = more[:colon]
                args = more[colon + 1:]
        else:
            # An identifier without a key path maps to the UID of the item
            uid = identifier
            key_path = "/_uid"
            args = None
        if item is None:
            item = self._item
        if uid == ".":
            if prefix is None:
                prefix = "/".join(self._prefix)
        else:
            # Key paths of other items are not relative to the prefix
            prefix = ""
            try:
                item = item.map(uid)
            except KeyError as err:
                msg = (f"item '{uid}' relative to {item.spec} "
                       f"specified by '{identifier}' does not exist")
                raise ValueError(msg) from err
        key_path = normalize_key_path(key_path, prefix)
        try:
            value = item.get_by_normalized_key_path(key_path, args,
                                                    self.get_value_map(item))
        except Exception as err:
            msg = (f"cannot get value for '{key_path}' of {item.spec} "
                   f"specified by '{identifier}'")
            raise ValueError(msg) from err
        return item, key_path, value

    def __getitem__(self, identifier):
        item, key_path, value = self.map(identifier)
        if self._recursive:
            return self.substitute(value, item, os.path.dirname(key_path))
        return value

    def substitute(self,
                   text: Optional[str],
                   item: Optional[Item] = None,
                   prefix: Optional[str] = None) -> str:
        """
        Performs a variable substitution using the item mapper with the item
        and prefix.

        Returns the empty string if there is no text.  Raises a ValueError if
        the substitution fails.
        """
        if not text:
            return ""
        try:
            context = _ItemMapperContext(self, item, prefix, self._recursive)
            return ItemTemplate(text).substitute(context)
        except Exception as err:
            spec = self._item.spec if item is None else item.spec
            if prefix is None:
                prefix = "/".join(self._prefix)
            msg = (f"substitution for {spec} using prefix '{prefix}' "
                   f"failed for text: {text}")
            raise ValueError(msg) from err
594
595
class _SpecType(NamedTuple):
    """ A node of the item type tree built from spec refinement links. """
    # The attribute key whose value selects the refinement
    key: str
    # Maps attribute values to the refined type, None if there is no further
    # refinement
    refinements: Dict[str, Any]
599
600
def _gather_spec_refinements(item: Item) -> Optional[_SpecType]:
    """
    Recursively gathers the type refinements of the item using the links to
    children with the "spec-refinement" role.  Returns None if the item has no
    such links.
    """
    spec_type = None  # type: Optional[_SpecType]
    for link in item.links_to_children():
        if link.role != "spec-refinement":
            continue
        key = link["spec-key"]
        if spec_type is None:
            spec_type = _SpecType(key, {})
        # All refinements of an item have to use the same key
        assert spec_type.key == key
        refinement = _gather_spec_refinements(link.item)
        spec_type.refinements[link["spec-value"]] = refinement
    return spec_type
612
613
def _load_yaml_data(path: str, uid: str) -> Any:
    """
    Loads the item data from the YAML file and adds the absolute file path
    ("_file") and the UID ("_uid") to the data.  Raises an IOError if the
    file content is not valid YAML.
    """
    with open(path, "r") as src:
        content = src.read()
        try:
            data = yaml.safe_load(content)
        except yaml.YAMLError as err:
            msg = ("YAML error while loading specification item file "
                   f"'{path}': {str(err)}")
            raise IOError(msg) from err
    data["_file"] = os.path.abspath(path)
    data["_uid"] = uid
    return data
625
626
def _load_json_data(path: str, uid: str) -> Any:
    """
    Loads the item data from the JSON file and adds the absolute file path
    ("_file") and the UID ("_uid") to the data.  Raises an IOError if the
    file content is not valid JSON.
    """
    with open(path, "r") as src:
        try:
            data = json.load(src)
        except json.JSONDecodeError as err:
            msg = ("JSON error while loading specification item file "
                   f"'{path}': {str(err)}")
            raise IOError(msg) from err
    data["_file"] = os.path.abspath(path)
    data["_uid"] = uid
    return data
638
639
class ItemCache:
    """
    This class provides a cache of specification items.

    The items are loaded from the YAML files found in the configured paths.
    The loaded data of each directory is stored in a pickle file in the cache
    directory, so that unchanged directories do not need to be parsed again.
    """
    def __init__(self,
                 config: Any,
                 post_process_load: Optional[Callable[[ItemMap],
                                                      None]] = None):
        """
        Loads the items according to the configuration.

        The configuration provides the "cache-directory", the "paths" to
        load the items from, and the "spec-type-root-uid".  The optional
        post process load function is called with the map of all items
        right after the items are loaded and before the links are set up.
        """
        self._items = {}  # type: ItemMap
        self._types = set()  # type: Set[str]
        self._updates = 0
        self._load_items(config)
        if post_process_load:
            post_process_load(self._items)
        self._init_parents()
        self._init_children()
        spec_root = config["spec-type-root-uid"]
        if spec_root:
            self._root_type = _gather_spec_refinements(self[spec_root])
        else:
            self._root_type = None
        for item in self._items.values():
            self._set_type(item)

    def __getitem__(self, uid: str) -> Item:
        return self._items[uid]

    @property
    def updates(self) -> bool:
        """
        Returns true if the item cache updates occurred due to new, modified,
        or removed files.
        """
        return self._updates > 0

    @property
    def all(self) -> ItemMap:
        """ Returns the map of all specification items. """
        return self._items

    @property
    def types(self) -> Set[str]:
        """ Returns the types of the items. """
        return self._types

    def add_volatile_item(self, uid: str, data: Any) -> Item:
        """
        Adds an item with the specified data to the cache and returns it.

        The item is not added to the persistent cache storage.
        """
        item = self._add_item(uid, data)
        item.init_parents(self)
        item.init_children()
        self._set_type(item)
        return item

    def add_volatile_item_from_file(self, uid: str, path: str) -> Item:
        """
        Adds an item stored in the specified file to the cache and returns it.

        The item is not added to the persistent cache storage.
        """
        return self.add_volatile_item(uid, self.load_data(path, uid))

    def _add_item(self, uid: str, data: Any) -> Item:
        item = Item(self, uid, data)
        self._items[uid] = item
        return item

    def _load_items_in_dir(self, base: str, path: str, cache_file: str,
                           update_cache: bool) -> None:
        # Loads the items of the directory either from the YAML files
        # (updating the cache file) or from the cache file
        data_by_uid = {}  # type: Dict[str, Any]
        if update_cache:
            self._updates += 1
            for name in os.listdir(path):
                path2 = os.path.join(path, name)
                if name.endswith(".yml") and not name.startswith("."):
                    # Remove only the file extension, a ".yml" somewhere else
                    # in the relative path is part of the UID
                    relative_path = os.path.relpath(path2, base)
                    uid = "/" + os.path.splitext(relative_path)[0]
                    data_by_uid[uid] = _load_yaml_data(path2, uid)
            os.makedirs(os.path.dirname(cache_file), exist_ok=True)
            with open(cache_file, "wb") as out:
                pickle.dump(data_by_uid, out)
        else:
            # NOTE(review): pickle.load() executes arbitrary code while
            # loading, the cache directory has to be trustworthy
            with open(cache_file, "rb") as pickle_src:
                data_by_uid = pickle.load(pickle_src)
        for uid, data in data_by_uid.items():
            self._add_item(uid, data)

    def _load_items_recursive(self, index: str, base: str, path: str,
                              cache_dir: str) -> None:
        # The cache file location is derived from the index of the
        # configured path and the directory path
        mid = os.path.abspath(path)
        mid = mid.replace(os.path.commonpath([cache_dir, mid]), "").strip("/")
        cache_file = os.path.join(cache_dir, index, mid, "spec.pickle")
        # The cache is updated if the cache file does not exist or is not
        # newer than the directory and all YAML files of the directory
        try:
            mtime = os.path.getmtime(cache_file)
            update_cache = False
        except FileNotFoundError:
            update_cache = True
        else:
            update_cache = mtime <= os.path.getmtime(path)
        for name in os.listdir(path):
            path2 = os.path.join(path, name)
            if name.endswith(".yml") and not name.startswith("."):
                if not update_cache:
                    update_cache = mtime <= os.path.getmtime(path2)
            elif stat.S_ISDIR(os.lstat(path2).st_mode):
                self._load_items_recursive(index, base, path2, cache_dir)
        self._load_items_in_dir(base, path, cache_file, update_cache)

    def _load_items(self, config: Any):
        cache_dir = os.path.abspath(config["cache-directory"])
        for index, path in enumerate(config["paths"]):
            self._load_items_recursive(str(index), path, path, cache_dir)

    def load_data(self, path: str, uid: str) -> Any:
        """ Loads the item data from the file specified by path. """
        # pylint: disable=no-self-use
        return _load_yaml_data(path, uid)

    def _save_data(self, file: TextIO, data: Any) -> None:
        # pylint: disable=no-self-use
        file.write(
            yaml.dump(data, default_flow_style=False, allow_unicode=True))

    def save_data(self, path: str, data: Any) -> None:
        """
        Saves the item data to the file specified by path.

        Attributes with a key starting with an underscore are not saved.
        """
        with open(path, "w") as file:
            data2 = {
                key: value
                for key, value in data.items() if not key.startswith("_")
            }
            self._save_data(file, data2)

    def _init_parents(self) -> None:
        for item in self._items.values():
            item.init_parents(self)

    def _init_children(self) -> None:
        # Process the items sorted by UID to get a defined order of the
        # links to children
        for uid in sorted(self._items):
            self._items[uid].init_children()

    def _set_type(self, item: Item) -> None:
        # Follow the type refinements using the attribute values of the item
        # and use the visited values joined by "/" as the item type
        spec_type = self._root_type
        value = item.data
        path = []  # type: List[str]
        while spec_type is not None:
            type_name = value[spec_type.key]
            path.append(type_name)
            spec_type = spec_type.refinements[type_name]
        the_type = "/".join(path)
        item["_type"] = the_type
        self._types.add(the_type)
793
794
class EmptyItemCache(ItemCache):
    """ This class provides an empty cache of specification items. """
    def __init__(self):
        # No paths to load items from and no root of the item type tree
        config = {
            "cache-directory": ".",
            "paths": [],
            "spec-type-root-uid": None
        }
        super().__init__(config)
803
804
class JSONItemCache(ItemCache):
    """
    This class provides a cache of specification items using JSON.

    The items are loaded directly from the JSON files found in the configured
    paths.
    """
    def _load_json_items(self, base: str, path: str) -> None:
        # Recursively adds the items of all JSON files below the path
        for name in os.listdir(path):
            path2 = os.path.join(path, name)
            if name.endswith(".json") and not name.startswith("."):
                # Remove only the file extension, a ".json" somewhere else in
                # the relative path is part of the UID
                relative_path = os.path.relpath(path2, base)
                uid = "/" + os.path.splitext(relative_path)[0]
                self._add_item(uid, _load_json_data(path2, uid))
            elif stat.S_ISDIR(os.lstat(path2).st_mode):
                self._load_json_items(base, path2)

    def _load_items(self, config: Any):
        for path in config["paths"]:
            self._load_json_items(path, path)

    def load_data(self, path: str, uid: str) -> Any:
        """ Loads the item data from the JSON file specified by path. """
        # pylint: disable=no-self-use
        return _load_json_data(path, uid)

    def _save_data(self, file: TextIO, data: Any) -> None:
        # pylint: disable=no-self-use
        json.dump(data, file, sort_keys=True, indent=2)
827
828
class EmptyItem(Item):
    """ Objects of this class represent empty items. """
    def __init__(self):
        # An empty item has an empty UID, no data, and an empty item cache
        empty_cache = EmptyItemCache()
        super().__init__(empty_cache, "", {})
Note: See TracBrowser for help on using the repository browser.