source: rtems-central/rtemsspec/validation.py @ c9b1bba

Last change on this file since c9b1bba was c9b1bba, checked in by Sebastian Huber <sebastian.huber@…>, on 05/11/21 at 14:00:25

validation: Let skip reasons overwrite

Allow transition map variants with a skip reason to overwrite existing
variants with the same enabled-by attribute. This is important if
variants use N/A for some pre-conditions. It also makes it easier to
skip pre-condition states which are controlled by build options.

  • Property mode set to 100644
File size: 68.0 KB
Line 
1# SPDX-License-Identifier: BSD-2-Clause
2""" This module provides functions for the generation of validation tests. """
3
4# Copyright (C) 2020, 2021 embedded brains GmbH (http://www.embedded-brains.de)
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions
8# are met:
9# 1. Redistributions of source code must retain the above copyright
10#    notice, this list of conditions and the following disclaimer.
11# 2. Redistributions in binary form must reproduce the above copyright
12#    notice, this list of conditions and the following disclaimer in the
13#    documentation and/or other materials provided with the distribution.
14#
15# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25# POSSIBILITY OF SUCH DAMAGE.
26
27# pylint: disable=too-many-lines
28
29import itertools
30import math
31import os
32import re
33import textwrap
34from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Tuple
35
36from rtemsspec.content import CContent, CInclude, enabled_by_to_exp, \
37    ExpressionMapper, GenericContent, get_value_params, get_value_plural, \
38    get_value_doxygen_group, get_value_doxygen_function, to_camel_case
39from rtemsspec.items import is_enabled, Item, ItemCache, \
40    ItemGetValueContext, ItemMapper
41
# Type alias for a dictionary of specification items indexed by a string
# (presumably the item UID -- confirm against the users of this alias).
ItemMap = Dict[str, Item]

# Matches identifiers of the form "steps/<count>".  The single group captures
# the decimal count which _Mapper.map() adds to its test step counter.
_STEPS = re.compile(r"^steps/([0-9]+)$")
45
46
def _get_test_run(ctx: ItemGetValueContext) -> Any:
    """
    Returns the name of the test run function for the item of the context.

    The name is the camel case form of the item UID (without its first
    character) with all spaces removed followed by "_Run".
    """
    ident = to_camel_case(ctx.item.uid[1:])
    without_spaces = ident.replace(" ", "")
    return f"{without_spaces}_Run"
49
50
class _Mapper(ItemMapper):
    """ An item mapper which additionally counts the test plan steps. """

    def __init__(self, item: Item):
        super().__init__(item)
        self._step = 0
        # The registration order is the same as the order of this table
        get_value_table = (
            ("glossary/term:/plural", get_value_plural),
            ("interface/function:/name", get_value_doxygen_function),
            ("interface/function:/params/name", get_value_params),
            ("interface/group:/name", get_value_doxygen_group),
            ("interface/macro:/name", get_value_doxygen_function),
            ("interface/macro:/params/name", get_value_params),
            ("requirement/functional/action:/test-run", _get_test_run),
            ("test-case:/test-run", _get_test_run),
        )
        for type_path_key, get_value in get_value_table:
            self.add_get_value(type_path_key, get_value)

    @property
    def steps(self):
        """ The current value of the test step counter. """
        return self._step

    def reset(self):
        """ Sets the test step counter back to zero. """
        self._step = 0

    def map(self,
            identifier: str,
            item: Optional[Item] = None,
            prefix: Optional[str] = None) -> Tuple[Item, str, Any]:
        """
        Maps the identifier.

        The identifier "step" yields the current test step counter as a string
        and increments the counter by one.  An identifier of the form
        "steps/<count>" adds the count to the counter and yields an
        explanatory text.  All other identifiers are handed over to the base
        class.
        """
        if identifier == "step":
            current = self._step
            self._step = current + 1
            return self._item, "step", str(current)
        steps_match = _STEPS.search(identifier)
        if steps_match is None:
            return super().map(identifier, item, prefix)
        inc = int(steps_match.group(1))
        self._step += inc
        return self._item, "step", f"Accounts for {inc} test plan steps"
