source: rtems-central/rtemsspec/validation.py @ c0d8670

Last change on this file since c0d8670 was c0d8670, checked in by Sebastian Huber <sebastian.huber@…>, on 11/24/21 at 13:11:58

validation: Add optional begin/end time point

  • Property mode set to 100644
File size: 50.7 KB
Line 
1# SPDX-License-Identifier: BSD-2-Clause
2""" This module provides functions for the generation of validation tests. """
3
4# Copyright (C) 2020, 2021 embedded brains GmbH (http://www.embedded-brains.de)
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions
8# are met:
9# 1. Redistributions of source code must retain the above copyright
10#    notice, this list of conditions and the following disclaimer.
11# 2. Redistributions in binary form must reproduce the above copyright
12#    notice, this list of conditions and the following disclaimer in the
13#    documentation and/or other materials provided with the distribution.
14#
15# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25# POSSIBILITY OF SUCH DAMAGE.
26
27# pylint: disable=too-many-lines
28
29import itertools
30import functools
31import os
32import re
33from typing import Any, Dict, List, Optional, Tuple
34
35from rtemsspec.content import CContent, CInclude, \
36    GenericContent, get_integer_type, get_value_params, get_value_plural, \
37    get_value_doxygen_group, get_value_doxygen_function, to_camel_case
38from rtemsspec.items import Item, ItemCache, \
39    ItemGetValueContext, ItemMapper
40from rtemsspec.transitionmap import TransitionMap
41
42ItemMap = Dict[str, Item]
43
44_STEPS = re.compile(r"^steps/([0-9]+)$")
45
46
47def _get_test_context_instance(ctx: ItemGetValueContext) -> Any:
48    return f"{to_camel_case(ctx.item.uid[1:])}_Instance"
49
50
51def _get_test_context_type(ctx: ItemGetValueContext) -> Any:
52    return f"{to_camel_case(ctx.item.uid[1:])}_Context"
53
54
55def _get_test_run(ctx: ItemGetValueContext) -> Any:
56    return f"{to_camel_case(ctx.item.uid[1:])}_Run"
57
58
59class _Mapper(ItemMapper):
60    def __init__(self, item: Item):
61        super().__init__(item)
62        self._step = 0
63        self.add_get_value("glossary/term:/plural", get_value_plural)
64        self.add_get_value("interface/function:/name",
65                           get_value_doxygen_function)
66        self.add_get_value("interface/function:/params/name", get_value_params)
67        self.add_get_value("interface/group:/name", get_value_doxygen_group)
68        self.add_get_value("interface/macro:/name", get_value_doxygen_function)
69        self.add_get_value("interface/macro:/params/name", get_value_params)
70        self.add_get_value("interface/unspecified-function:/name",
71                           get_value_doxygen_function)
72        self.add_get_value(
73            "requirement/functional/action:/test-context-instance",
74            _get_test_context_instance)
75        self.add_get_value("requirement/functional/action:/test-context-type",
76                           _get_test_context_type)
77        self.add_get_value("requirement/functional/action:/test-run",
78                           _get_test_run)
79        self.add_get_value("runtime-measurement-test:/test-context-type",
80                           _get_test_context_type)
81        self.add_get_value("test-case:/test-context-instance",
82                           _get_test_context_instance)
83        self.add_get_value("test-case:/test-context-type",
84                           _get_test_context_type)
85        self.add_get_value("test-case:/test-run", _get_test_run)
86
87    @property
88    def steps(self):
89        """ The count of test steps. """
90        return self._step
91
92    def reset(self):
93        """ Resets the test step counter. """
94        self._step = 0
95
96    def map(self,
97            identifier: str,
98            item: Optional[Item] = None,
99            prefix: Optional[str] = None) -> Tuple[Item, str, Any]:
100        if identifier == "step":
101            step = self._step
102            self._step = step + 1
103            return self._item, "step", str(step)
104        match = _STEPS.search(identifier)
105        if match:
106            inc = int(match.group(1))
107            self._step += inc
108            return self._item, "step", f"Accounts for {inc} test plan steps"
109        return super().map(identifier, item, prefix)
110
111
112def _add_ingroup(content: CContent, items: List["_TestItem"]) -> None:
113    content.add_ingroup([item.group_identifier for item in items])
114
115
116class _TestItem:
117    """ A test item with a default implementation for test cases. """
118
119    # pylint: disable=too-many-public-methods
120    def __init__(self, item: Item):
121        self._item = item
122        self._ident = to_camel_case(item.uid[1:])
123        self._context = f"{self._ident}_Context"
124        self._mapper = _Mapper(item)
125
126    def __getitem__(self, key: str):
127        return self._item[key]
128
129    @property
130    def item(self) -> Item:
131        """ Returns the item. """
132        return self._item
133
134    @property
135    def uid(self) -> str:
136        """ Returns the item UID. """
137        return self._item.uid
138
139    @property
140    def ident(self) -> str:
141        """ Returns the test identifier. """
142        return self._ident
143
144    @property
145    def context(self) -> str:
146        """ Returns the test case context type. """
147        return self._context
148
149    @property
150    def name(self) -> str:
151        """ Returns the name. """
152        return self._item.spec
153
154    @property
155    def includes(self) -> List[str]:
156        """ Returns the list of includes. """
157        return self._item["test-includes"]
158
159    @property
160    def local_includes(self) -> List[str]:
161        """ Returns the list of local includes. """
162        return self._item["test-local-includes"]
163
164    @property
165    def brief(self) -> str:
166        """ Returns the substituted brief description. """
167        return self.substitute_text(self["test-brief"])
168
169    @property
170    def description(self) -> str:
171        """ Returns the substituted description. """
172        return self.substitute_text(self["test-description"])
173
174    @property
175    def group_identifier(self) -> str:
176        """ Returns the group identifier. """
177        return f"RTEMSTestCase{self.ident}"
178
179    def substitute_code(self,
180                        text: Optional[str],
181                        prefix: Optional[str] = None) -> str:
182        """
183        Performs a variable substitution for code with an optional prefix.
184        """
185        return self._mapper.substitute(text, prefix=prefix)
186
187    def substitute_text(self,
188                        text: Optional[str],
189                        prefix: Optional[str] = None) -> str:
190        """
191        Performs a variable substitution for text with an optional prefix.
192        """
193        return self._mapper.substitute(text, prefix=prefix)
194
195    def add_test_case_description(
196            self, content: CContent,
197            test_case_to_suites: Dict[str, List["_TestItem"]]) -> None:
198        """ Adds the test case description. """
199        with content.defgroup_block(self.group_identifier, self.name):
200            try:
201                test_suites = test_case_to_suites[self.uid]
202            except KeyError as err:
203                msg = (f"the target file '{self['test-target']}' of "
204                       f"{self.item.spec} is not a source file of an item of "
205                       "type 'build/test-program'")
206                raise ValueError(msg) from err
207            _add_ingroup(content, test_suites)
208            content.add_brief_description(self.brief)
209            content.wrap(self.description)
210            self.add_test_case_action_description(content)
211            content.add("@{")
212
213    def add_test_case_action_description(self, content: CContent) -> None:
214        """ Adds the test case action description. """
215        actions = self["test-actions"]
216        if actions:
217            content.add("This test case performs the following actions:")
218            for action in actions:
219                content.wrap(self.substitute_text(action["action-brief"]),
220                             initial_indent="- ")
221                for check in action["checks"]:
222                    content.wrap(self.substitute_text(check["brief"]),
223                                 initial_indent="  - ",
224                                 subsequent_indent="    ")
225
226    def _add_test_case_actions(self, content: CContent) -> CContent:
227        actions = CContent()
228        for index, action in enumerate(self["test-actions"]):
229            method = f"{self._ident}_Action_{index}"
230            if self.context == "void":
231                args = []
232                params = []
233            else:
234                args = ["ctx"]
235                params = [f"{self.context} *ctx"]
236            actions.gap = False
237            actions.call_function(None, method, args)
238            with content.doxygen_block():
239                content.add_brief_description(
240                    self.substitute_text(action["action-brief"]))
241            content.gap = False
242            with content.function("static void", method, params):
243                content.add(self.substitute_code(action["action-code"]))
244                for check in action["checks"]:
245                    with content.comment_block():
246                        content.wrap(self.substitute_text(check["brief"]))
247                    content.append(self.substitute_text(check["code"]))
248        return actions
249
250    def _get_run_params(self, header: Optional[Dict[str, Any]]) -> List[str]:
251        if not header:
252            return []
253        return [
254            self.substitute_text(param["specifier"],
255                                 f"test-header/run-params[{index}]")
256            for index, param in enumerate(header["run-params"])
257        ]
258
259    def add_header_body(self, content: CContent, header: Dict[str,
260                                                              Any]) -> None:
261        """ Adds the test header body. """
262        content.add(self.substitute_code(header["code"]))
263        with content.doxygen_block():
264            content.add_brief_description("Runs the parameterized test case.")
265            content.add_param_description(header["run-params"])
266        content.gap = False
267        content.declare_function("void", f"{self.ident}_Run",
268                                 self._get_run_params(header))
269
270    def add_support_method(self,
271                           content: CContent,
272                           key: str,
273                           name: str,
274                           mandatory_code: Optional[GenericContent] = None,
275                           optional_code: Optional[GenericContent] = None,
276                           ret: str = "void",
277                           extra_params: Optional[List[str]] = None,
278                           extra_args: Optional[List[str]] = None,
279                           do_wrap: bool = True) -> str:
280        """ Adds a support method to the content. """
281
282        # pylint: disable=too-many-arguments
283        # pylint: disable=too-many-locals
284        info = self[key]
285        if not info and not mandatory_code:
286            return "NULL"
287        if extra_params is None:
288            extra_params = []
289        if extra_args is None:
290            extra_args = []
291        method = f"{self.ident}_{name}"
292        wrap = f"{method}_Wrap"
293        if info:
294            content.add_description_block(
295                self.substitute_text(info["brief"]),
296                self.substitute_text(info["description"]))
297            params = [f"{self.context} *ctx"] + extra_params
298            with content.function(f"static {ret}", method, params):
299                if not do_wrap:
300                    content.gap = False
301                    content.add(mandatory_code)
302                    content.gap = False
303                    content.add(optional_code)
304                content.add(self.substitute_code(info["code"]))
305        if not do_wrap:
306            assert info
307            return method
308        params = ["void *arg"] + extra_params
309        with content.function(f"static {ret}", wrap, params):
310            content.add([f"{self.context} *ctx;", "", "ctx = arg;"])
311            content.gap = False
312            content.add(mandatory_code)
313            content.gap = False
314            content.add(optional_code)
315            if info:
316                content.gap = False
317                ret_2 = None if ret == "void" else "return"
318                args = ["ctx"] + extra_args
319                content.call_function(ret_2, f"{method}", args)
320        return wrap
321
322    def add_function(self, content: CContent, key: str, name: str) -> None:
323        """
324        Adds a function with the name to the content if there is one defined
325        for the attribute key.
326        """
327        if self[key] is not None:
328            with content.function("static void", f"{self.ident}_{name}",
329                                  [f"{self.context} *ctx"]):
330                content.append(self.substitute_code(self[key]))
331
332    def add_default_context_members(self, content: CContent) -> None:
333        """ Adds the default context members to the content """
334        for param in self._get_run_params(self["test-header"]):
335            content.add_description_block(
336                "This member contains a copy of the corresponding "
337                f"{self.ident}_Run() parameter.", None)
338            content.add(f"{param.strip()};")
339
340    def add_context(self, content: CContent) -> str:
341        """ Adds the context to the content. """
342        content.add(self.substitute_code(self["test-context-support"]))
343        default_members = CContent()
344        with default_members.indent():
345            self.add_default_context_members(default_members)
346        if not self["test-context"] and not default_members.lines:
347            return "NULL"
348        with content.doxygen_block():
349            content.add_brief_description(
350                f"Test context for {self.name} test case.")
351        content.append("typedef struct {")
352        gap = False
353        with content.indent():
354            for info in self["test-context"]:
355                content.add_description_block(
356                    self.substitute_text(info["brief"]),
357                    self.substitute_text(info["description"]))
358                content.add(f"{info['member'].strip()};")
359            gap = content.gap
360        content.gap = gap
361        content.add(default_members)
362        content.append([
363            f"}} {self.context};", "", f"static {self.context}",
364            f"  {self.ident}_Instance;"
365        ])
366        return f"&{self.ident}_Instance"
367
368    def generate_header(self, base_directory: str, header: Dict[str,
369                                                                Any]) -> None:
370        """ Generates the test header. """
371        content = CContent()
372        content.register_license_and_copyrights_of_item(self._item)
373        content.prepend_spdx_license_identifier()
374        with content.file_block():
375            content.add_ingroup([self.group_identifier])
376        content.add_copyrights_and_licenses()
377        content.add_automatically_generated_warning()
378        with content.header_guard(os.path.basename(header["target"])):
379            content.add_includes(list(map(CInclude, header["includes"])))
380            content.add_includes(list(map(CInclude, header["local-includes"])),
381                                 local=True)
382            with content.extern_c():
383                with content.add_to_group(self.group_identifier):
384                    self.add_header_body(content, header)
385        content.write(os.path.join(base_directory, header["target"]))
386
387    def _add_fixture(self, content: CContent, instance: str) -> Optional[str]:
388        if instance == "NULL":
389            self._context = "void"
390            do_wrap = False
391        else:
392            do_wrap = True
393        setup = self.add_support_method(content,
394                                        "test-setup",
395                                        "Setup",
396                                        do_wrap=do_wrap)
397        stop = self.add_support_method(content,
398                                       "test-stop",
399                                       "Stop",
400                                       do_wrap=do_wrap)
401        teardown = self.add_support_method(content,
402                                           "test-teardown",
403                                           "Teardown",
404                                           do_wrap=do_wrap)
405        if all(ptr == "NULL" for ptr in [instance, setup, stop, teardown]):
406            return None
407        content.add([
408            f"static T_fixture {self.ident}_Fixture = {{",
409            f"  .setup = {setup},", f"  .stop = {stop},",
410            f"  .teardown = {teardown},", "  .scope = NULL,",
411            f"  .initial_context = {instance}", "};"
412        ])
413        return f"&{self.ident}_Fixture"
414
415    def assign_run_params(self, content: CContent, header: Dict[str,
416                                                                Any]) -> None:
417        """ Assigns the run parameters to the context.  """
418        if header["run-params"]:
419            content.add([f"ctx = &{self.ident}_Instance;"] + [
420                f"ctx->{param['name']} = {param['name']};"
421                for param in header["run-params"]
422            ])
423
424    def _add_runner_prologue_and_epilogue(self, content: CContent,
425                                          prologue: CContent,
426                                          epilogue: CContent,
427                                          fixture: str) -> None:
428        header = self["test-header"]
429        if self.context == "void":
430            result = None
431        else:
432            prologue.add(f"{self.context} *ctx;")
433            self.assign_run_params(prologue, header)
434            result = "ctx ="
435        if header["freestanding"]:
436            prologue.call_function(result, "T_case_begin",
437                                   [f"\"{self.ident}\"", fixture])
438            epilogue.add("T_case_end();")
439        else:
440            content.add(f"static T_fixture_node {self.ident}_Node;")
441            prologue.call_function(result, "T_push_fixture",
442                                   [f"&{self.ident}_Node", fixture])
443            epilogue.add("T_pop_fixture();")
444
445    def generate(self, content: CContent, base_directory: str,
446                 test_case_to_suites: Dict[str, List["_TestItem"]]) -> None:
447        """ Generates the content. """
448        self.add_test_case_description(content, test_case_to_suites)
449        instance = self.add_context(content)
450        content.add(self.substitute_code(self["test-support"]))
451        fixture = self._add_fixture(content, instance)
452        self._mapper.reset()
453        actions = self._add_test_case_actions(content)
454        header = self["test-header"]
455        prologue = CContent()
456        epilogue = CContent()
457        if header:
458            self.generate_header(base_directory, header)
459            ret = "void"
460            name = f"{self.ident}_Run"
461            params = self._get_run_params(header)
462            if self._mapper.steps > 0 and not fixture:
463                fixture = "&T_empty_fixture"
464            if fixture:
465                self._add_runner_prologue_and_epilogue(content, prologue,
466                                                       epilogue, fixture)
467            align = True
468        else:
469            ret = ""
470            params = [f"{self.ident}"]
471            if fixture:
472                params.append(fixture)
473                name = "T_TEST_CASE_FIXTURE"
474            else:
475                name = "T_TEST_CASE"
476            if self.context != "void":
477                prologue.add([
478                    f"{self.context} *ctx;", "", "ctx = T_fixture_context();"
479                ])
480            align = False
481            with content.function_block(
482                    f"void T_case_body_{self.ident}( void )"):
483                pass
484            content.gap = False
485        with content.function(ret, name, params, align=align):
486            content.add(prologue)
487            if self._mapper.steps > 0:
488                content.add(f"T_plan( {self._mapper.steps} );")
489            content.add(actions)
490            content.add(epilogue)
491        content.add("/** @} */")
492
493
494class _TestSuiteItem(_TestItem):
495    """ A test suite item. """
496    @property
497    def group_identifier(self) -> str:
498        return f"RTEMSTestSuite{self.ident}"
499
500    def generate(self, content: CContent, _base_directory: str,
501                 _test_case_to_suites: Dict[str, List[_TestItem]]) -> None:
502        with content.defgroup_block(self.group_identifier, self.name):
503            content.add("@ingroup RTEMSTestSuites")
504            content.add_brief_description(self.brief)
505            content.wrap(self.description)
506            content.add("@{")
507        content.add(self.substitute_code(self["test-code"]))
508        content.add("/** @} */")
509
510
511_IdxToX = Tuple[Tuple[str, ...], ...]
512
513
514def _to_enum(prefix: str, conditions: List[Any]) -> _IdxToX:
515    return tuple(
516        tuple([f"{prefix}_{condition['name']}"] + [
517            f"{prefix}_{condition['name']}_{state['name']}"
518            for state in condition["states"]
519        ] + [f"{prefix}_{condition['name']}_NA"]) for condition in conditions)
520
521
522def _add_condition_enum(content: CContent, co_idx_to_enum: _IdxToX) -> None:
523    for enum in co_idx_to_enum:
524        content.add("typedef enum {")
525        with content.indent():
526            content.add(",\n".join(enum[1:]))
527        content.add(f"}} {enum[0]};")
528
529
530class _ActionRequirementTestItem(_TestItem):
531    """ An action requirement test item. """
532    def __init__(self, item: Item):
533        super().__init__(item)
534        self._mapper.add_get_value(("requirement/functional/action:"
535                                    "/pre-conditions/states/test-code/skip"),
536                                   self._skip_pre_condition)
537        self._pre_co_skip = {}  # type: Dict[int, bool]
538        self._pre_co_count = len(item["pre-conditions"])
539        self._pre_co_idx_to_enum = _to_enum(f"{self.ident}_Pre",
540                                            item["pre-conditions"])
541        self._post_co_idx_to_enum = _to_enum(f"{self.ident}_Post",
542                                             item["post-conditions"])
543        self._pci = "pcs"
544
545    def _add_pre_condition_descriptions(self, content: CContent) -> None:
546        for condition in self["pre-conditions"]:
547            content.add("static const char * const "
548                        f"{self.ident}_PreDesc_{condition['name']}[] = {{")
549            with content.indent():
550                content.add(",\n".join(
551                    itertools.chain((f"\"{state['name']}\""
552                                     for state in condition["states"]),
553                                    ["\"NA\""])))
554            content.add("};")
555        content.add("static const char * const * const "
556                    f"{self.ident}_PreDesc[] = {{")
557        with content.indent():
558            content.add(",\n".join([
559                f"{self.ident}_PreDesc_{condition['name']}"
560                for condition in self["pre-conditions"]
561            ] + ["NULL"]))
562        content.add("};")
563
564    def add_default_context_members(self, content: CContent) -> None:
565        super().add_default_context_members(content)
566        content.add("struct {")
567        with content.indent():
568            if self._pci == "pci":
569                content.add_description_block(
570                    "This member defines the pre-condition indices "
571                    "for the next action.", None)
572                content.add(f"size_t pci[ {self._pre_co_count} ];")
573            content.add_description_block(
574                "This member defines the pre-condition states "
575                "for the next action.", None)
576            content.add(f"size_t pcs[ {self._pre_co_count} ];")
577            content.add_description_block(
578                "If this member is true, then the test action "
579                "loop is executed.", None)
580            content.add("bool in_action_loop;")
581            content.add_description_block(
582                "This member contains the next transition map index.", None)
583            content.add("size_t index;")
584            content.add_description_block(
585                "This member contains the current transition map entry.", None)
586            content.add(f"{self.ident}_Entry entry;")
587            content.add_description_block(
588                "If this member is true, then the current transition "
589                "variant should be skipped.", None)
590            content.add("bool skip;")
591        content.append("} Map;")
592
593    def _add_fixture_scope(self, content: CContent) -> None:
594        params = ["void *arg", "char *buf", "size_t n"]
595        with content.function("static size_t", f"{self.ident}_Scope", params):
596            content.add([f"{self.context} *ctx;", "", "ctx = arg;"])
597            with content.condition("ctx->Map.in_action_loop"):
598                content.call_function(
599                    "return", "T_get_scope",
600                    [f"{self.ident}_PreDesc", "buf", "n", "ctx->Map.pcs"])
601            content.add("return 0;")
602
603    def _add_call(self, content: CContent, key: str, name: str) -> None:
604        if self[key] is not None:
605            content.gap = False
606            content.call_function(None, f"{self.ident}_{name}", ["ctx"])
607
608    def _add_skip(self, content: CContent) -> Any:
609        state_counts = [len(enum) - 2 for enum in self._pre_co_idx_to_enum]
610        weigths = [
611            str(
612                functools.reduce((lambda x, y: x * y),
613                                 state_counts[index + 1:], 1))
614            for index in range(self._pre_co_count)
615        ]
616        integer_type = get_integer_type(int(weigths[0]))
617        content.add(f"static const {integer_type} {self.ident}_Weights[] = {{")
618        with content.indent():
619            content.wrap(", ".join(weigths))
620        content.add("};")
621        with content.function("static void", f"{self.ident}_Skip",
622                              [f"{self.context} *ctx", "size_t index"]):
623            content.append("switch ( index + 1 ) {")
624            fall_through = "/* Fall through */"
625            with content.indent():
626                for index, enum in enumerate(self._pre_co_idx_to_enum[1:], 1):
627                    content.add(f"case {index}:")
628                    with content.indent():
629                        pci = f"ctx->Map.{self._pci}[ {index} ]"
630                        content.append(
631                            [f"{pci} = {enum[-1]} - 1;", fall_through])
632                content.lines[-1] = content.lines[-1].replace(
633                    fall_through, "break;")
634            content.append("}")
635
636    def _add_test_variant(self, content: CContent,
637                          transition_map: TransitionMap) -> None:
638        entry = "ctx->Map.entry"
639        for index in range(self._pre_co_count):
640            content.gap = False
641            state = f"ctx->Map.pcs[ {index} ]"
642            prepare = f"{self._pre_co_idx_to_enum[index][0]}_Prepare"
643            content.call_function(None, prepare, ["ctx", state])
644            if self._pre_co_skip.get(index, False):
645                with content.condition("ctx->Map.skip"):
646                    content.call_function(None, f"{self.ident}_Skip",
647                                          ["ctx", str(index)])
648                    content.append("return;")
649                content.add_blank_line()
650        self._add_call(content, "test-action", "Action")
651        for index, enum in enumerate(self._post_co_idx_to_enum):
652            content.gap = False
653            content.call_function(None, f"{enum[0]}_Check", [
654                "ctx", f"{entry}.{transition_map.get_post_entry_member(index)}"
655            ])
656
657    def _add_loop_body(self, content: CContent,
658                       transition_map: TransitionMap) -> None:
659        entry = "ctx->Map.entry"
660        content.call_function(f"{entry} =", f"{self.ident}_PopEntry", ["ctx"])
661        if transition_map.pre_co_summary[0]:
662            with content.condition(f"{entry}.Skip"):
663                content.append("continue;")
664            content.add_blank_line()
665        if transition_map.has_pre_co_not_applicable():
666            name = f"{self.ident}_SetPreConditionStates"
667            content.gap = False
668            content.call_function(None, name, ["ctx"])
669        self._add_call(content, "test-prepare", "Prepare")
670        content.gap = False
671        content.call_function(None, f"{self.ident}_TestVariant", ["ctx"])
672        self._add_call(content, "test-cleanup", "Cleanup")
673
674    def _add_for_loops(self, content: CContent, transition_map: TransitionMap,
675                       index: int) -> None:
676        if index < self._pre_co_count:
677            var = f"ctx->Map.{self._pci}[ {index} ]"
678            begin = self._pre_co_idx_to_enum[index][1]
679            end = self._pre_co_idx_to_enum[index][-1]
680            with content.for_loop(f"{var} = {begin}", f"{var} < {end}",
681                                  f"++{var}"):
682                self._add_for_loops(content, transition_map, index + 1)
683        else:
684            self._add_loop_body(content, transition_map)
685
686    def _add_set_pre_co_states(self, content: CContent,
687                               transition_map: TransitionMap) -> None:
688        ret = "static void"
689        name = f"{self.ident}_SetPreConditionStates"
690        params = [f"{self.context} *ctx"]
691        with content.function(ret, name, params, align=True):
692            entry = "ctx->Map.entry"
693            gap = False
694            for index, pre_co in enumerate(self._item["pre-conditions"]):
695                pcs_pci = f"ctx->Map.pcs[ {index} ] = ctx->Map.pci[ {index} ];"
696                if transition_map.pre_co_summary[index + 1]:
697                    is_na = f"{entry}.Pre_{pre_co['name']}_NA"
698                    with content.first_condition(is_na):
699                        enum_na = self._pre_co_idx_to_enum[index][-1]
700                        content.add(f"ctx->Map.pcs[ {index} ] = {enum_na};")
701                    with content.final_condition():
702                        content.add(pcs_pci)
703                    gap = True
704                else:
705                    content.gap = gap
706                    gap = False
707                    content.add(pcs_pci)
708
709    def _add_pop_entry(self, content: CContent) -> None:
710        ret = f"static inline {self.ident}_Entry"
711        name = f"{self.ident}_PopEntry"
712        params = [f"{self.context} *ctx"]
713        with content.function(ret, name, params, align=True):
714            content.add("size_t index;")
715            if self._pre_co_skip:
716                with content.first_condition("ctx->Map.skip"):
717                    content.add([
718                        "size_t i;", "", "ctx->Map.skip = false;", "index = 0;"
719                    ])
720                    with content.for_loop("i = 0", f"i < {self._pre_co_count}",
721                                          "++i"):
722                        content.append(f"index += {self.ident}_Weights[ i ]"
723                                       f" * ctx->Map.{self._pci}[ i ];")
724                with content.final_condition():
725                    content.add("index = ctx->Map.index;")
726                content.add("ctx->Map.index = index + 1;")
727            else:
728                content.add(
729                    ["index = ctx->Map.index;", "ctx->Map.index = index + 1;"])
730                content.gap = False
731            content.add([
732                f"return {self.ident}_Entries[",
733                f"  {self.ident}_Map[ index ]", "];"
734            ])
735
736    def _add_test_case(self, content: CContent, transition_map: TransitionMap,
737                       header: Dict[str, Any]) -> None:
738        if self._pre_co_skip:
739            self._add_skip(content)
740        self._add_pop_entry(content)
741        if transition_map.has_pre_co_not_applicable():
742            self._add_set_pre_co_states(content, transition_map)
743        with content.function("static void", f"{self.ident}_TestVariant",
744                              [f"{self.context} *ctx"]):
745            self._add_test_variant(content, transition_map)
746        fixture = f"{self.ident}_Fixture"
747        prologue = CContent()
748        epilogue = CContent()
749        map_members_initialization = [
750            "ctx->Map.in_action_loop = true;", "ctx->Map.index = 0;"
751        ]
752        if self._pre_co_skip:
753            map_members_initialization.append("ctx->Map.skip = false;")
754        if header:
755            content.add(f"static T_fixture_node {self.ident}_Node;")
756            ret = "void"
757            name = f"{self.ident}_Run"
758            params = self._get_run_params(header)
759            prologue.add([f"{self.context} *ctx;"])
760            self.assign_run_params(prologue, header)
761            prologue.call_function("ctx =", "T_push_fixture",
762                                   [f"&{self.ident}_Node", f"&{fixture}"])
763            prologue.append(map_members_initialization)
764            epilogue.add("T_pop_fixture();")
765            align = True
766        else:
767            with content.function_block(
768                    f"void T_case_body_{self.ident}( void )"):
769                pass
770            content.gap = False
771            ret = ""
772            name = "T_TEST_CASE_FIXTURE"
773            params = [f"{self.ident}", f"&{fixture}"]
774            prologue.add([
775                f"{self.context} *ctx;",
776                "",
777                "ctx = T_fixture_context();",
778            ] + map_members_initialization)
779            align = False
780        with content.function(ret, name, params, align=align):
781            content.add(prologue)
782            self._add_for_loops(content, transition_map, 0)
783            content.add(epilogue)
784
785    def _add_handler(self, content: CContent, conditions: str,
786                     co_idx_to_enum: _IdxToX, action: str) -> None:
787        for co_idx, condition in enumerate(self[conditions]):
788            enum = co_idx_to_enum[co_idx]
789            handler = f"{enum[0]}_{action}"
790            params = [f"{self.context} *ctx", f"{enum[0]} state"]
791            with content.function("static void", handler, params):
792                content.add(self.substitute_code(condition["test-prologue"]))
793                content.add("switch ( state ) {")
794                with content.indent():
795                    for state_index, state in enumerate(condition["states"]):
796                        content.add(f"case {enum[state_index + 1]}: {{")
797                        prefix = (f"/{conditions}[{co_idx}]"
798                                  f"/states[{state_index}]/test-code")
799                        with content.indent():
800                            with content.comment_block():
801                                content.wrap(
802                                    self.substitute_text(state["text"]))
803                            content.append(
804                                self.substitute_code(state["test-code"],
805                                                     prefix))
806                            content.append("break;")
807                        content.add("}")
808                    content.add(f"case {enum[-1]}:")
809                    with content.indent():
810                        content.append("break;")
811                content.add("}")
812                content.add(self.substitute_code(condition["test-epilogue"]))
813
814    def add_test_case_action_description(self, _content: CContent) -> None:
815        pass
816
817    def add_header_body(self, content: CContent, header: Dict[str,
818                                                              Any]) -> None:
819        _add_condition_enum(content, self._pre_co_idx_to_enum)
820        _add_condition_enum(content, self._post_co_idx_to_enum)
821        super().add_header_body(content, header)
822
823    def generate(self, content: CContent, base_directory: str,
824                 test_case_to_suites: Dict[str, List[_TestItem]]) -> None:
825        self.add_test_case_description(content, test_case_to_suites)
826        header = self["test-header"]
827        if header:
828            self.generate_header(base_directory, header)
829        else:
830            _add_condition_enum(content, self._pre_co_idx_to_enum)
831            _add_condition_enum(content, self._post_co_idx_to_enum)
832        transition_map = TransitionMap(self.item)
833        if transition_map.has_pre_co_not_applicable():
834            self._pci = "pci"
835        transition_map.add_map_entry_type(content, self.ident)
836        instance = self.add_context(content)
837        self._add_pre_condition_descriptions(content)
838        content.add(self.substitute_code(self["test-support"]))
839        self._add_handler(content, "pre-conditions", self._pre_co_idx_to_enum,
840                          "Prepare")
841        self._add_handler(content, "post-conditions",
842                          self._post_co_idx_to_enum, "Check")
843        optional_code = "ctx->Map.in_action_loop = false;"
844        setup = self.add_support_method(content,
845                                        "test-setup",
846                                        "Setup",
847                                        optional_code=optional_code)
848        stop = self.add_support_method(content,
849                                       "test-stop",
850                                       "Stop",
851                                       optional_code=optional_code)
852        teardown = self.add_support_method(content,
853                                           "test-teardown",
854                                           "Teardown",
855                                           optional_code=optional_code)
856        self.add_function(content, "test-prepare", "Prepare")
857        self.add_function(content, "test-action", "Action")
858        self.add_function(content, "test-cleanup", "Cleanup")
859        transition_map.add_map(content, self.ident)
860        self._add_fixture_scope(content)
861        content.add([
862            f"static T_fixture {self.ident}_Fixture = {{",
863            f"  .setup = {setup},", f"  .stop = {stop},",
864            f"  .teardown = {teardown},", f"  .scope = {self.ident}_Scope,",
865            f"  .initial_context = {instance}", "};"
866        ])
867        self._add_test_case(content, transition_map, header)
868        content.add("/** @} */")
869
870    def _skip_pre_condition(self, ctx: ItemGetValueContext) -> Any:
871        """ Adds code to skip the current pre-condition state. """
872        index = int(ctx.path.split("]")[0].split("[")[1])
873        self._pre_co_skip[index] = True
874        return "ctx->Map.skip = true;"
875
876
877class _RuntimeMeasurementRequestItem(_TestItem):
878    """ A runtime measurement request item. """
879    def __init__(self, item: Item, context: str):
880        super().__init__(item)
881        self._context = context
882
883
884def _add_call_method(content: CContent, name: str) -> None:
885    if name != "NULL":
886        content.gap = False
887        content.call_function(None, name, ["ctx"])
888
889
890class _RuntimeMeasurementTestItem(_TestItem):
891    """ A runtime measurement test item. """
892    def add_test_case_action_description(self, _content: CContent) -> None:
893        pass
894
895    def add_default_context_members(self, content: CContent) -> None:
896        content.add_description_block(
897            "This member references the measure runtime context.", None)
898        content.add("T_measure_runtime_context *context;")
899        content.add_description_block(
900            "This member provides the measure runtime request.", None)
901        content.add("T_measure_runtime_request request;")
902        content.add_description_block(
903            "This member provides an optional measurement begin time point.",
904            None)
905        content.add("T_ticks begin;")
906        content.add_description_block(
907            "This member provides an optional measurement end time point.",
908            None)
909        content.add("T_ticks end;")
910
911    def _add_requests(self, content: CContent) -> CContent:
912        requests = CContent()
913        prepare = self.add_support_method(content,
914                                          "test-prepare",
915                                          "Prepare",
916                                          do_wrap=False)
917        cleanup = self.add_support_method(content,
918                                          "test-cleanup",
919                                          "Cleanup",
920                                          do_wrap=False)
921        for item in self.item.children("runtime-measurement-request"):
922            req = _RuntimeMeasurementRequestItem(item, self.context)
923            requests.add_blank_line()
924            _add_call_method(requests, prepare)
925            name = req.add_support_method(content,
926                                          "test-prepare",
927                                          "Prepare",
928                                          do_wrap=False)
929            _add_call_method(requests, name)
930            name = req.add_support_method(content, "test-setup", "Setup")
931            requests.append([
932                f"ctx->request.name = \"{req.ident}\";",
933                f"ctx->request.setup = {name};"
934            ])
935            name = req.add_support_method(content, "test-body", "Body")
936            requests.append([f"ctx->request.body = {name};"])
937            extra_params = [
938                "T_ticks *delta", "uint32_t tic", "uint32_t toc",
939                "unsigned int retry"
940            ]
941            extra_args = ["delta", "tic", "toc", "retry"]
942            name = req.add_support_method(content,
943                                          "test-teardown",
944                                          "Teardown",
945                                          ret="bool",
946                                          extra_params=extra_params,
947                                          extra_args=extra_args)
948            requests.append([f"ctx->request.teardown = {name};"])
949            requests.gap = False
950            requests.call_function(None, "T_measure_runtime",
951                                   ["ctx->context", "&ctx->request"])
952            name = req.add_support_method(content,
953                                          "test-cleanup",
954                                          "Cleanup",
955                                          do_wrap=False)
956            _add_call_method(requests, name)
957            _add_call_method(requests, cleanup)
958        return requests
959
960    def generate(self, content: CContent, base_directory: str,
961                 test_case_to_suites: Dict[str, List[_TestItem]]) -> None:
962        self.add_test_case_description(content, test_case_to_suites)
963        instance = self.add_context(content)
964        content.add(self.substitute_code(self["test-support"]))
965        setup = f"{self.ident}_Setup_Context"
966        with content.function("static void", setup, [f"{self.context} *ctx"]):
967            content.add([
968                "T_measure_runtime_config config;",
969                "",
970                "memset( &config, 0, sizeof( config ) );",
971                f"config.sample_count = {self['params']['sample-count']};",
972                "ctx->request.arg = ctx;",
973                "ctx->request.flags = T_MEASURE_RUNTIME_REPORT_SAMPLES;",
974                "ctx->context = T_measure_runtime_create( &config );",
975                "T_assert_not_null( ctx->context );",
976            ])
977        setup = self.add_support_method(content,
978                                        "test-setup",
979                                        "Setup",
980                                        mandatory_code=f"{setup}( ctx );")
981        stop = self.add_support_method(content, "test-stop", "Stop")
982        teardown = self.add_support_method(content, "test-teardown",
983                                           "Teardown")
984        content.add([
985            f"static T_fixture {self.ident}_Fixture = {{",
986            f"  .setup = {setup},", f"  .stop = {stop},",
987            f"  .teardown = {teardown},", "  .scope = NULL,",
988            f"  .initial_context = {instance}", "};"
989        ])
990        requests = self._add_requests(content)
991        with content.function_block(f"void T_case_body_{self.ident}( void )"):
992            pass
993        content.gap = False
994        ret = ""
995        name = "T_TEST_CASE_FIXTURE"
996        params = [f"{self.ident}", f"&{self.ident}_Fixture"]
997        with content.function(ret, name, params, align=False):
998            content.add([
999                f"{self.context} *ctx;",
1000                "",
1001                "ctx = T_fixture_context();",
1002            ])
1003            content.append(requests)
1004        content.add("/** @} */")
1005
1006
1007class _SourceFile:
1008    """ A test source file. """
1009    def __init__(self, filename: str):
1010        """ Initializes a test source file. """
1011        self._file = filename
1012        self._test_suites = []  # type: List[_TestItem]
1013        self._test_cases = []  # type: List[_TestItem]
1014
1015    @property
1016    def test_suites(self) -> List[_TestItem]:
1017        """ The test suites of the source file. """
1018        return self._test_suites
1019
1020    @property
1021    def test_cases(self) -> List[_TestItem]:
1022        """ The test cases of the source file. """
1023        return self._test_cases
1024
1025    def add_test_suite(self, item: Item) -> None:
1026        """ Adds a test suite to the source file. """
1027        self._test_suites.append(_TestSuiteItem(item))
1028
1029    def add_test_case(self, item: Item) -> None:
1030        """ Adds a test case to the source file. """
1031        self._test_cases.append(_TestItem(item))
1032
1033    def add_action_requirement_test(self, item: Item) -> None:
1034        """ Adds an action requirement test to the source file. """
1035        self._test_cases.append(_ActionRequirementTestItem(item))
1036
1037    def add_runtime_measurement_test(self, item: Item) -> None:
1038        """ Adds a runtime measurement test to the source file. """
1039        self._test_cases.append(_RuntimeMeasurementTestItem(item))
1040
1041    def generate(self, base_directory: str,
1042                 test_case_to_suites: Dict[str, List[_TestItem]]) -> None:
1043        """
1044        Generates the source file and the corresponding build specification.
1045        """
1046        content = CContent()
1047        includes = []  # type: List[CInclude]
1048        local_includes = []  # type: List[CInclude]
1049        for item in itertools.chain(self._test_suites, self._test_cases):
1050            includes.extend(map(CInclude, item.includes))
1051            local_includes.extend(map(CInclude, item.local_includes))
1052            content.register_license_and_copyrights_of_item(item.item)
1053        content.prepend_spdx_license_identifier()
1054        with content.file_block():
1055            _add_ingroup(content, self._test_suites)
1056            _add_ingroup(content, self._test_cases)
1057        content.add_copyrights_and_licenses()
1058        content.add_automatically_generated_warning()
1059        content.add_have_config()
1060        content.add_includes(includes)
1061        content.add_includes(local_includes, local=True)
1062        content.add_includes([CInclude("rtems/test.h")])
1063        for item in sorted(self._test_cases, key=lambda x: x.name):
1064            item.generate(content, base_directory, test_case_to_suites)
1065        for item in sorted(self._test_suites, key=lambda x: x.name):
1066            item.generate(content, base_directory, test_case_to_suites)
1067        content.write(os.path.join(base_directory, self._file))
1068
1069
1070def _gather_build_source_files(item: Item, files: List[str]):
1071    for parent in item.parents("build-dependency"):
1072        _gather_build_source_files(parent, files)
1073    files.extend(item.data.get("source", []))
1074
1075
1076class _TestProgram:
1077    """ A test program. """
1078    def __init__(self, item: Item):
1079        """ Initializes a test program. """
1080        self._item = item
1081        self._source_files = []  # type: List[_SourceFile]
1082        self._build_source_files = []  # type: List[str]
1083        _gather_build_source_files(item, self._build_source_files)
1084
1085    @property
1086    def source_files(self) -> List[_SourceFile]:
1087        """ The source files of the test program. """
1088        return self._source_files
1089
1090    def add_source_files(self, source_files: Dict[str, _SourceFile]) -> None:
1091        """
1092        Adds the source files of the test program which are present in the
1093        source file map.
1094        """
1095        for filename in self._build_source_files:
1096            source_file = source_files.get(filename, None)
1097            if source_file is not None:
1098                self._source_files.append(source_file)
1099
1100
1101def _get_source_file(filename: str,
1102                     source_files: Dict[str, _SourceFile]) -> _SourceFile:
1103    return source_files.setdefault(filename, _SourceFile(filename))
1104
1105
1106def _gather_action_requirement_test(
1107        item: Item, source_files: Dict[str, _SourceFile],
1108        _test_programs: List[_TestProgram]) -> None:
1109    src = _get_source_file(item["test-target"], source_files)
1110    src.add_action_requirement_test(item)
1111
1112
1113def _gather_runtime_measurement_test(
1114        item: Item, source_files: Dict[str, _SourceFile],
1115        _test_programs: List[_TestProgram]) -> None:
1116    src = _get_source_file(item["test-target"], source_files)
1117    src.add_runtime_measurement_test(item)
1118
1119
1120def _gather_test_case(item: Item, source_files: Dict[str, _SourceFile],
1121                      _test_programs: List[_TestProgram]) -> None:
1122    src = _get_source_file(item["test-target"], source_files)
1123    src.add_test_case(item)
1124
1125
1126def _gather_test_program(item: Item, _source_files: Dict[str, _SourceFile],
1127                         test_programs: List[_TestProgram]) -> None:
1128    test_programs.append(_TestProgram(item))
1129
1130
1131def _gather_test_suite(item: Item, source_files: Dict[str, _SourceFile],
1132                       _test_programs: List[_TestProgram]) -> None:
1133    src = _get_source_file(item["test-target"], source_files)
1134    src.add_test_suite(item)
1135
1136
1137def _gather_default(_item: Item, _source_files: Dict[str, _SourceFile],
1138                    _test_programs: List[_TestProgram]) -> None:
1139    pass
1140
1141
1142_GATHER = {
1143    "build/test-program": _gather_test_program,
1144    "requirement/functional/action": _gather_action_requirement_test,
1145    "runtime-measurement-test": _gather_runtime_measurement_test,
1146    "test-case": _gather_test_case,
1147    "test-suite": _gather_test_suite,
1148}
1149
1150
1151def generate(config: dict,
1152             item_cache: ItemCache,
1153             targets: Optional[List[str]] = None) -> None:
1154    """
1155    Generates source files and build specification items for validation test
1156    suites and test cases according to the configuration.
1157
1158    :param config: A dictionary with configuration entries.
1159    :param item_cache: The specification item cache containing the validation
1160                       test suites and test cases.
1161    """
1162    source_files = {}  # type: Dict[str, _SourceFile]
1163    test_programs = []  # type: List[_TestProgram]
1164    for item in item_cache.all.values():
1165        _GATHER.get(item.type, _gather_default)(item, source_files,
1166                                                test_programs)
1167
1168    test_case_to_suites = {}  # type: Dict[str, List[_TestItem]]
1169    for test_program in test_programs:
1170        test_program.add_source_files(source_files)
1171        test_suites = []  # type: List[_TestItem]
1172        for source_file in test_program.source_files:
1173            test_suites.extend(source_file.test_suites)
1174        for source_file in test_program.source_files:
1175            for test_case in source_file.test_cases:
1176                test_case_to_suites.setdefault(test_case.uid,
1177                                               []).extend(test_suites)
1178
1179    if not targets:
1180        for src in source_files.values():
1181            src.generate(config["base-directory"], test_case_to_suites)
1182    else:
1183        for target in targets:
1184            source_files[target].generate(config["base-directory"],
1185                                          test_case_to_suites)
Note: See TracBrowser for help on using the repository browser.