source: rtems-central/rtemsspec/validation.py @ d6db730

Last change on this file since d6db730 was d6db730, checked in by Sebastian Huber <sebastian.huber@…>, on 11/08/21 at 09:26:30

validation: Move support code before fixture

  • Property mode set to 100644
File size: 50.2 KB
Line 
1# SPDX-License-Identifier: BSD-2-Clause
2""" This module provides functions for the generation of validation tests. """
3
4# Copyright (C) 2020, 2021 embedded brains GmbH (http://www.embedded-brains.de)
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions
8# are met:
9# 1. Redistributions of source code must retain the above copyright
10#    notice, this list of conditions and the following disclaimer.
11# 2. Redistributions in binary form must reproduce the above copyright
12#    notice, this list of conditions and the following disclaimer in the
13#    documentation and/or other materials provided with the distribution.
14#
15# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25# POSSIBILITY OF SUCH DAMAGE.
26
27# pylint: disable=too-many-lines
28
29import itertools
30import functools
31import os
32import re
33from typing import Any, Dict, List, Optional, Tuple
34
35from rtemsspec.content import CContent, CInclude, \
36    GenericContent, get_integer_type, get_value_params, get_value_plural, \
37    get_value_doxygen_group, get_value_doxygen_function, to_camel_case
38from rtemsspec.items import Item, ItemCache, \
39    ItemGetValueContext, ItemMapper
40from rtemsspec.transitionmap import TransitionMap
41
42ItemMap = Dict[str, Item]
43
44_STEPS = re.compile(r"^steps/([0-9]+)$")
45
46
47def _get_test_context_instance(ctx: ItemGetValueContext) -> Any:
48    return f"{to_camel_case(ctx.item.uid[1:])}_Instance"
49
50
51def _get_test_context_type(ctx: ItemGetValueContext) -> Any:
52    return f"{to_camel_case(ctx.item.uid[1:])}_Context"
53
54
55def _get_test_run(ctx: ItemGetValueContext) -> Any:
56    return f"{to_camel_case(ctx.item.uid[1:])}_Run"
57
58
59class _Mapper(ItemMapper):
60    def __init__(self, item: Item):
61        super().__init__(item)
62        self._step = 0
63        self.add_get_value("glossary/term:/plural", get_value_plural)
64        self.add_get_value("interface/function:/name",
65                           get_value_doxygen_function)
66        self.add_get_value("interface/function:/params/name", get_value_params)
67        self.add_get_value("interface/group:/name", get_value_doxygen_group)
68        self.add_get_value("interface/macro:/name", get_value_doxygen_function)
69        self.add_get_value("interface/macro:/params/name", get_value_params)
70        self.add_get_value("interface/unspecified-function:/name",
71                           get_value_doxygen_function)
72        self.add_get_value(
73            "requirement/functional/action:/test-context-instance",
74            _get_test_context_instance)
75        self.add_get_value("requirement/functional/action:/test-context-type",
76                           _get_test_context_type)
77        self.add_get_value("requirement/functional/action:/test-run",
78                           _get_test_run)
79        self.add_get_value("test-case:/test-context-instance",
80                           _get_test_context_instance)
81        self.add_get_value("test-case:/test-context-type",
82                           _get_test_context_type)
83        self.add_get_value("test-case:/test-run", _get_test_run)
84
85    @property
86    def steps(self):
87        """ The count of test steps. """
88        return self._step
89
90    def reset(self):
91        """ Resets the test step counter. """
92        self._step = 0
93
94    def map(self,
95            identifier: str,
96            item: Optional[Item] = None,
97            prefix: Optional[str] = None) -> Tuple[Item, str, Any]:
98        if identifier == "step":
99            step = self._step
100            self._step = step + 1
101            return self._item, "step", str(step)
102        match = _STEPS.search(identifier)
103        if match:
104            inc = int(match.group(1))
105            self._step += inc
106            return self._item, "step", f"Accounts for {inc} test plan steps"
107        return super().map(identifier, item, prefix)
108
109
110def _add_ingroup(content: CContent, items: List["_TestItem"]) -> None:
111    content.add_ingroup([item.group_identifier for item in items])
112
113
114class _TestItem:
115    """ A test item with a default implementation for test cases. """
116
117    # pylint: disable=too-many-public-methods
118    def __init__(self, item: Item):
119        self._item = item
120        self._ident = to_camel_case(item.uid[1:])
121        self._context = f"{self._ident}_Context"
122        self._mapper = _Mapper(item)
123
124    def __getitem__(self, key: str):
125        return self._item[key]
126
127    @property
128    def item(self) -> Item:
129        """ Returns the item. """
130        return self._item
131
132    @property
133    def uid(self) -> str:
134        """ Returns the item UID. """
135        return self._item.uid
136
137    @property
138    def ident(self) -> str:
139        """ Returns the test identifier. """
140        return self._ident
141
142    @property
143    def context(self) -> str:
144        """ Returns the test case context type. """
145        return self._context
146
147    @property
148    def name(self) -> str:
149        """ Returns the name. """
150        return self._item.spec
151
152    @property
153    def includes(self) -> List[str]:
154        """ Returns the list of includes. """
155        return self._item["test-includes"]
156
157    @property
158    def local_includes(self) -> List[str]:
159        """ Returns the list of local includes. """
160        return self._item["test-local-includes"]
161
162    @property
163    def brief(self) -> str:
164        """ Returns the substituted brief description. """
165        return self.substitute_text(self["test-brief"])
166
167    @property
168    def description(self) -> str:
169        """ Returns the substituted description. """
170        return self.substitute_text(self["test-description"])
171
172    @property
173    def group_identifier(self) -> str:
174        """ Returns the group identifier. """
175        return f"RTEMSTestCase{self.ident}"
176
177    def substitute_code(self,
178                        text: Optional[str],
179                        prefix: Optional[str] = None) -> str:
180        """
181        Performs a variable substitution for code with an optional prefix.
182        """
183        return self._mapper.substitute(text, prefix=prefix)
184
185    def substitute_text(self,
186                        text: Optional[str],
187                        prefix: Optional[str] = None) -> str:
188        """
189        Performs a variable substitution for text with an optional prefix.
190        """
191        return self._mapper.substitute(text, prefix=prefix)
192
193    def add_test_case_description(
194            self, content: CContent,
195            test_case_to_suites: Dict[str, List["_TestItem"]]) -> None:
196        """ Adds the test case description. """
197        with content.defgroup_block(self.group_identifier, self.name):
198            try:
199                test_suites = test_case_to_suites[self.uid]
200            except KeyError as err:
201                msg = (f"the target file '{self['test-target']}' of "
202                       f"{self.item.spec} is not a source file of an item of "
203                       "type 'build/test-program'")
204                raise ValueError(msg) from err
205            _add_ingroup(content, test_suites)
206            content.add_brief_description(self.brief)
207            content.wrap(self.description)
208            self.add_test_case_action_description(content)
209            content.add("@{")
210
211    def add_test_case_action_description(self, content: CContent) -> None:
212        """ Adds the test case action description. """
213        actions = self["test-actions"]
214        if actions:
215            content.add("This test case performs the following actions:")
216            for action in actions:
217                content.wrap(self.substitute_text(action["action-brief"]),
218                             initial_indent="- ")
219                for check in action["checks"]:
220                    content.wrap(self.substitute_text(check["brief"]),
221                                 initial_indent="  - ",
222                                 subsequent_indent="    ")
223
224    def _add_test_case_actions(self, content: CContent) -> CContent:
225        actions = CContent()
226        for index, action in enumerate(self["test-actions"]):
227            method = f"{self._ident}_Action_{index}"
228            if self.context == "void":
229                args = []
230                params = []
231            else:
232                args = ["ctx"]
233                params = [f"{self.context} *ctx"]
234            actions.gap = False
235            actions.call_function(None, method, args)
236            with content.doxygen_block():
237                content.add_brief_description(
238                    self.substitute_text(action["action-brief"]))
239            content.gap = False
240            with content.function("static void", method, params):
241                content.add(self.substitute_code(action["action-code"]))
242                for check in action["checks"]:
243                    with content.comment_block():
244                        content.wrap(self.substitute_text(check["brief"]))
245                    content.append(self.substitute_text(check["code"]))
246        return actions
247
248    def _get_run_params(self, header: Optional[Dict[str, Any]]) -> List[str]:
249        if not header:
250            return []
251        return [
252            self.substitute_text(param["specifier"],
253                                 f"test-header/run-params[{index}]")
254            for index, param in enumerate(header["run-params"])
255        ]
256
257    def add_header_body(self, content: CContent, header: Dict[str,
258                                                              Any]) -> None:
259        """ Adds the test header body. """
260        content.add(self.substitute_code(header["code"]))
261        with content.doxygen_block():
262            content.add_brief_description("Runs the parameterized test case.")
263            content.add_param_description(header["run-params"])
264        content.gap = False
265        content.declare_function("void", f"{self.ident}_Run",
266                                 self._get_run_params(header))
267
268    def add_support_method(self,
269                           content: CContent,
270                           key: str,
271                           name: str,
272                           mandatory_code: Optional[GenericContent] = None,
273                           optional_code: Optional[GenericContent] = None,
274                           ret: str = "void",
275                           extra_params: Optional[List[str]] = None,
276                           extra_args: Optional[List[str]] = None,
277                           do_wrap: bool = True) -> str:
278        """ Adds a support method to the content. """
279
280        # pylint: disable=too-many-arguments
281        # pylint: disable=too-many-locals
282        info = self[key]
283        if not info and not mandatory_code:
284            return "NULL"
285        if extra_params is None:
286            extra_params = []
287        if extra_args is None:
288            extra_args = []
289        method = f"{self.ident}_{name}"
290        wrap = f"{method}_Wrap"
291        if info:
292            content.add_description_block(
293                self.substitute_text(info["brief"]),
294                self.substitute_text(info["description"]))
295            params = [f"{self.context} *ctx"] + extra_params
296            with content.function(f"static {ret}", method, params):
297                if not do_wrap:
298                    content.gap = False
299                    content.add(mandatory_code)
300                    content.gap = False
301                    content.add(optional_code)
302                content.add(self.substitute_code(info["code"]))
303        if not do_wrap:
304            assert info
305            return method
306        params = ["void *arg"] + extra_params
307        with content.function(f"static {ret}", wrap, params):
308            content.add([f"{self.context} *ctx;", "", "ctx = arg;"])
309            content.gap = False
310            content.add(mandatory_code)
311            content.gap = False
312            content.add(optional_code)
313            if info:
314                content.gap = False
315                ret_2 = None if ret == "void" else "return"
316                args = ["ctx"] + extra_args
317                content.call_function(ret_2, f"{method}", args)
318        return wrap
319
320    def add_function(self, content: CContent, key: str, name: str) -> None:
321        """
322        Adds a function with the name to the content if there is one defined
323        for the attribute key.
324        """
325        if self[key] is not None:
326            with content.function("static void", f"{self.ident}_{name}",
327                                  [f"{self.context} *ctx"]):
328                content.append(self.substitute_code(self[key]))
329
330    def add_default_context_members(self, content: CContent) -> None:
331        """ Adds the default context members to the content """
332        for param in self._get_run_params(self["test-header"]):
333            content.add_description_block(
334                "This member contains a copy of the corresponding "
335                f"{self.ident}_Run() parameter.", None)
336            content.add(f"{param.strip()};")
337
338    def add_context(self, content: CContent) -> str:
339        """ Adds the context to the content. """
340        content.add(self.substitute_code(self["test-context-support"]))
341        default_members = CContent()
342        with default_members.indent():
343            self.add_default_context_members(default_members)
344        if not self["test-context"] and not default_members.lines:
345            return "NULL"
346        with content.doxygen_block():
347            content.add_brief_description(
348                f"Test context for {self.name} test case.")
349        content.append("typedef struct {")
350        gap = False
351        with content.indent():
352            for info in self["test-context"]:
353                content.add_description_block(
354                    self.substitute_text(info["brief"]),
355                    self.substitute_text(info["description"]))
356                content.add(f"{info['member'].strip()};")
357            gap = content.gap
358        content.gap = gap
359        content.add(default_members)
360        content.append([
361            f"}} {self.context};", "", f"static {self.context}",
362            f"  {self.ident}_Instance;"
363        ])
364        return f"&{self.ident}_Instance"
365
366    def generate_header(self, base_directory: str, header: Dict[str,
367                                                                Any]) -> None:
368        """ Generates the test header. """
369        content = CContent()
370        content.register_license_and_copyrights_of_item(self._item)
371        content.prepend_spdx_license_identifier()
372        with content.file_block():
373            content.add_ingroup([self.group_identifier])
374        content.add_copyrights_and_licenses()
375        content.add_automatically_generated_warning()
376        with content.header_guard(os.path.basename(header["target"])):
377            content.add_includes(list(map(CInclude, header["includes"])))
378            content.add_includes(list(map(CInclude, header["local-includes"])),
379                                 local=True)
380            with content.extern_c():
381                with content.add_to_group(self.group_identifier):
382                    self.add_header_body(content, header)
383        content.write(os.path.join(base_directory, header["target"]))
384
385    def _add_fixture(self, content: CContent, instance: str) -> Optional[str]:
386        if instance == "NULL":
387            self._context = "void"
388            do_wrap = False
389        else:
390            do_wrap = True
391        setup = self.add_support_method(content,
392                                        "test-setup",
393                                        "Setup",
394                                        do_wrap=do_wrap)
395        stop = self.add_support_method(content,
396                                       "test-stop",
397                                       "Stop",
398                                       do_wrap=do_wrap)
399        teardown = self.add_support_method(content,
400                                           "test-teardown",
401                                           "Teardown",
402                                           do_wrap=do_wrap)
403        if all(ptr == "NULL" for ptr in [instance, setup, stop, teardown]):
404            return None
405        content.add([
406            f"static T_fixture {self.ident}_Fixture = {{",
407            f"  .setup = {setup},", f"  .stop = {stop},",
408            f"  .teardown = {teardown},", "  .scope = NULL,",
409            f"  .initial_context = {instance}", "};"
410        ])
411        return f"&{self.ident}_Fixture"
412
413    def assign_run_params(self, content: CContent, header: Dict[str,
414                                                                Any]) -> None:
415        """ Assigns the run parameters to the context.  """
416        if header["run-params"]:
417            content.add([f"ctx = &{self.ident}_Instance;"] + [
418                f"ctx->{param['name']} = {param['name']};"
419                for param in header["run-params"]
420            ])
421
422    def _add_runner_prologue_and_epilogue(self, content: CContent,
423                                          prologue: CContent,
424                                          epilogue: CContent,
425                                          fixture: str) -> None:
426        header = self["test-header"]
427        if self.context == "void":
428            result = None
429        else:
430            prologue.add(f"{self.context} *ctx;")
431            self.assign_run_params(prologue, header)
432            result = "ctx ="
433        if header["freestanding"]:
434            prologue.call_function(result, "T_case_begin",
435                                   [f"\"{self.ident}\"", fixture])
436            epilogue.add("T_case_end();")
437        else:
438            content.add(f"static T_fixture_node {self.ident}_Node;")
439            prologue.call_function(result, "T_push_fixture",
440                                   [f"&{self.ident}_Node", fixture])
441            epilogue.add("T_pop_fixture();")
442
443    def generate(self, content: CContent, base_directory: str,
444                 test_case_to_suites: Dict[str, List["_TestItem"]]) -> None:
445        """ Generates the content. """
446        self.add_test_case_description(content, test_case_to_suites)
447        instance = self.add_context(content)
448        content.add(self.substitute_code(self["test-support"]))
449        fixture = self._add_fixture(content, instance)
450        self._mapper.reset()
451        actions = self._add_test_case_actions(content)
452        header = self["test-header"]
453        prologue = CContent()
454        epilogue = CContent()
455        if header:
456            self.generate_header(base_directory, header)
457            ret = "void"
458            name = f"{self.ident}_Run"
459            params = self._get_run_params(header)
460            if self._mapper.steps > 0 and not fixture:
461                fixture = "&T_empty_fixture"
462            if fixture:
463                self._add_runner_prologue_and_epilogue(content, prologue,
464                                                       epilogue, fixture)
465            align = True
466        else:
467            ret = ""
468            params = [f"{self.ident}"]
469            if fixture:
470                params.append(fixture)
471                name = "T_TEST_CASE_FIXTURE"
472            else:
473                name = "T_TEST_CASE"
474            if self.context != "void":
475                prologue.add([
476                    f"{self.context} *ctx;", "", "ctx = T_fixture_context();"
477                ])
478            align = False
479            with content.function_block(
480                    f"void T_case_body_{self.ident}( void )"):
481                pass
482            content.gap = False
483        with content.function(ret, name, params, align=align):
484            content.add(prologue)
485            if self._mapper.steps > 0:
486                content.add(f"T_plan( {self._mapper.steps} );")
487            content.add(actions)
488            content.add(epilogue)
489        content.add("/** @} */")
490
491
492class _TestSuiteItem(_TestItem):
493    """ A test suite item. """
494    @property
495    def group_identifier(self) -> str:
496        return f"RTEMSTestSuite{self.ident}"
497
498    def generate(self, content: CContent, _base_directory: str,
499                 _test_case_to_suites: Dict[str, List[_TestItem]]) -> None:
500        with content.defgroup_block(self.group_identifier, self.name):
501            content.add("@ingroup RTEMSTestSuites")
502            content.add_brief_description(self.brief)
503            content.wrap(self.description)
504            content.add("@{")
505        content.add(self.substitute_code(self["test-code"]))
506        content.add("/** @} */")
507
508
509_IdxToX = Tuple[Tuple[str, ...], ...]
510
511
512def _to_enum(prefix: str, conditions: List[Any]) -> _IdxToX:
513    return tuple(
514        tuple([f"{prefix}_{condition['name']}"] + [
515            f"{prefix}_{condition['name']}_{state['name']}"
516            for state in condition["states"]
517        ] + [f"{prefix}_{condition['name']}_NA"]) for condition in conditions)
518
519
520def _add_condition_enum(content: CContent, co_idx_to_enum: _IdxToX) -> None:
521    for enum in co_idx_to_enum:
522        content.add("typedef enum {")
523        with content.indent():
524            content.add(",\n".join(enum[1:]))
525        content.add(f"}} {enum[0]};")
526
527
528class _ActionRequirementTestItem(_TestItem):
529    """ An action requirement test item. """
530    def __init__(self, item: Item):
531        super().__init__(item)
532        self._mapper.add_get_value(("requirement/functional/action:"
533                                    "/pre-conditions/states/test-code/skip"),
534                                   self._skip_pre_condition)
535        self._pre_co_skip = {}  # type: Dict[int, bool]
536        self._pre_co_count = len(item["pre-conditions"])
537        self._pre_co_idx_to_enum = _to_enum(f"{self.ident}_Pre",
538                                            item["pre-conditions"])
539        self._post_co_idx_to_enum = _to_enum(f"{self.ident}_Post",
540                                             item["post-conditions"])
541        self._pci = "pcs"
542
543    def _add_pre_condition_descriptions(self, content: CContent) -> None:
544        for condition in self["pre-conditions"]:
545            content.add("static const char * const "
546                        f"{self.ident}_PreDesc_{condition['name']}[] = {{")
547            with content.indent():
548                content.add(",\n".join(
549                    itertools.chain((f"\"{state['name']}\""
550                                     for state in condition["states"]),
551                                    ["\"NA\""])))
552            content.add("};")
553        content.add("static const char * const * const "
554                    f"{self.ident}_PreDesc[] = {{")
555        with content.indent():
556            content.add(",\n".join([
557                f"{self.ident}_PreDesc_{condition['name']}"
558                for condition in self["pre-conditions"]
559            ] + ["NULL"]))
560        content.add("};")
561
562    def add_default_context_members(self, content: CContent) -> None:
563        super().add_default_context_members(content)
564        content.add("struct {")
565        with content.indent():
566            if self._pci == "pci":
567                content.add_description_block(
568                    "This member defines the pre-condition indices "
569                    "for the next action.", None)
570                content.add(f"size_t pci[ {self._pre_co_count} ];")
571            content.add_description_block(
572                "This member defines the pre-condition states "
573                "for the next action.", None)
574            content.add(f"size_t pcs[ {self._pre_co_count} ];")
575            content.add_description_block(
576                "If this member is true, then the test action "
577                "loop is executed.", None)
578            content.add("bool in_action_loop;")
579            content.add_description_block(
580                "This member contains the next transition map index.", None)
581            content.add("size_t index;")
582            content.add_description_block(
583                "This member contains the current transition map entry.", None)
584            content.add(f"{self.ident}_Entry entry;")
585            content.add_description_block(
586                "If this member is true, then the current transition "
587                "variant should be skipped.", None)
588            content.add("bool skip;")
589        content.append("} Map;")
590
591    def _add_fixture_scope(self, content: CContent) -> None:
592        params = ["void *arg", "char *buf", "size_t n"]
593        with content.function("static size_t", f"{self.ident}_Scope", params):
594            content.add([f"{self.context} *ctx;", "", "ctx = arg;"])
595            with content.condition("ctx->Map.in_action_loop"):
596                content.call_function(
597                    "return", "T_get_scope",
598                    [f"{self.ident}_PreDesc", "buf", "n", "ctx->Map.pcs"])
599            content.add("return 0;")
600
601    def _add_call(self, content: CContent, key: str, name: str) -> None:
602        if self[key] is not None:
603            content.gap = False
604            content.call_function(None, f"{self.ident}_{name}", ["ctx"])
605
606    def _add_skip(self, content: CContent) -> Any:
607        state_counts = [len(enum) - 2 for enum in self._pre_co_idx_to_enum]
608        weigths = [
609            str(
610                functools.reduce((lambda x, y: x * y),
611                                 state_counts[index + 1:], 1))
612            for index in range(self._pre_co_count)
613        ]
614        integer_type = get_integer_type(int(weigths[0]))
615        content.add(f"static const {integer_type} {self.ident}_Weights[] = {{")
616        with content.indent():
617            content.wrap(", ".join(weigths))
618        content.add("};")
619        with content.function("static void", f"{self.ident}_Skip",
620                              [f"{self.context} *ctx", "size_t index"]):
621            content.append("switch ( index + 1 ) {")
622            fall_through = "/* Fall through */"
623            with content.indent():
624                for index, enum in enumerate(self._pre_co_idx_to_enum[1:], 1):
625                    content.add(f"case {index}:")
626                    with content.indent():
627                        pci = f"ctx->Map.{self._pci}[ {index} ]"
628                        content.append(
629                            [f"{pci} = {enum[-1]} - 1;", fall_through])
630                content.lines[-1] = content.lines[-1].replace(
631                    fall_through, "break;")
632            content.append("}")
633
634    def _add_test_variant(self, content: CContent,
635                          transition_map: TransitionMap) -> None:
636        entry = "ctx->Map.entry"
637        for index in range(self._pre_co_count):
638            content.gap = False
639            state = f"ctx->Map.pcs[ {index} ]"
640            prepare = f"{self._pre_co_idx_to_enum[index][0]}_Prepare"
641            content.call_function(None, prepare, ["ctx", state])
642            if self._pre_co_skip.get(index, False):
643                with content.condition("ctx->Map.skip"):
644                    content.call_function(None, f"{self.ident}_Skip",
645                                          ["ctx", str(index)])
646                    content.append("return;")
647                content.add_blank_line()
648        self._add_call(content, "test-action", "Action")
649        for index, enum in enumerate(self._post_co_idx_to_enum):
650            content.gap = False
651            content.call_function(None, f"{enum[0]}_Check", [
652                "ctx", f"{entry}.{transition_map.get_post_entry_member(index)}"
653            ])
654
655    def _add_loop_body(self, content: CContent,
656                       transition_map: TransitionMap) -> None:
657        entry = "ctx->Map.entry"
658        content.call_function(f"{entry} =", f"{self.ident}_PopEntry", ["ctx"])
659        if transition_map.pre_co_summary[0]:
660            with content.condition(f"{entry}.Skip"):
661                content.append("continue;")
662            content.add_blank_line()
663        if transition_map.has_pre_co_not_applicable():
664            name = f"{self.ident}_SetPreConditionStates"
665            content.gap = False
666            content.call_function(None, name, ["ctx"])
667        self._add_call(content, "test-prepare", "Prepare")
668        content.gap = False
669        content.call_function(None, f"{self.ident}_TestVariant", ["ctx"])
670        self._add_call(content, "test-cleanup", "Cleanup")
671
672    def _add_for_loops(self, content: CContent, transition_map: TransitionMap,
673                       index: int) -> None:
674        if index < self._pre_co_count:
675            var = f"ctx->Map.{self._pci}[ {index} ]"
676            begin = self._pre_co_idx_to_enum[index][1]
677            end = self._pre_co_idx_to_enum[index][-1]
678            with content.for_loop(f"{var} = {begin}", f"{var} < {end}",
679                                  f"++{var}"):
680                self._add_for_loops(content, transition_map, index + 1)
681        else:
682            self._add_loop_body(content, transition_map)
683
684    def _add_set_pre_co_states(self, content: CContent,
685                               transition_map: TransitionMap) -> None:
686        ret = "static void"
687        name = f"{self.ident}_SetPreConditionStates"
688        params = [f"{self.context} *ctx"]
689        with content.function(ret, name, params, align=True):
690            entry = "ctx->Map.entry"
691            gap = False
692            for index, pre_co in enumerate(self._item["pre-conditions"]):
693                pcs_pci = f"ctx->Map.pcs[ {index} ] = ctx->Map.pci[ {index} ];"
694                if transition_map.pre_co_summary[index + 1]:
695                    is_na = f"{entry}.Pre_{pre_co['name']}_NA"
696                    with content.first_condition(is_na):
697                        enum_na = self._pre_co_idx_to_enum[index][-1]
698                        content.add(f"ctx->Map.pcs[ {index} ] = {enum_na};")
699                    with content.final_condition():
700                        content.add(pcs_pci)
701                    gap = True
702                else:
703                    content.gap = gap
704                    gap = False
705                    content.add(pcs_pci)
706
707    def _add_pop_entry(self, content: CContent) -> None:
708        ret = f"static inline {self.ident}_Entry"
709        name = f"{self.ident}_PopEntry"
710        params = [f"{self.context} *ctx"]
711        with content.function(ret, name, params, align=True):
712            content.add("size_t index;")
713            if self._pre_co_skip:
714                with content.first_condition("ctx->Map.skip"):
715                    content.add([
716                        "size_t i;", "", "ctx->Map.skip = false;", "index = 0;"
717                    ])
718                    with content.for_loop("i = 0", f"i < {self._pre_co_count}",
719                                          "++i"):
720                        content.append(f"index += {self.ident}_Weights[ i ]"
721                                       f" * ctx->Map.{self._pci}[ i ];")
722                with content.final_condition():
723                    content.add("index = ctx->Map.index;")
724                content.add("ctx->Map.index = index + 1;")
725            else:
726                content.add(
727                    ["index = ctx->Map.index;", "ctx->Map.index = index + 1;"])
728                content.gap = False
729            content.add([
730                f"return {self.ident}_Entries[",
731                f"  {self.ident}_Map[ index ]", "];"
732            ])
733
734    def _add_test_case(self, content: CContent, transition_map: TransitionMap,
735                       header: Dict[str, Any]) -> None:
736        if self._pre_co_skip:
737            self._add_skip(content)
738        self._add_pop_entry(content)
739        if transition_map.has_pre_co_not_applicable():
740            self._add_set_pre_co_states(content, transition_map)
741        with content.function("static void", f"{self.ident}_TestVariant",
742                              [f"{self.context} *ctx"]):
743            self._add_test_variant(content, transition_map)
744        fixture = f"{self.ident}_Fixture"
745        prologue = CContent()
746        epilogue = CContent()
747        map_members_initialization = [
748            "ctx->Map.in_action_loop = true;", "ctx->Map.index = 0;"
749        ]
750        if self._pre_co_skip:
751            map_members_initialization.append("ctx->Map.skip = false;")
752        if header:
753            content.add(f"static T_fixture_node {self.ident}_Node;")
754            ret = "void"
755            name = f"{self.ident}_Run"
756            params = self._get_run_params(header)
757            prologue.add([f"{self.context} *ctx;"])
758            self.assign_run_params(prologue, header)
759            prologue.call_function("ctx =", "T_push_fixture",
760                                   [f"&{self.ident}_Node", f"&{fixture}"])
761            prologue.append(map_members_initialization)
762            epilogue.add("T_pop_fixture();")
763            align = True
764        else:
765            with content.function_block(
766                    f"void T_case_body_{self.ident}( void )"):
767                pass
768            content.gap = False
769            ret = ""
770            name = "T_TEST_CASE_FIXTURE"
771            params = [f"{self.ident}", f"&{fixture}"]
772            prologue.add([
773                f"{self.context} *ctx;",
774                "",
775                "ctx = T_fixture_context();",
776            ] + map_members_initialization)
777            align = False
778        with content.function(ret, name, params, align=align):
779            content.add(prologue)
780            self._add_for_loops(content, transition_map, 0)
781            content.add(epilogue)
782
783    def _add_handler(self, content: CContent, conditions: str,
784                     co_idx_to_enum: _IdxToX, action: str) -> None:
785        for co_idx, condition in enumerate(self[conditions]):
786            enum = co_idx_to_enum[co_idx]
787            handler = f"{enum[0]}_{action}"
788            params = [f"{self.context} *ctx", f"{enum[0]} state"]
789            with content.function("static void", handler, params):
790                content.add(self.substitute_code(condition["test-prologue"]))
791                content.add("switch ( state ) {")
792                with content.indent():
793                    for state_index, state in enumerate(condition["states"]):
794                        content.add(f"case {enum[state_index + 1]}: {{")
795                        prefix = (f"/{conditions}[{co_idx}]"
796                                  f"/states[{state_index}]/test-code")
797                        with content.indent():
798                            with content.comment_block():
799                                content.wrap(
800                                    self.substitute_text(state["text"]))
801                            content.append(
802                                self.substitute_code(state["test-code"],
803                                                     prefix))
804                            content.append("break;")
805                        content.add("}")
806                    content.add(f"case {enum[-1]}:")
807                    with content.indent():
808                        content.append("break;")
809                content.add("}")
810                content.add(self.substitute_code(condition["test-epilogue"]))
811
812    def add_test_case_action_description(self, _content: CContent) -> None:
813        pass
814
815    def add_header_body(self, content: CContent, header: Dict[str,
816                                                              Any]) -> None:
817        _add_condition_enum(content, self._pre_co_idx_to_enum)
818        _add_condition_enum(content, self._post_co_idx_to_enum)
819        super().add_header_body(content, header)
820
821    def generate(self, content: CContent, base_directory: str,
822                 test_case_to_suites: Dict[str, List[_TestItem]]) -> None:
823        self.add_test_case_description(content, test_case_to_suites)
824        header = self["test-header"]
825        if header:
826            self.generate_header(base_directory, header)
827        else:
828            _add_condition_enum(content, self._pre_co_idx_to_enum)
829            _add_condition_enum(content, self._post_co_idx_to_enum)
830        transition_map = TransitionMap(self.item)
831        if transition_map.has_pre_co_not_applicable():
832            self._pci = "pci"
833        transition_map.add_map_entry_type(content, self.ident)
834        instance = self.add_context(content)
835        self._add_pre_condition_descriptions(content)
836        content.add(self.substitute_code(self["test-support"]))
837        self._add_handler(content, "pre-conditions", self._pre_co_idx_to_enum,
838                          "Prepare")
839        self._add_handler(content, "post-conditions",
840                          self._post_co_idx_to_enum, "Check")
841        optional_code = "ctx->Map.in_action_loop = false;"
842        setup = self.add_support_method(content,
843                                        "test-setup",
844                                        "Setup",
845                                        optional_code=optional_code)
846        stop = self.add_support_method(content,
847                                       "test-stop",
848                                       "Stop",
849                                       optional_code=optional_code)
850        teardown = self.add_support_method(content,
851                                           "test-teardown",
852                                           "Teardown",
853                                           optional_code=optional_code)
854        self.add_function(content, "test-prepare", "Prepare")
855        self.add_function(content, "test-action", "Action")
856        self.add_function(content, "test-cleanup", "Cleanup")
857        transition_map.add_map(content, self.ident)
858        self._add_fixture_scope(content)
859        content.add([
860            f"static T_fixture {self.ident}_Fixture = {{",
861            f"  .setup = {setup},", f"  .stop = {stop},",
862            f"  .teardown = {teardown},", f"  .scope = {self.ident}_Scope,",
863            f"  .initial_context = {instance}", "};"
864        ])
865        self._add_test_case(content, transition_map, header)
866        content.add("/** @} */")
867
868    def _skip_pre_condition(self, ctx: ItemGetValueContext) -> Any:
869        """ Adds code to skip the current pre-condition state. """
870        index = int(ctx.path.split("]")[0].split("[")[1])
871        self._pre_co_skip[index] = True
872        return "ctx->Map.skip = true;"
873
874
875class _RuntimeMeasurementRequestItem(_TestItem):
876    """ A runtime measurement request item. """
877    def __init__(self, item: Item, context: str):
878        super().__init__(item)
879        self._context = context
880
881
882def _add_call_method(content: CContent, name: str) -> None:
883    if name != "NULL":
884        content.gap = False
885        content.call_function(None, name, ["ctx"])
886
887
888class _RuntimeMeasurementTestItem(_TestItem):
889    """ A runtime measurement test item. """
890    def add_test_case_action_description(self, _content: CContent) -> None:
891        pass
892
893    def add_default_context_members(self, content: CContent) -> None:
894        content.add_description_block(
895            "This member references the measure runtime context.", None)
896        content.add("T_measure_runtime_context *context;")
897        content.add_description_block(
898            "This member provides the measure runtime request.", None)
899        content.add("T_measure_runtime_request request;")
900
901    def _add_requests(self, content: CContent) -> CContent:
902        requests = CContent()
903        prepare = self.add_support_method(content,
904                                          "test-prepare",
905                                          "Prepare",
906                                          do_wrap=False)
907        cleanup = self.add_support_method(content,
908                                          "test-cleanup",
909                                          "Cleanup",
910                                          do_wrap=False)
911        for item in self.item.children("runtime-measurement-request"):
912            req = _RuntimeMeasurementRequestItem(item, self.context)
913            requests.add_blank_line()
914            _add_call_method(requests, prepare)
915            name = req.add_support_method(content,
916                                          "test-prepare",
917                                          "Prepare",
918                                          do_wrap=False)
919            _add_call_method(requests, name)
920            name = req.add_support_method(content, "test-setup", "Setup")
921            requests.append([
922                f"ctx->request.name = \"{req.ident}\";",
923                f"ctx->request.setup = {name};"
924            ])
925            name = req.add_support_method(content, "test-body", "Body")
926            requests.append([f"ctx->request.body = {name};"])
927            extra_params = [
928                "T_ticks *delta", "uint32_t tic", "uint32_t toc",
929                "unsigned int retry"
930            ]
931            extra_args = ["delta", "tic", "toc", "retry"]
932            name = req.add_support_method(content,
933                                          "test-teardown",
934                                          "Teardown",
935                                          ret="bool",
936                                          extra_params=extra_params,
937                                          extra_args=extra_args)
938            requests.append([f"ctx->request.teardown = {name};"])
939            requests.gap = False
940            requests.call_function(None, "T_measure_runtime",
941                                   ["ctx->context", "&ctx->request"])
942            name = req.add_support_method(content,
943                                          "test-cleanup",
944                                          "Cleanup",
945                                          do_wrap=False)
946            _add_call_method(requests, name)
947            _add_call_method(requests, cleanup)
948        return requests
949
950    def generate(self, content: CContent, base_directory: str,
951                 test_case_to_suites: Dict[str, List[_TestItem]]) -> None:
952        self.add_test_case_description(content, test_case_to_suites)
953        instance = self.add_context(content)
954        content.add(self.substitute_code(self["test-support"]))
955        setup = f"{self.ident}_Setup_Context"
956        with content.function("static void", setup, [f"{self.context} *ctx"]):
957            content.add([
958                "T_measure_runtime_config config;",
959                "",
960                "memset( &config, 0, sizeof( config ) );",
961                f"config.sample_count = {self['params']['sample-count']};",
962                "ctx->request.arg = ctx;",
963                "ctx->request.flags = T_MEASURE_RUNTIME_REPORT_SAMPLES;",
964                "ctx->context = T_measure_runtime_create( &config );",
965                "T_assert_not_null( ctx->context );",
966            ])
967        setup = self.add_support_method(content,
968                                        "test-setup",
969                                        "Setup",
970                                        mandatory_code=f"{setup}( ctx );")
971        stop = self.add_support_method(content, "test-stop", "Stop")
972        teardown = self.add_support_method(content, "test-teardown",
973                                           "Teardown")
974        content.add([
975            f"static T_fixture {self.ident}_Fixture = {{",
976            f"  .setup = {setup},", f"  .stop = {stop},",
977            f"  .teardown = {teardown},", "  .scope = NULL,",
978            f"  .initial_context = {instance}", "};"
979        ])
980        requests = self._add_requests(content)
981        with content.function_block(f"void T_case_body_{self.ident}( void )"):
982            pass
983        content.gap = False
984        ret = ""
985        name = "T_TEST_CASE_FIXTURE"
986        params = [f"{self.ident}", f"&{self.ident}_Fixture"]
987        with content.function(ret, name, params, align=False):
988            content.add([
989                f"{self.context} *ctx;",
990                "",
991                "ctx = T_fixture_context();",
992            ])
993            content.append(requests)
994        content.add("/** @} */")
995
996
997class _SourceFile:
998    """ A test source file. """
999    def __init__(self, filename: str):
1000        """ Initializes a test source file. """
1001        self._file = filename
1002        self._test_suites = []  # type: List[_TestItem]
1003        self._test_cases = []  # type: List[_TestItem]
1004
1005    @property
1006    def test_suites(self) -> List[_TestItem]:
1007        """ The test suites of the source file. """
1008        return self._test_suites
1009
1010    @property
1011    def test_cases(self) -> List[_TestItem]:
1012        """ The test cases of the source file. """
1013        return self._test_cases
1014
1015    def add_test_suite(self, item: Item) -> None:
1016        """ Adds a test suite to the source file. """
1017        self._test_suites.append(_TestSuiteItem(item))
1018
1019    def add_test_case(self, item: Item) -> None:
1020        """ Adds a test case to the source file. """
1021        self._test_cases.append(_TestItem(item))
1022
1023    def add_action_requirement_test(self, item: Item) -> None:
1024        """ Adds an action requirement test to the source file. """
1025        self._test_cases.append(_ActionRequirementTestItem(item))
1026
1027    def add_runtime_measurement_test(self, item: Item) -> None:
1028        """ Adds a runtime measurement test to the source file. """
1029        self._test_cases.append(_RuntimeMeasurementTestItem(item))
1030
1031    def generate(self, base_directory: str,
1032                 test_case_to_suites: Dict[str, List[_TestItem]]) -> None:
1033        """
1034        Generates the source file and the corresponding build specification.
1035        """
1036        content = CContent()
1037        includes = []  # type: List[CInclude]
1038        local_includes = []  # type: List[CInclude]
1039        for item in itertools.chain(self._test_suites, self._test_cases):
1040            includes.extend(map(CInclude, item.includes))
1041            local_includes.extend(map(CInclude, item.local_includes))
1042            content.register_license_and_copyrights_of_item(item.item)
1043        content.prepend_spdx_license_identifier()
1044        with content.file_block():
1045            _add_ingroup(content, self._test_suites)
1046            _add_ingroup(content, self._test_cases)
1047        content.add_copyrights_and_licenses()
1048        content.add_automatically_generated_warning()
1049        content.add_have_config()
1050        content.add_includes(includes)
1051        content.add_includes(local_includes, local=True)
1052        content.add_includes([CInclude("rtems/test.h")])
1053        for item in sorted(self._test_cases, key=lambda x: x.name):
1054            item.generate(content, base_directory, test_case_to_suites)
1055        for item in sorted(self._test_suites, key=lambda x: x.name):
1056            item.generate(content, base_directory, test_case_to_suites)
1057        content.write(os.path.join(base_directory, self._file))
1058
1059
1060def _gather_build_source_files(item: Item, files: List[str]):
1061    for parent in item.parents("build-dependency"):
1062        _gather_build_source_files(parent, files)
1063    files.extend(item.data.get("source", []))
1064
1065
1066class _TestProgram:
1067    """ A test program. """
1068    def __init__(self, item: Item):
1069        """ Initializes a test program. """
1070        self._item = item
1071        self._source_files = []  # type: List[_SourceFile]
1072        self._build_source_files = []  # type: List[str]
1073        _gather_build_source_files(item, self._build_source_files)
1074
1075    @property
1076    def source_files(self) -> List[_SourceFile]:
1077        """ The source files of the test program. """
1078        return self._source_files
1079
1080    def add_source_files(self, source_files: Dict[str, _SourceFile]) -> None:
1081        """
1082        Adds the source files of the test program which are present in the
1083        source file map.
1084        """
1085        for filename in self._build_source_files:
1086            source_file = source_files.get(filename, None)
1087            if source_file is not None:
1088                self._source_files.append(source_file)
1089
1090
1091def _get_source_file(filename: str,
1092                     source_files: Dict[str, _SourceFile]) -> _SourceFile:
1093    return source_files.setdefault(filename, _SourceFile(filename))
1094
1095
1096def _gather_action_requirement_test(
1097        item: Item, source_files: Dict[str, _SourceFile],
1098        _test_programs: List[_TestProgram]) -> None:
1099    src = _get_source_file(item["test-target"], source_files)
1100    src.add_action_requirement_test(item)
1101
1102
1103def _gather_runtime_measurement_test(
1104        item: Item, source_files: Dict[str, _SourceFile],
1105        _test_programs: List[_TestProgram]) -> None:
1106    src = _get_source_file(item["test-target"], source_files)
1107    src.add_runtime_measurement_test(item)
1108
1109
1110def _gather_test_case(item: Item, source_files: Dict[str, _SourceFile],
1111                      _test_programs: List[_TestProgram]) -> None:
1112    src = _get_source_file(item["test-target"], source_files)
1113    src.add_test_case(item)
1114
1115
1116def _gather_test_program(item: Item, _source_files: Dict[str, _SourceFile],
1117                         test_programs: List[_TestProgram]) -> None:
1118    test_programs.append(_TestProgram(item))
1119
1120
1121def _gather_test_suite(item: Item, source_files: Dict[str, _SourceFile],
1122                       _test_programs: List[_TestProgram]) -> None:
1123    src = _get_source_file(item["test-target"], source_files)
1124    src.add_test_suite(item)
1125
1126
1127def _gather_default(_item: Item, _source_files: Dict[str, _SourceFile],
1128                    _test_programs: List[_TestProgram]) -> None:
1129    pass
1130
1131
1132_GATHER = {
1133    "build/test-program": _gather_test_program,
1134    "requirement/functional/action": _gather_action_requirement_test,
1135    "runtime-measurement-test": _gather_runtime_measurement_test,
1136    "test-case": _gather_test_case,
1137    "test-suite": _gather_test_suite,
1138}
1139
1140
1141def generate(config: dict,
1142             item_cache: ItemCache,
1143             targets: Optional[List[str]] = None) -> None:
1144    """
1145    Generates source files and build specification items for validation test
1146    suites and test cases according to the configuration.
1147
1148    :param config: A dictionary with configuration entries.
1149    :param item_cache: The specification item cache containing the validation
1150                       test suites and test cases.
1151    """
1152    source_files = {}  # type: Dict[str, _SourceFile]
1153    test_programs = []  # type: List[_TestProgram]
1154    for item in item_cache.all.values():
1155        _GATHER.get(item.type, _gather_default)(item, source_files,
1156                                                test_programs)
1157
1158    test_case_to_suites = {}  # type: Dict[str, List[_TestItem]]
1159    for test_program in test_programs:
1160        test_program.add_source_files(source_files)
1161        test_suites = []  # type: List[_TestItem]
1162        for source_file in test_program.source_files:
1163            test_suites.extend(source_file.test_suites)
1164        for source_file in test_program.source_files:
1165            for test_case in source_file.test_cases:
1166                test_case_to_suites.setdefault(test_case.uid,
1167                                               []).extend(test_suites)
1168
1169    if not targets:
1170        for src in source_files.values():
1171            src.generate(config["base-directory"], test_case_to_suites)
1172    else:
1173        for target in targets:
1174            source_files[target].generate(config["base-directory"],
1175                                          test_case_to_suites)
Note: See TracBrowser for help on using the repository browser.