89
90
91def _add_ingroup(content: CContent, items: List["_TestItem"]) -> None:
92    content.add_ingroup([item.group_identifier for item in items])
93
94
95class _TestItem:
96    """ A test item with a default implementation for test cases. """
97
98    # pylint: disable=too-many-public-methods
99    def __init__(self, item: Item):
100        self._item = item
101        self._ident = to_camel_case(item.uid[1:])
102        self._context = f"{self._ident}_Context"
103        self._mapper = _Mapper(item)
104
105    def __getitem__(self, key: str):
106        return self._item[key]
107
108    @property
109    def item(self) -> Item:
110        """ Returns the item. """
111        return self._item
112
113    @property
114    def uid(self) -> str:
115        """ Returns the item UID. """
116        return self._item.uid
117
118    @property
119    def ident(self) -> str:
120        """ Returns the test identifier. """
121        return self._ident
122
123    @property
124    def context(self) -> str:
125        """ Returns the test case context type. """
126        return self._context
127
128    @property
129    def name(self) -> str:
130        """ Returns the name. """
131        return self._item.spec
132
133    @property
134    def includes(self) -> List[str]:
135        """ Returns the list of includes. """
136        return self._item["test-includes"]
137
138    @property
139    def local_includes(self) -> List[str]:
140        """ Returns the list of local includes. """
141        return self._item["test-local-includes"]
142
143    @property
144    def brief(self) -> str:
145        """ Returns the substituted brief description. """
146        return self.substitute_text(self["test-brief"])
147
148    @property
149    def description(self) -> str:
150        """ Returns the substituted description. """
151        return self.substitute_text(self["test-description"])
152
153    @property
154    def group_identifier(self) -> str:
155        """ Returns the group identifier. """
156        return f"RTEMSTestCase{self.ident}"
157
158    def substitute_code(self, text: Optional[str]) -> str:
159        """ Performs a variable substitution for code. """
160        return self._mapper.substitute(text)
161
162    def substitute_text(self,
163                        text: Optional[str],
164                        prefix: Optional[str] = None) -> str:
165        """
166        Performs a variable substitution for text with an optional prefix.
167        """
168        return self._mapper.substitute(text, prefix=prefix)
169
170    def add_test_case_description(
171            self, content: CContent,
172            test_case_to_suites: Dict[str, List["_TestItem"]]) -> None:
173        """ Adds the test case description. """
174        with content.defgroup_block(self.group_identifier, self.name):
175            try:
176                test_suites = test_case_to_suites[self.uid]
177            except KeyError as err:
178                msg = (f"the target file '{self['test-target']}' of "
179                       f"{self.item.spec} is not a source file of an item of "
180                       "type 'build/test-program'")
181                raise ValueError(msg) from err
182            _add_ingroup(content, test_suites)
183            content.add_brief_description(self.brief)
184            content.wrap(self.description)
185            self.add_test_case_action_description(content)
186            content.add("@{")
187
188    def add_test_case_action_description(self, content: CContent) -> None:
189        """ Adds the test case action description. """
190        actions = self["test-actions"]
191        if actions:
192            content.add("This test case performs the following actions:")
193            for action in actions:
194                content.wrap(self.substitute_text(action["action-brief"]),
195                             initial_indent="- ")
196                for check in action["checks"]:
197                    content.wrap(self.substitute_text(check["brief"]),
198                                 initial_indent="  - ",
199                                 subsequent_indent="    ")
200
201    def _add_test_case_actions(self, content: CContent) -> CContent:
202        actions = CContent()
203        for index, action in enumerate(self["test-actions"]):
204            method = f"{self._ident}_Action_{index}"
205            if self.context == "void":
206                args = []
207                params = []
208            else:
209                args = ["ctx"]
210                params = [f"{self.context} *ctx"]
211            actions.gap = False
212            actions.call_function(None, method, args)
213            with content.doxygen_block():
214                content.add_brief_description(
215                    self.substitute_text(action["action-brief"]))
216            content.gap = False
217            with content.function("static void", method, params):
218                content.add(self.substitute_code(action["action-code"]))
219                for check in action["checks"]:
220                    with content.comment_block():
221                        content.wrap(self.substitute_text(check["brief"]))
222                    content.append(self.substitute_text(check["code"]))
223        return actions
224
225    def _get_run_params(self, header: Optional[Dict[str, Any]]) -> List[str]:
226        if not header:
227            return []
228        return [
229            self.substitute_text(param["specifier"],
230                                 f"test-header/run-params[{index}]")
231            for index, param in enumerate(header["run-params"])
232        ]
233
234    def add_header_body(self, content: CContent, header: Dict[str,
235                                                              Any]) -> None:
236        """ Adds the test header body. """
237        content.add(self.substitute_code(header["code"]))
238        with content.doxygen_block():
239            content.add_brief_description("Runs the parameterized test case.")
240            content.add_param_description(header["run-params"])
241        content.gap = False
242        content.declare_function("void", f"{self.ident}_Run",
243                                 self._get_run_params(header))
244
245    def add_support_method(self,
246                           content: CContent,
247                           key: str,
248                           name: str,
249                           mandatory_code: Optional[GenericContent] = None,
250                           optional_code: Optional[GenericContent] = None,
251                           ret: str = "void",
252                           extra_params: Optional[List[str]] = None,
253                           extra_args: Optional[List[str]] = None,
254                           do_wrap: bool = True) -> str:
255        """ Adds a support method to the content. """
256
257        # pylint: disable=too-many-arguments
258        # pylint: disable=too-many-locals
259        info = self[key]
260        if not info and not mandatory_code:
261            return "NULL"
262        if extra_params is None:
263            extra_params = []
264        if extra_args is None:
265            extra_args = []
266        method = f"{self.ident}_{name}"
267        wrap = f"{method}_Wrap"
268        if info:
269            content.add_description_block(
270                self.substitute_text(info["brief"]),
271                self.substitute_text(info["description"]))
272            params = [f"{self.context} *ctx"] + extra_params
273            with content.function(f"static {ret}", method, params):
274                if not do_wrap:
275                    content.gap = False
276                    content.add(mandatory_code)
277                    content.gap = False
278                    content.add(optional_code)
279                content.add(self.substitute_code(info["code"]))
280        if not do_wrap:
281            assert info
282            return method
283        params = ["void *arg"] + extra_params
284        with content.function(f"static {ret}", wrap, params):
285            content.add([f"{self.context} *ctx;", "", "ctx = arg;"])
286            content.gap = False
287            content.add(mandatory_code)
288            content.gap = False
289            content.add(optional_code)
290            if info:
291                content.gap = False
292                ret_2 = None if ret == "void" else "return"
293                args = ["ctx"] + extra_args
294                content.call_function(ret_2, f"{method}", args)
295        return wrap
296
297    def add_function(self, content: CContent, key: str, name: str) -> None:
298        """
299        Adds a function with the name to the content if there is one defined
300        for the attribute key.
301        """
302        if self[key] is not None:
303            with content.function("static void", f"{self.ident}_{name}",
304                                  [f"{self.context} *ctx"]):
305                content.append(self.substitute_code(self[key]))
306
307    def add_default_context_members(self, content: CContent) -> None:
308        """ Adds the default context members to the content """
309        for param in self._get_run_params(self["test-header"]):
310            content.add_description_block(
311                "This member contains a copy of the corresponding "
312                f"{self.ident}_Run() parameter.", None)
313            content.add(f"{param.strip()};")
314
315    def add_context(self, content: CContent) -> str:
316        """ Adds the context to the content. """
317        content.add(self.substitute_code(self["test-context-support"]))
318        if not self["test-context"] and (
319                not self["test-header"]
320                or not self["test-header"]["run-params"]):
321            return "NULL"
322        with content.doxygen_block():
323            content.add_brief_description(
324                f"Test context for {self.name} test case.")
325        content.append("typedef struct {")
326        with content.indent():
327            for info in self["test-context"]:
328                content.add_description_block(
329                    self.substitute_text(info["brief"]),
330                    self.substitute_text(info["description"]))
331                content.add(f"{info['member'].strip()};")
332            self.add_default_context_members(content)
333        content.add([
334            f"}} {self.context};", "", f"static {self.context}",
335            f"  {self.ident}_Instance;"
336        ])
337        return f"&{self.ident}_Instance"
338
339    def generate_header(self, base_directory: str, header: Dict[str,
340                                                                Any]) -> None:
341        """ Generates the test header. """
342        content = CContent()
343        content.register_license_and_copyrights_of_item(self._item)
344        content.prepend_spdx_license_identifier()
345        with content.file_block():
346            content.add_ingroup([self.group_identifier])
347        content.add_copyrights_and_licenses()
348        content.add_automatically_generated_warning()
349        with content.header_guard(os.path.basename(header["target"])):
350            content.add_includes(list(map(CInclude, header["includes"])))
351            content.add_includes(list(map(CInclude, header["local-includes"])),
352                                 local=True)
353            with content.extern_c():
354                with content.add_to_group(self.group_identifier):
355                    self.add_header_body(content, header)
356        content.write(os.path.join(base_directory, header["target"]))
357
358    def _add_context_and_fixture(self, content: CContent) -> Optional[str]:
359        instance = self.add_context(content)
360        if instance == "NULL":
361            self._context = "void"
362            do_wrap = False
363        else:
364            do_wrap = True
365        setup = self.add_support_method(content,
366                                        "test-setup",
367                                        "Setup",
368                                        do_wrap=do_wrap)
369        stop = self.add_support_method(content,
370                                       "test-stop",
371                                       "Stop",
372                                       do_wrap=do_wrap)
373        teardown = self.add_support_method(content,
374                                           "test-teardown",
375                                           "Teardown",
376                                           do_wrap=do_wrap)
377        if all(ptr == "NULL" for ptr in [instance, setup, stop, teardown]):
378            return None
379        content.add([
380            f"static T_fixture {self.ident}_Fixture = {{",
381            f"  .setup = {setup},", f"  .stop = {stop},",
382            f"  .teardown = {teardown},", "  .scope = NULL,",
383            f"  .initial_context = {instance}", "};"
384        ])
385        return f"&{self.ident}_Fixture"
386
387    def assign_run_params(self, content: CContent, header: Dict[str,
388                                                                Any]) -> None:
389        """ Assigns the run parameters to the context.  """
390        if header["run-params"]:
391            content.add([f"ctx = &{self.ident}_Instance;"] + [
392                f"ctx->{param['name']} = {param['name']};"
393                for param in header["run-params"]
394            ])
395
396    def generate(self, content: CContent, base_directory: str,
397                 test_case_to_suites: Dict[str, List["_TestItem"]]) -> None:
398        """ Generates the content. """
399        self.add_test_case_description(content, test_case_to_suites)
400        fixture = self._add_context_and_fixture(content)
401        content.add(self.substitute_code(self["test-support"]))
402        self._mapper.reset()
403        actions = self._add_test_case_actions(content)
404        header = self["test-header"]
405        prologue = CContent()
406        epilogue = CContent()
407        if header:
408            self.generate_header(base_directory, header)
409            ret = "void"
410            name = f"{self.ident}_Run"
411            params = self._get_run_params(header)
412            if self._mapper.steps > 0 and not fixture:
413                fixture = "&T_empty_fixture"
414            if fixture:
415                content.add(f"static T_fixture_node {self.ident}_Node;")
416                if self.context == "void":
417                    result = None
418                else:
419                    prologue.add(f"{self.context} *ctx;")
420                    self.assign_run_params(prologue, header)
421                    result = "ctx ="
422                prologue.call_function(result, "T_push_fixture",
423                                       [f"&{self.ident}_Node", fixture])
424                epilogue.add("T_pop_fixture();")
425            align = True
426        else:
427            ret = ""
428            params = [f"{self.ident}"]
429            if fixture:
430                params.append(fixture)
431                name = "T_TEST_CASE_FIXTURE"
432            else:
433                name = "T_TEST_CASE"
434            if self.context != "void":
435                prologue.add([
436                    f"{self.context} *ctx;", "", "ctx = T_fixture_context();"
437                ])
438            align = False
439            with content.function_block(
440                    f"void T_case_body_{self.ident}( void )"):
441                pass
442            content.gap = False
443        with content.function(ret, name, params, align=align):
444            content.add(prologue)
445            if self._mapper.steps > 0:
446                content.add(f"T_plan( {self._mapper.steps} );")
447            content.add(actions)
448            content.add(epilogue)
449        content.add("/** @} */")
450
451
class _TestSuiteItem(_TestItem):
    """ A test item which generates a test suite. """
    @property
    def group_identifier(self) -> str:
        """ Returns the identifier of the test suite group. """
        return f"RTEMSTestSuite{self.ident}"

    def generate(self, content: CContent, _base_directory: str,
                 _test_case_to_suites: Dict[str, List[_TestItem]]) -> None:
        """
        Adds the test suite group definition and the test suite code to the
        content.  The base directory and the test case to suites mapping are
        not used.
        """
        group = self.group_identifier
        with content.defgroup_block(group, self.name):
            content.add("@ingroup RTEMSTestSuites")
            content.add_brief_description(self.brief)
            content.wrap(self.description)
            content.add("@{")
        test_code = self.substitute_code(self["test-code"])
        content.add(test_code)
        content.add("/** @} */")
467
468
class Transition(NamedTuple):
    """ Represents an action requirement transition map entry.  """
    # NOTE(review): presumably the index of the transition description in the
    # item which produced this variant -- confirm against _build_map()
    desc_idx: int
    # Enabled-by expression of the variant; it is evaluated by is_enabled()
    # and converted by enabled_by_to_exp() for the variant key
    enabled_by: Any
    # Skip index; it forms the first entry of the post-condition variant key
    # in TransitionMap.get_post_conditions()
    skip: int
    # Passed to TransitionMap.map_idx_to_pre_co_states() together with the map
    # index (presumably marks the not applicable pre-conditions -- confirm)
    pre_cond_na: Tuple[int, ...]
    # Post-condition entries; they follow the skip index in the post-condition
    # variant key
    post_cond: Tuple[Any, ...]
476
477
def _variant_to_key(variant: Transition) -> str:
    """
    Returns a key string for the variant built from its enabled-by
    expression, skip index, N/A pre-conditions, and post-conditions.

    The description index of the variant is not part of the key.
    """
    parts = [
        enabled_by_to_exp(variant.enabled_by, ExpressionMapper()),
        str(variant.skip),
        str(variant.pre_cond_na),
        str(variant.post_cond),
    ]
    return "".join(parts)
482
483
class _TransitionEntry:
    """
    A transition map entry which consists of an ordered list of transition
    variants and a key.

    The key is the concatenation of the variant keys in list order.
    """

    def __init__(self):
        self.key = ""
        self.variants = []  # type: List[Transition]

    def __bool__(self):
        """ Returns true, if the entry has at least one variant. """
        return bool(self.variants)

    def __getitem__(self, key):
        """ Returns the variants selected by the index or slice. """
        return self.variants[key]

    def __len__(self):
        """ Returns the count of variants. """
        return len(self.variants)

    def add(self, variant: Transition) -> None:
        """ Adds the variant to the transitions of the entry. """
        self.key += _variant_to_key(variant)
        self.variants.append(variant)

    def replace(self, index: int, variant: Transition) -> None:
        """
        Replace the variant at transition variant index.

        Raises an IndexError if the index is out of range.  In this case the
        entry is not changed.
        """
        # Rebuild the key from the variant list.  A textual replacement of the
        # old variant key in the entry key would touch every occurrence of it,
        # for example if two variants have the same key or if it matches
        # across the boundary of two adjacent variant keys.
        variants = list(self.variants)
        variants[index] = variant
        self.key = "".join(_variant_to_key(entry) for entry in variants)
        self.variants[index] = variant
508
509
# A tuple of string tuples; _to_st_name() returns the state names by condition
# index and state index in this form
_IdxToX = Tuple[Tuple[str, ...], ...]
# A list of transition entries (presumably indexed by the map index -- confirm
# against TransitionMap._build_map())
_TransitionMap = List[_TransitionEntry]
512
513
514def _to_st_idx(conditions: List[Any]) -> Tuple[Dict[str, int], ...]:
515    return tuple(
516        dict((state["name"], st_idx) for st_idx, state in enumerate(
517            itertools.chain(condition["states"], [{
518                "name": "N/A"
519            }]))) for condition in conditions)
520
521
522def _to_st_name(conditions: List[Any]) -> _IdxToX:
523    return tuple(
524        tuple(
525            itertools.chain((state["name"]
526                             for state in condition["states"]), ["NA"]))
527        for condition in conditions)
528
529
class _PostCondContext(NamedTuple):
    """ The context used to evaluate post-condition state expressions. """
    # Provides the name to index and index to name lookups for the conditions
    # and states
    transition_map: "TransitionMap"
    # NOTE(review): presumably the index of the transition map entry under
    # evaluation; it is not read by the helpers visible here
    map_idx: int
    # The pre-condition state index for each pre-condition index
    pre_co_states: Tuple[int, ...]
    # The post-condition state for each post-condition index
    post_co_states: Tuple[Any, ...]
    # The index of the post-condition whose state is determined
    post_co_idx: int
    # The operation mapping; the helpers read the keys "if", "then",
    # "then-specified-by", "specified-by", and "else"
    ops: Any
537
538
def _post_cond_bool_and(ctx: _PostCondContext, exp: Any) -> bool:
    """
    Returns true, if all elements of the expression evaluate to true.

    The evaluation stops at the first element which evaluates to false.
    """
    return all(_post_cond_bool_exp(ctx, element) for element in exp)
544
545
def _post_cond_bool_not(ctx: _PostCondContext, exp: Any) -> bool:
    """ Returns the negated result of the expression evaluation. """
    value = _post_cond_bool_exp(ctx, exp)
    return not value
548
549
def _post_cond_bool_or(ctx: _PostCondContext, exp: Any) -> bool:
    """
    Returns true, if at least one element of the expression evaluates to
    true.

    The evaluation stops at the first element which evaluates to true.
    """
    return any(_post_cond_bool_exp(ctx, element) for element in exp)
555
556
557def _post_cond_bool_post_cond(ctx: _PostCondContext, exp: Any) -> bool:
558    for post_co_name, status in exp.items():
559        if isinstance(status, str):
560            status = [status]
561        post_co_idx = ctx.transition_map.post_co_name_to_co_idx(post_co_name)
562        st_idx = [
563            ctx.transition_map.post_co_idx_st_name_to_st_idx(
564                post_co_idx, st_name) for st_name in status
565        ]
566        if ctx.post_co_states[post_co_idx] not in st_idx:
567            return False
568    return True
569
570
571def _post_cond_bool_pre_cond(ctx: _PostCondContext, exp: Any) -> bool:
572    for pre_co_name, status in exp.items():
573        if isinstance(status, str):
574            status = [status]
575        pre_co_idx = ctx.transition_map.pre_co_name_to_co_idx(pre_co_name)
576        st_idx = [
577            ctx.transition_map.pre_co_idx_st_name_to_st_idx(
578                pre_co_idx, st_name) for st_name in status
579        ]
580        if ctx.pre_co_states[pre_co_idx] not in st_idx:
581            return False
582    return True
583
584
# Maps the operator key of a boolean post-condition expression to the function
# which evaluates the operand; _post_cond_bool_exp() dispatches through this
# table
_POST_COND_BOOL_OPS = {
    "and": _post_cond_bool_and,
    "not": _post_cond_bool_not,
    "or": _post_cond_bool_or,
    "post-conditions": _post_cond_bool_post_cond,
    "pre-conditions": _post_cond_bool_pre_cond,
}
592
593
def _post_cond_bool_exp(ctx: _PostCondContext, exp: Any) -> bool:
    """
    Evaluates the boolean post-condition expression.

    A list is evaluated as an "or" of its elements.  Otherwise, the expression
    is a mapping and its first key selects the operator which evaluates the
    associated value.  A KeyError is raised for an unknown operator.
    """
    if isinstance(exp, list):
        return _post_cond_bool_or(ctx, exp)
    # Only the first key of the mapping is used as the operator
    key = next(iter(exp))
    return _POST_COND_BOOL_OPS[key](ctx, exp[key])
599
600
601def _post_cond_do_specified_by(ctx: _PostCondContext, pre_co_name: str) -> int:
602    pre_co_idx = ctx.transition_map.pre_co_name_to_co_idx(pre_co_name)
603    st_name = ctx.transition_map.pre_co_idx_st_idx_to_st_name(
604        pre_co_idx, ctx.pre_co_states[pre_co_idx])
605    return ctx.transition_map.post_co_idx_st_name_to_st_idx(
606        ctx.post_co_idx, st_name)
607
608
def _post_cond_if(ctx: _PostCondContext) -> Optional[int]:
    """
    Returns the post-condition state index selected by the "if" operation or
    None, if the "if" expression evaluates to false.

    The state is specified by the pre-condition named by "then-specified-by"
    if this key is present, otherwise by the state name of "then".
    """
    if not _post_cond_bool_exp(ctx, ctx.ops["if"]):
        return None
    if "then-specified-by" in ctx.ops:
        return _post_cond_do_specified_by(ctx, ctx.ops["then-specified-by"])
    st_name = ctx.ops["then"]
    return ctx.transition_map.post_co_idx_st_name_to_st_idx(
        ctx.post_co_idx, st_name)
617
618
def _post_cond_specified_by(ctx: _PostCondContext) -> Optional[int]:
    """
    Returns the post-condition state index specified by the pre-condition
    named by the "specified-by" operation.
    """
    pre_co_name = ctx.ops["specified-by"]
    return _post_cond_do_specified_by(ctx, pre_co_name)
621
622
623def _post_cond_else(ctx: _PostCondContext) -> Optional[int]:
624    return ctx.transition_map.post_co_idx_st_name_to_st_idx(
625        ctx.post_co_idx, ctx.ops["else"])
626
627
# Maps the key of a post-condition operation to the function which determines
# the post-condition state index for it (the table is presumably used by the
# transition map builder which is not part of this section -- confirm)
_POST_COND_OP = {
    "else": _post_cond_else,
    "if": _post_cond_if,
    "specified-by": _post_cond_specified_by,
}

# A post-condition variant: the skip index followed by the post-condition
# entries, see TransitionMap.get_post_conditions()
PostCond = Tuple[int, ...]

# The pre-condition variants of a post-condition variant: a list of rows, each
# row has one list of pre-condition state indices per pre-condition
PreCondsOfPostCond = List[Tuple[List[int], ...]]
637
638
def _compact(pre_conds: PreCondsOfPostCond) -> PreCondsOfPostCond:
    """
    Combines neighbouring pre-condition rows which differ in exactly one
    pre-condition.

    The passes are repeated until a pass combines no rows.  The state lists of
    the rows kept are extended in place, so the rows of the argument are
    modified.  The list must contain at least one row, otherwise an IndexError
    is raised.
    """
    while True:
        # The row which may absorb the following rows
        last = pre_conds[0]
        combined_pre_conds = [last]
        combined_count = 0
        for row in pre_conds[1:]:
            # Indices of the pre-conditions with different state lists
            diff = [
                index for index, states in enumerate(last)
                if states != row[index]
            ]
            if len(diff) == 1:
                # Merge the row into the last kept row; the following rows are
                # compared with the extended row
                index = diff[0]
                combined_count += 1
                last[index].extend(row[index])
            else:
                # Identical rows and rows with more than one difference are
                # kept and become the new reference row
                combined_pre_conds.append(row)
                last = row
        pre_conds = combined_pre_conds
        if combined_count == 0:
            break
    return pre_conds
660
661
def _compact_more(pre_conds: PreCondsOfPostCond) -> PreCondsOfPostCond:
    """
    Combines pre-condition rows which differ in at most one pre-condition
    regardless of their position in the list.

    The passes are repeated until a pass combines no rows.  The list passed as
    argument is emptied and the state lists of the rows kept are extended in
    place.
    """
    while True:
        combined_count = 0
        next_pre_conds = []
        while pre_conds:
            # Take the first remaining row and try to merge the other
            # remaining rows into it
            first = pre_conds.pop(0)
            next_pre_conds.append(first)
            for row in pre_conds:
                # Indices of the pre-conditions with different state lists
                diff = [
                    index for index, states in enumerate(first)
                    if states != row[index]
                ]
                if len(diff) <= 1:
                    if diff:
                        index = diff[0]
                        first[index].extend(row[index])
                    # Rows identical to the first row are dropped
                    combined_count += 1
                    # NOTE(review): this removes from the list which is
                    # iterated, so the row following a removed row is not
                    # visited in this scan; it is only reconsidered by a later
                    # scan or pass
                    pre_conds.remove(row)
        pre_conds = next_pre_conds
        if combined_count == 0:
            break
    return pre_conds
684
685
686class TransitionMap:
687    """ Representation of an action requirement transition map. """
688
689    # pylint: disable=too-many-instance-attributes
690    def __init__(self, item: Item):
691        self._item = item
692        self._pre_co_count = len(item["pre-conditions"])
693        self._post_co_count = len(item["post-conditions"])
694        self.pre_co_summary = tuple(0 for _ in range(self._pre_co_count + 1))
695        self._pre_co_idx_st_idx_to_st_name = _to_st_name(
696            item["pre-conditions"])
697        self._post_co_idx_st_idx_to_st_name = _to_st_name(
698            item["post-conditions"])
699        self._pre_co_idx_st_name_to_st_idx = _to_st_idx(item["pre-conditions"])
700        self._post_co_idx_st_name_to_st_idx = _to_st_idx(
701            item["post-conditions"])
702        self._pre_co_idx_to_cond = dict(
703            (co_idx, condition)
704            for co_idx, condition in enumerate(item["pre-conditions"]))
705        self._pre_co_name_to_co_idx = dict(
706            (condition["name"], co_idx)
707            for co_idx, condition in enumerate(item["pre-conditions"]))
708        self._post_co_name_to_co_idx = dict(
709            (condition["name"], co_idx)
710            for co_idx, condition in enumerate(item["post-conditions"]))
711        self._post_co_idx_to_co_name = dict(
712            (co_idx, condition["name"])
713            for co_idx, condition in enumerate(item["post-conditions"]))
714        self._skip_idx_to_name = dict(
715            (skip_idx + 1, key)
716            for skip_idx, key in enumerate(item["skip-reasons"].keys()))
717        self._skip_name_to_idx = dict(
718            (key, skip_idx + 1)
719            for skip_idx, key in enumerate(item["skip-reasons"].keys()))
720        self._entries = {}  # type: Dict[str, List[Any]]
721        self._map = self._build_map()
722        self._post_process()
723
724    def __getitem__(self, key: str):
725        return self._item[key]
726
727    def __iter__(self):
728        yield from self._map
729
730    def entries(self) -> Iterator[List[Any]]:
731        """ Yields the transition map entry variants sorted by frequency. """
732        yield from sorted(self._entries.values(), key=lambda x: x[1])
733
734    def get_variants(self,
735                     enabled: List[str]) -> Iterator[Tuple[int, Transition]]:
736        """
737        Yields the map index and the transition variants enabled by the enabled
738        list.
739        """
740        for map_idx, transitions in enumerate(self._map):
741            for variant in transitions[1:]:
742                if is_enabled(enabled, variant.enabled_by):
743                    break
744            else:
745                variant = transitions[0]
746            yield map_idx, variant
747
748    def get_post_conditions(
749            self, enabled: List[str]
750    ) -> Iterator[Tuple[PostCond, PreCondsOfPostCond]]:
751        """
752        Yields tuples of post-condition variants and the corresponding
753        pre-condition variants which are enabled by the enabled list.
754
755        The first entry of the post-condition variant is the skip index.  The
756        remaining entries are post-condition indices.  The pre-condition
757        variants are a list of tuples.  Each tuple entry corresponds to a
758        pre-condition and provides a list of corresponding pre-condition state
759        indices.
760        """
761        entries = {}  # type: Dict[PostCond, PreCondsOfPostCond]
762        for map_idx, variant in self.get_variants(enabled):
763            key = (variant.skip, ) + variant.post_cond
764            entry = entries.setdefault(key, [])
765            entry.append(
766                tuple([state] for state in self.map_idx_to_pre_co_states(
767                    map_idx, variant.pre_cond_na)))
768        for post_cond, pre_conds in sorted(entries.items(),
769                                           key=lambda x: (x[0][0], len(x[1]))):
770            pre_conds = _compact_more(_compact(pre_conds))
771            yield post_cond, pre_conds
772
773    def _post_process(self) -> None:
774        for map_idx, transitions in enumerate(self):
775            if not transitions or not isinstance(
776                    transitions[0].enabled_by,
777                    bool) or not transitions[0].enabled_by:
778                raise ValueError(
779                    f"transition map of {self._item.spec} contains no default "
780                    "entry for pre-condition set "
781                    f"{{{self._map_index_to_pre_conditions(map_idx)}}}")
782            entry = self._entries.setdefault(transitions.key,
783                                             [0, 0, transitions, []])
784            entry[0] += 1
785            entry[3].append(map_idx)
786        for index, entry in enumerate(
787                sorted(self._entries.values(),
788                       key=lambda x: x[0],
789                       reverse=True)):
790            entry[1] = index
791
792    def _map_index_to_pre_conditions(self, map_idx: int) -> str:
793        conditions = []
794        for condition in reversed(self._item["pre-conditions"]):
795            states = condition["states"]
796            count = len(states)
797            st_idx = int(map_idx % count)
798            conditions.append(f"{condition['name']}={states[st_idx]['name']}")
799            map_idx //= count
800        return ", ".join(reversed(conditions))
801
802    def map_idx_to_pre_co_states(
803            self, map_idx: int, pre_cond_na: Tuple[int,
804                                                   ...]) -> Tuple[int, ...]:
805        """
806        Maps the transition map index and the associated pre-condition state
807        indices.
808        """
809        co_states = []
810        for condition in reversed(self._item["pre-conditions"]):
811            count = len(condition["states"])
812            co_states.append(count if pre_cond_na[self._pre_co_name_to_co_idx[
813                condition["name"]]] else int(map_idx % count))
814            map_idx //= count
815        return tuple(reversed(co_states))
816
817    def pre_co_name_to_co_idx(self, co_name: str) -> int:
818        """
819        Maps the pre-condition name to the associated pre-condition index.
820        """
821        return self._pre_co_name_to_co_idx[co_name]
822
823    def pre_co_idx_to_co_name(self, co_idx: int) -> str:
824        """
825        Maps the pre-condition index to the associated pre-condition name.
826        """
827        return self._pre_co_idx_to_cond[co_idx]["name"]
828
829    def post_co_name_to_co_idx(self, co_name: str) -> int:
830        """
831        Maps the post-condition name to the associated post-condition index.
832        """
833        return self._post_co_name_to_co_idx[co_name]
834
835    def post_co_idx_to_co_name(self, co_idx: int) -> str:
836        """
837        Maps the post-condition index to the associated post-condition name.
838        """
839        return self._post_co_idx_to_co_name[co_idx]
840
841    def pre_co_idx_st_idx_to_st_name(self, co_idx: int, st_idx: int) -> str:
842        """
843        Maps the pre-condition name and state index to the associated state
844        name.
845        """
846        return self._pre_co_idx_st_idx_to_st_name[co_idx][st_idx]
847
848    def post_co_idx_st_idx_to_st_name(self, co_idx: int, st_idx: int) -> str:
849        """
850        Maps the post-condition name and state index to the associated state
851        name.
852        """
853        return self._post_co_idx_st_idx_to_st_name[co_idx][st_idx]
854
855    def pre_co_idx_st_name_to_st_idx(self, co_idx: int, st_name: str) -> int:
856        """
857        Maps the pre-condition index and state name to the associated state
858        index.
859        """
860        return self._pre_co_idx_st_name_to_st_idx[co_idx][st_name]
861
862    def post_co_idx_st_name_to_st_idx(self, co_idx: int, st_name: str) -> int:
863        """
864        Maps the post-condition index and state name to the associated state
865        index.
866        """
867        return self._post_co_idx_st_name_to_st_idx[co_idx][st_name]
868
869    def skip_idx_to_name(self, skip_idx: int) -> str:
870        """
871        Maps the skip index the associated skip name index.
872        """
873        return self._skip_idx_to_name[skip_idx]
874
875    def _map_post_cond(self, map_idx: int, co_idx: int,
876                       variant: Transition) -> Transition:
877        if isinstance(variant.post_cond[co_idx], int):
878            return variant
879        pre_co_states = self.map_idx_to_pre_co_states(map_idx,
880                                                      variant.pre_cond_na)
881        for ops in variant.post_cond[co_idx]:
882            idx = _POST_COND_OP[next(iter(ops))](_PostCondContext(
883                self, map_idx, pre_co_states, variant.post_cond, co_idx, ops))
884            if idx is not None:
885                return Transition(
886                    variant.desc_idx, variant.enabled_by, variant.skip,
887                    variant.pre_cond_na, variant.post_cond[0:co_idx] +
888                    (idx, ) + variant.post_cond[co_idx + 1:])
889        raise ValueError(
890            "cannot determine state for post-condition "
891            f"'{self._post_co_idx_to_co_name[co_idx]}' of transition map "
892            f"descriptor {variant.desc_idx} of {self._item.spec} for "
893            "pre-condition set "
894            f"{{{self._map_index_to_pre_conditions(map_idx)}}}")
895
896    def _make_post_cond(self, map_idx: int, variant: Transition) -> Transition:
897        for co_idx in range(len(variant.post_cond)):
898            variant = self._map_post_cond(map_idx, co_idx, variant)
899        return variant
900
901    def _add_variant(self, transition_map: _TransitionMap, map_idx: int,
902                     variant: Transition) -> None:
903        if transition_map[map_idx]:
904            for index, existing in enumerate(transition_map[map_idx].variants):
905                if existing.enabled_by == variant.enabled_by:
906                    if variant.skip:
907                        # Allow transition map variants with a skip reason to
908                        # overwrite existing variants with the same enabled-by
909                        # attribute.  This is important if variants use N/A for
910                        # some pre-conditions.  It makes it also easier to skip
911                        # pre-conditon states which are controlled by build
912                        # options.
913                        transition_map[map_idx].replace(index, variant)
914                        return
915                    raise ValueError(
916                        f"transition map descriptor {variant.desc_idx} of "
917                        f"{self._item.spec} duplicates pre-condition set "
918                        f"{{{self._map_index_to_pre_conditions(map_idx)}}}"
919                        " defined by transition map descriptor "
920                        f"{existing.desc_idx}")
921            default = transition_map[map_idx][0]
922            if (default.post_cond, default.skip,
923                    default.pre_cond_na) == (variant.post_cond, variant.skip,
924                                             variant.pre_cond_na):
925                return
926        elif not isinstance(variant.enabled_by,
927                            bool) or not variant.enabled_by:
928            raise ValueError(
929                f"transition map descriptor {variant.desc_idx} of "
930                f"{self._item.spec} is the first variant for "
931                f"{{{self._map_index_to_pre_conditions(map_idx)}}} "
932                "and it is not enabled by default")
933        self.pre_co_summary = tuple(
934            a + b for a, b in zip(self.pre_co_summary, (variant.skip, ) +
935                                  variant.pre_cond_na))
936        transition_map[map_idx].add(variant)
937
938    def _add_transitions(self, transition_map: _TransitionMap,
939                         desc: Dict[str, Any], desc_idx: int,
940                         skip_post_cond: Tuple[Any, ...], co_idx: int,
941                         map_idx: int, pre_cond_na: Tuple[int, ...]) -> None:
942        # pylint: disable=too-many-arguments
943        # pylint: disable=too-many-locals
944        if co_idx < self._pre_co_count:
945            condition = self._pre_co_idx_to_cond[co_idx]
946            state_count = len(condition["states"])
947            map_idx *= state_count
948            states = desc["pre-conditions"][condition["name"]]
949            if isinstance(states, str):
950                assert states in ["all", "N/A"]
951                for st_idx in range(state_count):
952                    self._add_transitions(
953                        transition_map, desc, desc_idx, skip_post_cond,
954                        co_idx + 1, map_idx + st_idx,
955                        pre_cond_na + (int(states == "N/A"), ))
956            else:
957                for st_name in states:
958                    try:
959                        st_idx = self._pre_co_idx_st_name_to_st_idx[co_idx][
960                            st_name]
961                    except KeyError as err:
962                        msg = (f"transition map descriptor {desc_idx} of "
963                               f"{self._item.spec} refers to non-existent "
964                               f"state {err} of pre-condition "
965                               f"'{condition['name']}'")
966                        raise ValueError(msg) from err
967                    self._add_transitions(transition_map, desc, desc_idx,
968                                          skip_post_cond, co_idx + 1,
969                                          map_idx + st_idx,
970                                          pre_cond_na + (0, ))
971        else:
972            variant = self._make_post_cond(
973                map_idx,
974                Transition(desc_idx, desc["enabled-by"], skip_post_cond[0],
975                           pre_cond_na, skip_post_cond[1:]))
976            self._add_variant(transition_map, map_idx, variant)
977
978    def _add_default(self, transition_map: _TransitionMap, desc: Dict[str,
979                                                                      Any],
980                     desc_idx: int, skip_post_cond: Tuple[int, ...]) -> None:
981        enabled_by = desc["enabled-by"]
982        for map_idx, transition in enumerate(transition_map):
983            if not transition:
984                transition.add(
985                    self._make_post_cond(
986                        map_idx,
987                        Transition(desc_idx, enabled_by, skip_post_cond[0],
988                                   (0, ) * self._pre_co_count,
989                                   skip_post_cond[1:])))
990
991    def _get_post_cond(self, desc: Dict[str, Any], co_idx: int) -> Any:
992        info = desc["post-conditions"][self._post_co_idx_to_co_name[co_idx]]
993        if isinstance(info, str):
994            return self._post_co_idx_st_name_to_st_idx[co_idx][info]
995        return info
996
997    def _build_map(self) -> _TransitionMap:
998        transition_count = 1
999        for condition in self["pre-conditions"]:
1000            state_count = len(condition["states"])
1001            if state_count == 0:
1002                raise ValueError(f"pre-condition '{condition['name']}' of "
1003                                 f"{self._item.spec} has no states")
1004            transition_count *= state_count
1005        transition_map = [_TransitionEntry() for _ in range(transition_count)]
1006        for desc_idx, desc in enumerate(self["transition-map"]):
1007            if isinstance(desc["post-conditions"], dict):
1008                try:
1009                    skip_post_cond = (0, ) + tuple(
1010                        self._get_post_cond(desc, co_idx)
1011                        for co_idx in range(self._post_co_count))
1012                except KeyError as err:
1013                    msg = (f"transition map descriptor {desc_idx} of "
1014                           f"{self._item.spec} refers to non-existent "
1015                           f"post-condition state {err}")
1016                    raise ValueError(msg) from err
1017            else:
1018                skip_post_cond = (
1019                    self._skip_name_to_idx[desc["post-conditions"]], ) + tuple(
1020                        self._post_co_idx_st_name_to_st_idx[co_idx]["N/A"]
1021                        for co_idx in range(self._post_co_count))
1022            if isinstance(desc["pre-conditions"], dict):
1023                self._add_transitions(transition_map, desc, desc_idx,
1024                                      skip_post_cond, 0, 0, ())
1025            else:
1026                assert desc["pre-conditions"] == "default"
1027                self._add_default(transition_map, desc, desc_idx,
1028                                  skip_post_cond)
1029        return transition_map
1030
1031    def _get_entry(self, ident: str, variant: Transition) -> str:
1032        text = "{ " + ", ".join(
1033            itertools.chain(
1034                map(str, (int(variant.skip != 0), ) + variant.pre_cond_na),
1035                ((f"{ident}_Post_{self._post_co_idx_to_co_name[co_idx]}"
1036                  f"_{self._post_co_idx_st_idx_to_st_name[co_idx][st_idx]}")
1037                 for co_idx, st_idx in enumerate(variant.post_cond))))
1038        wrapper = textwrap.TextWrapper()
1039        wrapper.initial_indent = "  "
1040        wrapper.subsequent_indent = "    "
1041        wrapper.width = 79
1042        return "\n".join(wrapper.wrap(text)) + " },"
1043
1044    def _get_entry_bits(self) -> int:
1045        bits = self._pre_co_count + 1
1046        for st_idx_to_st_name in self._post_co_idx_st_idx_to_st_name:
1047            bits += math.ceil(math.log2(len(st_idx_to_st_name)))
1048        return 2**max(math.ceil(math.log2(bits)), 3)
1049
1050    def add_map(self, content: CContent, ident: str) -> None:
1051        """ Adds the transition map definitions to the content. """
1052        entries = []
1053        mapper = ExpressionMapper()
1054        for entry in self.entries():
1055            transitions = entry[2]
1056            if len(transitions) == 1:
1057                entries.append(self._get_entry(ident, transitions[0]))
1058            else:
1059                ifelse = "#if "
1060                enumerators = []  # type: List[str]
1061                for variant in transitions[1:]:
1062                    enumerators.append(
1063                        ifelse + enabled_by_to_exp(variant.enabled_by, mapper))
1064                    enumerators.append(self._get_entry(ident, variant))
1065                    ifelse = "#elif "
1066                enumerators.append("#else")
1067                enumerators.append(self._get_entry(ident, transitions[0]))
1068                enumerators.append("#endif")
1069                entries.append("\n".join(enumerators))
1070        bits = self._get_entry_bits()
1071        content.add("typedef struct {")
1072        with content.indent():
1073            content.append(f"uint{bits}_t Skip : 1;")
1074            for condition in self["pre-conditions"]:
1075                content.append(f"uint{bits}_t Pre_{condition['name']}_NA : 1;")
1076            for condition in self["post-conditions"]:
1077                state_bits = math.ceil(math.log2(len(condition["states"]) + 1))
1078                content.append(
1079                    f"uint{bits}_t Post_{condition['name']} : {state_bits};")
1080        content.add(f"}} {ident}_Entry;")
1081        content.add([f"static const {ident}_Entry", f"{ident}_Entries[] = {{"])
1082        entries[-1] = entries[-1].replace("},", "}")
1083        content.append(entries)
1084        bits = max(8, math.ceil(math.log2(len(self._entries)) / 8) * 8)
1085        content.append(
1086            ["};", "", f"static const uint{bits}_t", f"{ident}_Map[] = {{"])
1087        text = ", ".join(
1088            str(self._entries[transitions.key][1])
1089            for transitions in self._map)
1090        wrapper = textwrap.TextWrapper()
1091        wrapper.initial_indent = "  "
1092        wrapper.subsequent_indent = "  "
1093        wrapper.width = 79
1094        content.append(wrapper.wrap(text))
1095        content.append("};")
1096
1097    def get_post_entry_member(self, co_idx: int) -> str:
1098        """
1099        Gets the post-condition entry member name for the post-condition index.
1100        """
1101        return f"Post_{self._post_co_idx_to_co_name[co_idx]}"
1102
1103
def _to_enum(prefix: str, conditions: List[Any]) -> _IdxToX:
    """
    Returns for each condition a tuple of enumeration related names.

    The first name of a tuple is the name of the condition type.  It is
    followed by one enumerator name for each state of the condition and a
    final enumerator name with the _NA suffix.
    """
    enums = []
    for condition in conditions:
        type_name = f"{prefix}_{condition['name']}"
        names = [type_name]
        names.extend(f"{prefix}_{condition['name']}_{state['name']}"
                     for state in condition["states"])
        names.append(f"{prefix}_{condition['name']}_NA")
        enums.append(tuple(names))
    return tuple(enums)
1110
1111
def _add_condition_enum(content: CContent, co_idx_to_enum: _IdxToX) -> None:
    """
    Adds an enumeration type definition to the content for each element of
    the condition index to enumeration name mapping.

    The first name of each element is used as the type name, the remaining
    names are used as the enumerators.
    """
    for names in co_idx_to_enum:
        enumerators = ",\n".join(names[1:])
        content.add("typedef enum {")
        with content.indent():
            content.add(enumerators)
        content.add(f"}} {names[0]};")
1118
1119
class _ActionRequirementTestItem(_TestItem):
    """ An action requirement test item. """
    def __init__(self, item: Item):
        """
        Initializes the test item with the pre-condition count and the
        enumeration names of the pre-conditions and post-conditions of the
        item.
        """
        super().__init__(item)
        self._pre_co_count = len(item["pre-conditions"])
        # Each element consists of the type name followed by the enumerator
        # names, the last enumerator is the N/A enumerator, see _to_enum()
        self._pre_co_idx_to_enum = _to_enum(f"{self.ident}_Pre",
                                            item["pre-conditions"])
        self._post_co_idx_to_enum = _to_enum(f"{self.ident}_Post",
                                             item["post-conditions"])

    def _add_pre_condition_descriptions(self, content: CContent) -> None:
        """
        Adds the tables with the pre-condition state names to the content.

        One table is added for each pre-condition.  It contains the state
        names followed by "NA".  A final table references the tables of all
        pre-conditions and is terminated by NULL.
        """
        for condition in self["pre-conditions"]:
            content.add("static const char * const "
                        f"{self.ident}_PreDesc_{condition['name']}[] = {{")
            with content.indent():
                content.add(",\n".join(
                    itertools.chain((f"\"{state['name']}\""
                                     for state in condition["states"]),
                                    ["\"NA\""])))
            content.add("};")
        content.add("static const char * const * const "
                    f"{self.ident}_PreDesc[] = {{")
        with content.indent():
            content.add(",\n".join([
                f"{self.ident}_PreDesc_{condition['name']}"
                for condition in self["pre-conditions"]
            ] + ["NULL"]))
        content.add("};")

    def add_default_context_members(self, content: CContent) -> None:
        """
        Adds the default context members to the content.

        In addition to the members added by the base class, the
        pre-condition state array and the action loop indicator are added.
        """
        super().add_default_context_members(content)
        content.add_description_block(
            "This member defines the pre-condition states "
            "for the next action.", None)
        content.add(f"size_t pcs[ {self._pre_co_count} ];")
        content.add_description_block(
            "This member indicates if the test action loop "
            "is currently executed.", None)
        content.add("bool in_action_loop;")

    def _add_fixture_scope(self, content: CContent) -> None:
        """
        Adds the fixture scope function to the content.

        The generated function returns the result of T_get_scope() while the
        action loop is executed, otherwise it returns zero.
        """
        params = ["void *arg", "char *buf", "size_t n"]
        with content.function("static size_t", f"{self.ident}_Scope", params):
            content.add([f"{self.context} *ctx;", "", "ctx = arg;"])
            with content.condition("ctx->in_action_loop"):
                content.call_function(
                    "return", "T_get_scope",
                    [f"{self.ident}_PreDesc", "buf", "n", "ctx->pcs"])
            content.add("return 0;")

    def _add_call(self, content: CContent, key: str, name: str) -> None:
        """
        Adds a call to the function with the specified name suffix to the
        content, if the item attribute with the specified key is not None.
        """
        if self[key] is not None:
            content.gap = False
            content.call_function(None, f"{self.ident}_{name}", ["ctx"])

    def _add_loop_body(self, content: CContent,
                       transition_map: TransitionMap) -> None:
        """
        Adds the body of the innermost action loop to the content.

        The body gets the transition map entry for the current index,
        optionally skips the transition, prepares the pre-conditions, performs
        the action, and checks the post-conditions.  The code to skip
        transitions and to handle N/A pre-conditions is only generated if the
        pre-condition summary of the transition map indicates that it is
        needed.
        """
        # Use any() instead of max(), since the slice is empty for items
        # without pre-conditions and max() raises an exception for an empty
        # sequence.  Only the truth value is of interest.
        has_pre_co_na = any(transition_map.pre_co_summary[1:])
        content.add(f"{self.ident}_Entry entry;")
        if has_pre_co_na:
            content.append(f"size_t pcs[ {self._pre_co_count} ];")
        content.call_function("entry =", f"{self.ident}_GetEntry", ["index"])
        content.append("++index;")
        if transition_map.pre_co_summary[0]:
            with content.condition("entry.Skip"):
                content.append("continue;")
        if has_pre_co_na:
            # Save the loop state, since the N/A pre-conditions are
            # overwritten in the context for this transition
            content.call_function(None, "memcpy",
                                  ["pcs", "ctx->pcs", "sizeof( pcs )"])
            for index, pre_co in enumerate(self._item["pre-conditions"]):
                if transition_map.pre_co_summary[index + 1]:
                    name = pre_co["name"]
                    with content.condition(f"entry.Pre_{name}_NA"):
                        enum_na = self._pre_co_idx_to_enum[index][-1]
                        content.append(f"ctx->pcs[ {index} ] = {enum_na};")
        content.add_blank_line()
        self._add_call(content, "test-prepare", "Prepare")
        for index, enum in enumerate(self._pre_co_idx_to_enum):
            content.gap = False
            content.call_function(None, f"{enum[0]}_Prepare",
                                  ["ctx", f"ctx->pcs[ {index} ]"])
        self._add_call(content, "test-action", "Action")
        for index, enum in enumerate(self._post_co_idx_to_enum):
            content.gap = False
            content.call_function(None, f"{enum[0]}_Check", [
                "ctx", f"entry.{transition_map.get_post_entry_member(index)}"
            ])
        self._add_call(content, "test-cleanup", "Cleanup")
        if has_pre_co_na:
            # Restore the loop state saved above
            content.gap = False
            content.call_function(None, "memcpy",
                                  ["ctx->pcs", "pcs", "sizeof( ctx->pcs )"])

    def _add_for_loops(self, content: CContent, transition_map: TransitionMap,
                       index: int) -> None:
        """
        Adds the nested for loops over the pre-condition states to the
        content.

        One for loop is added for each pre-condition starting with the
        pre-condition of the specified index.  Each loop iterates from the
        first state enumerator up to the N/A enumerator exclusive.  The
        innermost loop contains the loop body.
        """
        if index < self._pre_co_count:
            var = f"ctx->pcs[ {index} ]"
            begin = self._pre_co_idx_to_enum[index][1]
            end = self._pre_co_idx_to_enum[index][-1]
            with content.for_loop(f"{var} = {begin}", f"{var} < {end}",
                                  f"++{var}"):
                self._add_for_loops(content, transition_map, index + 1)
        else:
            self._add_loop_body(content, transition_map)

    def _add_test_case(self, content: CContent, transition_map: TransitionMap,
                       header: Dict[str, Any]) -> None:
        """
        Adds the function to get a transition map entry and the test case
        function to the content.

        If a test header is specified, then a run function is generated which
        pushes and pops the fixture.  Otherwise, a test case with a fixture is
        generated.
        """
        ret = f"static inline {self.ident}_Entry"
        name = f"{self.ident}_GetEntry"
        params = ["size_t index"]
        with content.function(ret, name, params, align=True):
            content.add([
                f"return {self.ident}_Entries[",
                f"  {self.ident}_Map[ index ]", "];"
            ])
        fixture = f"{self.ident}_Fixture"
        prologue = CContent()
        epilogue = CContent()
        if header:
            content.add(f"static T_fixture_node {self.ident}_Node;")
            ret = "void"
            name = f"{self.ident}_Run"
            params = self._get_run_params(header)
            prologue.add([f"{self.context} *ctx;", "size_t index;"])
            self.assign_run_params(prologue, header)
            prologue.call_function("ctx =", "T_push_fixture",
                                   [f"&{self.ident}_Node", f"&{fixture}"])
            prologue.append(["ctx->in_action_loop = true;", "index = 0;"])
            epilogue.add("T_pop_fixture();")
            align = True
        else:
            with content.function_block(
                    f"void T_case_body_{self.ident}( void )"):
                pass
            content.gap = False
            ret = ""
            name = "T_TEST_CASE_FIXTURE"
            params = [f"{self.ident}", f"&{fixture}"]
            prologue.add([
                f"{self.context} *ctx;", "size_t index;", "",
                "ctx = T_fixture_context();", "ctx->in_action_loop = true;",
                "index = 0;"
            ])
            align = False
        with content.function(ret, name, params, align=align):
            content.add(prologue)
            self._add_for_loops(content, transition_map, 0)
            content.add(epilogue)

    def _add_handler(self, content: CContent, conditions: List[Any],
                     co_idx_to_enum: _IdxToX, action: str) -> None:
        """
        Adds a handler function for each condition to the content.

        The handler consists of the test prologue of the condition, a switch
        statement with one case for each state containing the test code of the
        state, an empty case for the N/A enumerator, and the test epilogue of
        the condition.  The action is used as the suffix of the handler
        function name.
        """
        for co_idx, condition in enumerate(conditions):
            enum = co_idx_to_enum[co_idx]
            handler = f"{enum[0]}_{action}"
            params = [f"{self.context} *ctx", f"{enum[0]} state"]
            with content.function("static void", handler, params):
                content.add(self.substitute_code(condition["test-prologue"]))
                content.add("switch ( state ) {")
                with content.indent():
                    for state_index, state in enumerate(condition["states"]):
                        content.add(f"case {enum[state_index + 1]}: {{")
                        with content.indent():
                            with content.comment_block():
                                content.wrap(
                                    self.substitute_text(state["text"]))
                            content.append(
                                self.substitute_code(state["test-code"]))
                            content.append("break;")
                        content.add("}")
                    content.add(f"case {enum[-1]}:")
                    with content.indent():
                        content.append("break;")
                content.add("}")
                content.add(self.substitute_code(condition["test-epilogue"]))

    def add_test_case_action_description(self, _content: CContent) -> None:
        """ Adds nothing to the content. """

    def add_header_body(self, content: CContent, header: Dict[str,
                                                              Any]) -> None:
        """
        Adds the pre-condition and post-condition enumeration types followed
        by the header body of the base class to the content.
        """
        _add_condition_enum(content, self._pre_co_idx_to_enum)
        _add_condition_enum(content, self._post_co_idx_to_enum)
        super().add_header_body(content, header)

    def generate(self, content: CContent, base_directory: str,
                 test_case_to_suites: Dict[str, List[_TestItem]]) -> None:
        """
        Generates the test case of the action requirement item.

        If the item has a test header, then the header is generated in the
        base directory.  Otherwise, the condition enumeration types are added
        to the content.
        """
        self.add_test_case_description(content, test_case_to_suites)
        header = self["test-header"]
        if header:
            self.generate_header(base_directory, header)
        else:
            _add_condition_enum(content, self._pre_co_idx_to_enum)
            _add_condition_enum(content, self._post_co_idx_to_enum)
        instance = self.add_context(content)
        self._add_pre_condition_descriptions(content)
        content.add(self.substitute_code(self["test-support"]))
        self._add_handler(content, self["pre-conditions"],
                          self._pre_co_idx_to_enum, "Prepare")
        self._add_handler(content, self["post-conditions"],
                          self._post_co_idx_to_enum, "Check")
        optional_code = "ctx->in_action_loop = false;"
        setup = self.add_support_method(content,
                                        "test-setup",
                                        "Setup",
                                        optional_code=optional_code)
        stop = self.add_support_method(content,
                                       "test-stop",
                                       "Stop",
                                       optional_code=optional_code)
        teardown = self.add_support_method(content,
                                           "test-teardown",
                                           "Teardown",
                                           optional_code=optional_code)
        self.add_function(content, "test-prepare", "Prepare")
        self.add_function(content, "test-action", "Action")
        self.add_function(content, "test-cleanup", "Cleanup")
        transition_map = TransitionMap(self.item)
        transition_map.add_map(content, self.ident)
        self._add_fixture_scope(content)
        content.add([
            f"static T_fixture {self.ident}_Fixture = {{",
            f"  .setup = {setup},", f"  .stop = {stop},",
            f"  .teardown = {teardown},", f"  .scope = {self.ident}_Scope,",
            f"  .initial_context = {instance}", "};"
        ])
        self._add_test_case(content, transition_map, header)
        content.add("/** @} */")
1347
1348
class _RuntimeMeasurementRequestItem(_TestItem):
    """ A runtime measurement request item. """
    def __init__(self, item: Item, context: str):
        """
        Initializes the request item for the item and stores the context
        provided by the caller.
        """
        super().__init__(item)
        # Assigned after the base class initializer; presumably this replaces
        # a context set up by _TestItem, so the order matters -- TODO confirm
        self._context = context
1354
1355
def _add_call_method(content: CContent, name: str) -> None:
    """
    Adds a call to the function with the specified name to the content using
    "ctx" as the argument.  Nothing is added if the name is "NULL".
    """
    if name == "NULL":
        return
    content.gap = False
    content.call_function(None, name, ["ctx"])
1360
1361
class _RuntimeMeasurementTestItem(_TestItem):
    """
    A test item which generates a test case carrying out runtime measurement
    requests.
    """
    def add_test_case_action_description(self, _content: CContent) -> None:
        """ Adds no action description for this kind of test item. """

    def add_default_context_members(self, content: CContent) -> None:
        """
        Adds the measure runtime context reference and the measure runtime
        request as documented members to the content.
        """
        members = (
            ("This member references the measure runtime context.",
             "T_measure_runtime_context *context;"),
            ("This member provides the measure runtime request.",
             "T_measure_runtime_request request;"),
        )
        for brief, declaration in members:
            content.add_description_block(brief, None)
            content.add(declaration)

    def _add_requests(self, content: CContent) -> CContent:
        """
        Adds the support methods of the test item and of each runtime
        measurement request child item to the content.

        :param content: The content receiving the support methods.
        :returns: Separate content with the statements which carry out the
                  requests one after another.
        """
        body = CContent()
        # Prepare and cleanup methods of the test itself, they are called
        # around each request
        test_prepare = self.add_support_method(content,
                                               "test-prepare",
                                               "Prepare",
                                               do_wrap=False)
        test_cleanup = self.add_support_method(content,
                                               "test-cleanup",
                                               "Cleanup",
                                               do_wrap=False)
        for child in self.item.children("runtime-measurement-request"):
            request = _RuntimeMeasurementRequestItem(child, self.context)
            body.add_blank_line()
            _add_call_method(body, test_prepare)
            _add_call_method(
                body,
                request.add_support_method(content,
                                           "test-prepare",
                                           "Prepare",
                                           do_wrap=False))
            setup = request.add_support_method(content, "test-setup",
                                               "Setup")
            body.append([
                f'ctx->request.name = "{request.ident}";',
                f"ctx->request.setup = {setup};"
            ])
            measured = request.add_support_method(content, "test-body",
                                                  "Body")
            body.append([f"ctx->request.body = {measured};"])
            teardown = request.add_support_method(
                content,
                "test-teardown",
                "Teardown",
                ret="bool",
                extra_params=[
                    "T_ticks *delta", "uint32_t tic", "uint32_t toc",
                    "unsigned int retry"
                ],
                extra_args=["delta", "tic", "toc", "retry"])
            body.append([f"ctx->request.teardown = {teardown};"])
            body.gap = False
            body.call_function(None, "T_measure_runtime",
                               ["ctx->context", "&ctx->request"])
            _add_call_method(
                body,
                request.add_support_method(content,
                                           "test-cleanup",
                                           "Cleanup",
                                           do_wrap=False))
            _add_call_method(body, test_cleanup)
        return body

    def generate(self, content: CContent, base_directory: str,
                 test_case_to_suites: Dict[str, List[_TestItem]]) -> None:
        """
        Adds the runtime measurement test case to the content.

        :param content: The content receiving the generated code.
        :param base_directory: Not used by this method.
        :param test_case_to_suites: The map passed on to the test case
                                    description.
        """
        self.add_test_case_description(content, test_case_to_suites)
        initial_context = self.add_context(content)
        content.add(self.substitute_code(self["test-support"]))

        # Function which creates the measure runtime context, it is the
        # mandatory code of the fixture setup method
        setup_context = f"{self.ident}_Setup_Context"
        with content.function("static void", setup_context,
                              [f"{self.context} *ctx"]):
            sample_count = self["params"]["sample-count"]
            content.add([
                "T_measure_runtime_config config;",
                "",
                "memset( &config, 0, sizeof( config ) );",
                f"config.sample_count = {sample_count};",
                "ctx->request.arg = ctx;",
                "ctx->request.flags = T_MEASURE_RUNTIME_REPORT_SAMPLES;",
                "ctx->context = T_measure_runtime_create( &config );",
                "T_assert_not_null( ctx->context );",
            ])

        fixture_setup = self.add_support_method(
            content,
            "test-setup",
            "Setup",
            mandatory_code=f"{setup_context}( ctx );")
        fixture_stop = self.add_support_method(content, "test-stop", "Stop")
        fixture_teardown = self.add_support_method(content, "test-teardown",
                                                   "Teardown")
        content.add([
            f"static T_fixture {self.ident}_Fixture = {{",
            f"  .setup = {fixture_setup},",
            f"  .stop = {fixture_stop},",
            f"  .teardown = {fixture_teardown},",
            "  .scope = NULL,",
            f"  .initial_context = {initial_context}",
            "};",
        ])

        # The support methods of the requests have to be added before the
        # test case body which uses them
        request_statements = self._add_requests(content)
        with content.function_block(f"void T_case_body_{self.ident}( void )"):
            pass
        content.gap = False
        fixture_params = [f"{self.ident}", f"&{self.ident}_Fixture"]
        with content.function("",
                              "T_TEST_CASE_FIXTURE",
                              fixture_params,
                              align=False):
            content.add([
                f"{self.context} *ctx;",
                "",
                "ctx = T_fixture_context();",
            ])
            content.append(request_statements)
        content.add("/** @} */")
1469
1470
class _SourceFile:
    """ A test source file with its test suites and test cases. """
    def __init__(self, filename: str):
        """
        Initializes a test source file without test suites and test cases.

        :param filename: The file name which is joined with the base
                         directory when the source file is generated.
        """
        self._file = filename
        self._test_suites = []  # type: List[_TestItem]
        self._test_cases = []  # type: List[_TestItem]

    @property
    def test_suites(self) -> List[_TestItem]:
        """ The test suites added to the source file so far. """
        return self._test_suites

    @property
    def test_cases(self) -> List[_TestItem]:
        """ The test cases added to the source file so far. """
        return self._test_cases

    def add_test_suite(self, item: Item) -> None:
        """ Adds a test suite for the item to the source file. """
        suite = _TestSuiteItem(item)
        self._test_suites.append(suite)

    def add_test_case(self, item: Item) -> None:
        """ Adds a test case for the item to the source file. """
        case = _TestItem(item)
        self._test_cases.append(case)

    def add_action_requirement_test(self, item: Item) -> None:
        """
        Adds an action requirement test for the item to the test cases of the
        source file.
        """
        case = _ActionRequirementTestItem(item)
        self._test_cases.append(case)

    def add_runtime_measurement_test(self, item: Item) -> None:
        """
        Adds a runtime measurement test for the item to the test cases of the
        source file.
        """
        case = _RuntimeMeasurementTestItem(item)
        self._test_cases.append(case)

    def generate(self, base_directory: str,
                 test_case_to_suites: Dict[str, List[_TestItem]]) -> None:
        """
        Generates the source file and writes it below the base directory.

        The test cases are generated before the test suites, each group
        sorted by name.

        :param base_directory: The directory joined with the file name of the
                               source file to get the path to write.
        :param test_case_to_suites: The map passed on to the generate()
                                    method of each test item.
        """
        content = CContent()
        includes = []  # type: List[CInclude]
        local_includes = []  # type: List[CInclude]
        for test_item in itertools.chain(self._test_suites,
                                         self._test_cases):
            includes += [CInclude(path) for path in test_item.includes]
            local_includes += [
                CInclude(path) for path in test_item.local_includes
            ]
            content.register_license_and_copyrights_of_item(test_item.item)

        # File header
        content.prepend_spdx_license_identifier()
        with content.file_block():
            for group in (self._test_suites, self._test_cases):
                _add_ingroup(content, group)
        content.add_copyrights_and_licenses()
        content.add_automatically_generated_warning()
        content.add_have_config()

        # Includes of the test items followed by the test framework header
        content.add_includes(includes)
        content.add_includes(local_includes, local=True)
        content.add_includes([CInclude("rtems/test.h")])

        for group in (self._test_cases, self._test_suites):
            for test_item in sorted(group, key=lambda x: x.name):
                test_item.generate(content, base_directory,
                                   test_case_to_suites)
        content.write(os.path.join(base_directory, self._file))
1532
1533
class _TestProgram:
    """ A test program together with its known source files. """
    def __init__(self, item: Item):
        """
        Initializes a test program without source files.

        :param item: The item of the test program.  Its "source" attribute
                     is read by add_source_files().
        """
        self._item = item
        self._source_files = []  # type: List[_SourceFile]

    @property
    def source_files(self) -> List[_SourceFile]:
        """ The source files added to the test program so far. """
        return self._source_files

    def add_source_files(self, source_files: Dict[str, _SourceFile]) -> None:
        """
        Adds the source files of the test program which are present in the
        source file map.

        File names of the test program which are not in the map are ignored.

        :param source_files: The map from file names to source files.
        """
        candidates = (source_files.get(filename)
                      for filename in self._item["source"])
        self._source_files.extend(candidate for candidate in candidates
                                  if candidate is not None)
1555
1556
def _get_source_file(filename: str,
                     source_files: Dict[str, _SourceFile]) -> _SourceFile:
    """
    Gets the source file with the file name from the source file map.

    A new source file is created and registered in the map if the file name
    is not yet present.

    :param filename: The file name of the source file.
    :param source_files: The map from file names to source files.  It is
                         updated if the file name is not yet present.
    :returns: The source file registered for the file name.
    """
    # Create the source file only on a miss, dict.setdefault() would
    # construct a throwaway object on every call
    if filename not in source_files:
        source_files[filename] = _SourceFile(filename)
    return source_files[filename]
1560
1561
def _gather_action_requirement_test(
        item: Item, source_files: Dict[str, _SourceFile],
        _test_programs: List[_TestProgram]) -> None:
    """
    Adds the item as an action requirement test to the source file named by
    the "test-target" attribute of the item.
    """
    _get_source_file(item["test-target"],
                     source_files).add_action_requirement_test(item)
1567
1568
def _gather_runtime_measurement_test(
        item: Item, source_files: Dict[str, _SourceFile],
        _test_programs: List[_TestProgram]) -> None:
    """
    Adds the item as a runtime measurement test to the source file named by
    the "test-target" attribute of the item.
    """
    _get_source_file(item["test-target"],
                     source_files).add_runtime_measurement_test(item)
1574
1575
def _gather_test_case(item: Item, source_files: Dict[str, _SourceFile],
                      _test_programs: List[_TestProgram]) -> None:
    """
    Adds the item as a test case to the source file named by the
    "test-target" attribute of the item.
    """
    _get_source_file(item["test-target"], source_files).add_test_case(item)
1580
1581
def _gather_test_program(item: Item, _source_files: Dict[str, _SourceFile],
                         test_programs: List[_TestProgram]) -> None:
    """ Appends a new test program for the item to the test programs. """
    program = _TestProgram(item)
    test_programs.append(program)
1585
1586
def _gather_test_suite(item: Item, source_files: Dict[str, _SourceFile],
                       _test_programs: List[_TestProgram]) -> None:
    """
    Adds the item as a test suite to the source file named by the
    "test-target" attribute of the item.
    """
    _get_source_file(item["test-target"], source_files).add_test_suite(item)
1591
1592
def _gather_default(_item: Item, _source_files: Dict[str, _SourceFile],
                    _test_programs: List[_TestProgram]) -> None:
    """ Gathers nothing, all parameters are ignored. """
1596
1597
# Maps the type of a specification item to the function which gathers items
# of this type.  The generate() function falls back to _gather_default() for
# item types which are not listed here.
_GATHER = dict((
    ("build/test-program", _gather_test_program),
    ("requirement/functional/action", _gather_action_requirement_test),
    ("runtime-measurement-test", _gather_runtime_measurement_test),
    ("test-case", _gather_test_case),
    ("test-suite", _gather_test_suite),
))
1605
1606
def generate(config: dict, item_cache: ItemCache) -> None:
    """
    Generates the source files for the validation test suites and test cases
    found in the item cache.

    :param config: A dictionary with configuration entries.  The
                   "base-directory" entry is passed to each generated source
                   file.
    :param item_cache: The specification item cache containing the validation
                       test suites and test cases.
    """
    source_files = {}  # type: Dict[str, _SourceFile]
    test_programs = []  # type: List[_TestProgram]
    for item in item_cache.all.values():
        gather = _GATHER.get(item.type, _gather_default)
        gather(item, source_files, test_programs)

    # Associate each test case with all test suites of the test programs
    # which contain the source file of the test case
    test_case_to_suites = {}  # type: Dict[str, List[_TestItem]]
    for program in test_programs:
        program.add_source_files(source_files)
        program_suites = list(
            itertools.chain.from_iterable(
                source_file.test_suites
                for source_file in program.source_files))
        for source_file in program.source_files:
            for test_case in source_file.test_cases:
                suites = test_case_to_suites.setdefault(test_case.uid, [])
                suites.extend(program_suites)

    for source_file in source_files.values():
        source_file.generate(config["base-directory"], test_case_to_suites)
Note: See TracBrowser for help on using the repository browser.