source: rtems-central/rtemsspec/validation.py @ cdb62c9

Last change on this file since cdb62c9 was cdb62c9, checked in by Sebastian Huber <sebastian.huber@…>, on 06/16/21 at 17:06:38

validation: Split file

  • Property mode set to 100644
File size: 42.3 KB
Line 
1# SPDX-License-Identifier: BSD-2-Clause
2""" This module provides functions for the generation of validation tests. """
3
4# Copyright (C) 2020, 2021 embedded brains GmbH (http://www.embedded-brains.de)
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions
8# are met:
9# 1. Redistributions of source code must retain the above copyright
10#    notice, this list of conditions and the following disclaimer.
11# 2. Redistributions in binary form must reproduce the above copyright
12#    notice, this list of conditions and the following disclaimer in the
13#    documentation and/or other materials provided with the distribution.
14#
15# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25# POSSIBILITY OF SUCH DAMAGE.
26
27import itertools
28import os
29import re
30from typing import Any, Dict, List, Optional, Tuple
31
32from rtemsspec.content import CContent, CInclude, \
33    GenericContent, get_value_params, get_value_plural, \
34    get_value_doxygen_group, get_value_doxygen_function, to_camel_case
35from rtemsspec.items import Item, ItemCache, \
36    ItemGetValueContext, ItemMapper
37from rtemsspec.transitionmap import TransitionMap
38
39ItemMap = Dict[str, Item]
40
41_STEPS = re.compile(r"^steps/([0-9]+)$")
42
43
44def _get_test_run(ctx: ItemGetValueContext) -> Any:
45    return f"{to_camel_case(ctx.item.uid[1:]).replace(' ', '')}_Run"
46
47
48class _Mapper(ItemMapper):
49    def __init__(self, item: Item):
50        super().__init__(item)
51        self._step = 0
52        self.add_get_value("glossary/term:/plural", get_value_plural)
53        self.add_get_value("interface/function:/name",
54                           get_value_doxygen_function)
55        self.add_get_value("interface/function:/params/name", get_value_params)
56        self.add_get_value("interface/group:/name", get_value_doxygen_group)
57        self.add_get_value("interface/macro:/name", get_value_doxygen_function)
58        self.add_get_value("interface/macro:/params/name", get_value_params)
59        self.add_get_value("requirement/functional/action:/test-run",
60                           _get_test_run)
61        self.add_get_value("test-case:/test-run", _get_test_run)
62
63    @property
64    def steps(self):
65        """ The count of test steps. """
66        return self._step
67
68    def reset(self):
69        """ Resets the test step counter. """
70        self._step = 0
71
72    def map(self,
73            identifier: str,
74            item: Optional[Item] = None,
75            prefix: Optional[str] = None) -> Tuple[Item, str, Any]:
76        if identifier == "step":
77            step = self._step
78            self._step = step + 1
79            return self._item, "step", str(step)
80        match = _STEPS.search(identifier)
81        if match:
82            inc = int(match.group(1))
83            self._step += inc
84            return self._item, "step", f"Accounts for {inc} test plan steps"
85        return super().map(identifier, item, prefix)
86
87
88def _add_ingroup(content: CContent, items: List["_TestItem"]) -> None:
89    content.add_ingroup([item.group_identifier for item in items])
90
91
92class _TestItem:
93    """ A test item with a default implementation for test cases. """
94
95    # pylint: disable=too-many-public-methods
96    def __init__(self, item: Item):
97        self._item = item
98        self._ident = to_camel_case(item.uid[1:])
99        self._context = f"{self._ident}_Context"
100        self._mapper = _Mapper(item)
101
102    def __getitem__(self, key: str):
103        return self._item[key]
104
105    @property
106    def item(self) -> Item:
107        """ Returns the item. """
108        return self._item
109
110    @property
111    def uid(self) -> str:
112        """ Returns the item UID. """
113        return self._item.uid
114
115    @property
116    def ident(self) -> str:
117        """ Returns the test identifier. """
118        return self._ident
119
120    @property
121    def context(self) -> str:
122        """ Returns the test case context type. """
123        return self._context
124
125    @property
126    def name(self) -> str:
127        """ Returns the name. """
128        return self._item.spec
129
130    @property
131    def includes(self) -> List[str]:
132        """ Returns the list of includes. """
133        return self._item["test-includes"]
134
135    @property
136    def local_includes(self) -> List[str]:
137        """ Returns the list of local includes. """
138        return self._item["test-local-includes"]
139
140    @property
141    def brief(self) -> str:
142        """ Returns the substituted brief description. """
143        return self.substitute_text(self["test-brief"])
144
145    @property
146    def description(self) -> str:
147        """ Returns the substituted description. """
148        return self.substitute_text(self["test-description"])
149
150    @property
151    def group_identifier(self) -> str:
152        """ Returns the group identifier. """
153        return f"RTEMSTestCase{self.ident}"
154
155    def substitute_code(self, text: Optional[str]) -> str:
156        """ Performs a variable substitution for code. """
157        return self._mapper.substitute(text)
158
159    def substitute_text(self,
160                        text: Optional[str],
161                        prefix: Optional[str] = None) -> str:
162        """
163        Performs a variable substitution for text with an optional prefix.
164        """
165        return self._mapper.substitute(text, prefix=prefix)
166
167    def add_test_case_description(
168            self, content: CContent,
169            test_case_to_suites: Dict[str, List["_TestItem"]]) -> None:
170        """ Adds the test case description. """
171        with content.defgroup_block(self.group_identifier, self.name):
172            try:
173                test_suites = test_case_to_suites[self.uid]
174            except KeyError as err:
175                msg = (f"the target file '{self['test-target']}' of "
176                       f"{self.item.spec} is not a source file of an item of "
177                       "type 'build/test-program'")
178                raise ValueError(msg) from err
179            _add_ingroup(content, test_suites)
180            content.add_brief_description(self.brief)
181            content.wrap(self.description)
182            self.add_test_case_action_description(content)
183            content.add("@{")
184
185    def add_test_case_action_description(self, content: CContent) -> None:
186        """ Adds the test case action description. """
187        actions = self["test-actions"]
188        if actions:
189            content.add("This test case performs the following actions:")
190            for action in actions:
191                content.wrap(self.substitute_text(action["action-brief"]),
192                             initial_indent="- ")
193                for check in action["checks"]:
194                    content.wrap(self.substitute_text(check["brief"]),
195                                 initial_indent="  - ",
196                                 subsequent_indent="    ")
197
198    def _add_test_case_actions(self, content: CContent) -> CContent:
199        actions = CContent()
200        for index, action in enumerate(self["test-actions"]):
201            method = f"{self._ident}_Action_{index}"
202            if self.context == "void":
203                args = []
204                params = []
205            else:
206                args = ["ctx"]
207                params = [f"{self.context} *ctx"]
208            actions.gap = False
209            actions.call_function(None, method, args)
210            with content.doxygen_block():
211                content.add_brief_description(
212                    self.substitute_text(action["action-brief"]))
213            content.gap = False
214            with content.function("static void", method, params):
215                content.add(self.substitute_code(action["action-code"]))
216                for check in action["checks"]:
217                    with content.comment_block():
218                        content.wrap(self.substitute_text(check["brief"]))
219                    content.append(self.substitute_text(check["code"]))
220        return actions
221
222    def _get_run_params(self, header: Optional[Dict[str, Any]]) -> List[str]:
223        if not header:
224            return []
225        return [
226            self.substitute_text(param["specifier"],
227                                 f"test-header/run-params[{index}]")
228            for index, param in enumerate(header["run-params"])
229        ]
230
231    def add_header_body(self, content: CContent, header: Dict[str,
232                                                              Any]) -> None:
233        """ Adds the test header body. """
234        content.add(self.substitute_code(header["code"]))
235        with content.doxygen_block():
236            content.add_brief_description("Runs the parameterized test case.")
237            content.add_param_description(header["run-params"])
238        content.gap = False
239        content.declare_function("void", f"{self.ident}_Run",
240                                 self._get_run_params(header))
241
242    def add_support_method(self,
243                           content: CContent,
244                           key: str,
245                           name: str,
246                           mandatory_code: Optional[GenericContent] = None,
247                           optional_code: Optional[GenericContent] = None,
248                           ret: str = "void",
249                           extra_params: Optional[List[str]] = None,
250                           extra_args: Optional[List[str]] = None,
251                           do_wrap: bool = True) -> str:
252        """ Adds a support method to the content. """
253
254        # pylint: disable=too-many-arguments
255        # pylint: disable=too-many-locals
256        info = self[key]
257        if not info and not mandatory_code:
258            return "NULL"
259        if extra_params is None:
260            extra_params = []
261        if extra_args is None:
262            extra_args = []
263        method = f"{self.ident}_{name}"
264        wrap = f"{method}_Wrap"
265        if info:
266            content.add_description_block(
267                self.substitute_text(info["brief"]),
268                self.substitute_text(info["description"]))
269            params = [f"{self.context} *ctx"] + extra_params
270            with content.function(f"static {ret}", method, params):
271                if not do_wrap:
272                    content.gap = False
273                    content.add(mandatory_code)
274                    content.gap = False
275                    content.add(optional_code)
276                content.add(self.substitute_code(info["code"]))
277        if not do_wrap:
278            assert info
279            return method
280        params = ["void *arg"] + extra_params
281        with content.function(f"static {ret}", wrap, params):
282            content.add([f"{self.context} *ctx;", "", "ctx = arg;"])
283            content.gap = False
284            content.add(mandatory_code)
285            content.gap = False
286            content.add(optional_code)
287            if info:
288                content.gap = False
289                ret_2 = None if ret == "void" else "return"
290                args = ["ctx"] + extra_args
291                content.call_function(ret_2, f"{method}", args)
292        return wrap
293
294    def add_function(self, content: CContent, key: str, name: str) -> None:
295        """
296        Adds a function with the name to the content if there is one defined
297        for the attribute key.
298        """
299        if self[key] is not None:
300            with content.function("static void", f"{self.ident}_{name}",
301                                  [f"{self.context} *ctx"]):
302                content.append(self.substitute_code(self[key]))
303
304    def add_default_context_members(self, content: CContent) -> None:
305        """ Adds the default context members to the content """
306        for param in self._get_run_params(self["test-header"]):
307            content.add_description_block(
308                "This member contains a copy of the corresponding "
309                f"{self.ident}_Run() parameter.", None)
310            content.add(f"{param.strip()};")
311
312    def add_context(self, content: CContent) -> str:
313        """ Adds the context to the content. """
314        content.add(self.substitute_code(self["test-context-support"]))
315        if not self["test-context"] and (
316                not self["test-header"]
317                or not self["test-header"]["run-params"]):
318            return "NULL"
319        with content.doxygen_block():
320            content.add_brief_description(
321                f"Test context for {self.name} test case.")
322        content.append("typedef struct {")
323        with content.indent():
324            for info in self["test-context"]:
325                content.add_description_block(
326                    self.substitute_text(info["brief"]),
327                    self.substitute_text(info["description"]))
328                content.add(f"{info['member'].strip()};")
329            self.add_default_context_members(content)
330        content.add([
331            f"}} {self.context};", "", f"static {self.context}",
332            f"  {self.ident}_Instance;"
333        ])
334        return f"&{self.ident}_Instance"
335
336    def generate_header(self, base_directory: str, header: Dict[str,
337                                                                Any]) -> None:
338        """ Generates the test header. """
339        content = CContent()
340        content.register_license_and_copyrights_of_item(self._item)
341        content.prepend_spdx_license_identifier()
342        with content.file_block():
343            content.add_ingroup([self.group_identifier])
344        content.add_copyrights_and_licenses()
345        content.add_automatically_generated_warning()
346        with content.header_guard(os.path.basename(header["target"])):
347            content.add_includes(list(map(CInclude, header["includes"])))
348            content.add_includes(list(map(CInclude, header["local-includes"])),
349                                 local=True)
350            with content.extern_c():
351                with content.add_to_group(self.group_identifier):
352                    self.add_header_body(content, header)
353        content.write(os.path.join(base_directory, header["target"]))
354
355    def _add_context_and_fixture(self, content: CContent) -> Optional[str]:
356        instance = self.add_context(content)
357        if instance == "NULL":
358            self._context = "void"
359            do_wrap = False
360        else:
361            do_wrap = True
362        setup = self.add_support_method(content,
363                                        "test-setup",
364                                        "Setup",
365                                        do_wrap=do_wrap)
366        stop = self.add_support_method(content,
367                                       "test-stop",
368                                       "Stop",
369                                       do_wrap=do_wrap)
370        teardown = self.add_support_method(content,
371                                           "test-teardown",
372                                           "Teardown",
373                                           do_wrap=do_wrap)
374        if all(ptr == "NULL" for ptr in [instance, setup, stop, teardown]):
375            return None
376        content.add([
377            f"static T_fixture {self.ident}_Fixture = {{",
378            f"  .setup = {setup},", f"  .stop = {stop},",
379            f"  .teardown = {teardown},", "  .scope = NULL,",
380            f"  .initial_context = {instance}", "};"
381        ])
382        return f"&{self.ident}_Fixture"
383
384    def assign_run_params(self, content: CContent, header: Dict[str,
385                                                                Any]) -> None:
386        """ Assigns the run parameters to the context.  """
387        if header["run-params"]:
388            content.add([f"ctx = &{self.ident}_Instance;"] + [
389                f"ctx->{param['name']} = {param['name']};"
390                for param in header["run-params"]
391            ])
392
393    def generate(self, content: CContent, base_directory: str,
394                 test_case_to_suites: Dict[str, List["_TestItem"]]) -> None:
395        """ Generates the content. """
396        self.add_test_case_description(content, test_case_to_suites)
397        fixture = self._add_context_and_fixture(content)
398        content.add(self.substitute_code(self["test-support"]))
399        self._mapper.reset()
400        actions = self._add_test_case_actions(content)
401        header = self["test-header"]
402        prologue = CContent()
403        epilogue = CContent()
404        if header:
405            self.generate_header(base_directory, header)
406            ret = "void"
407            name = f"{self.ident}_Run"
408            params = self._get_run_params(header)
409            if self._mapper.steps > 0 and not fixture:
410                fixture = "&T_empty_fixture"
411            if fixture:
412                content.add(f"static T_fixture_node {self.ident}_Node;")
413                if self.context == "void":
414                    result = None
415                else:
416                    prologue.add(f"{self.context} *ctx;")
417                    self.assign_run_params(prologue, header)
418                    result = "ctx ="
419                prologue.call_function(result, "T_push_fixture",
420                                       [f"&{self.ident}_Node", fixture])
421                epilogue.add("T_pop_fixture();")
422            align = True
423        else:
424            ret = ""
425            params = [f"{self.ident}"]
426            if fixture:
427                params.append(fixture)
428                name = "T_TEST_CASE_FIXTURE"
429            else:
430                name = "T_TEST_CASE"
431            if self.context != "void":
432                prologue.add([
433                    f"{self.context} *ctx;", "", "ctx = T_fixture_context();"
434                ])
435            align = False
436            with content.function_block(
437                    f"void T_case_body_{self.ident}( void )"):
438                pass
439            content.gap = False
440        with content.function(ret, name, params, align=align):
441            content.add(prologue)
442            if self._mapper.steps > 0:
443                content.add(f"T_plan( {self._mapper.steps} );")
444            content.add(actions)
445            content.add(epilogue)
446        content.add("/** @} */")
447
448
449class _TestSuiteItem(_TestItem):
450    """ A test suite item. """
451    @property
452    def group_identifier(self) -> str:
453        return f"RTEMSTestSuite{self.ident}"
454
455    def generate(self, content: CContent, _base_directory: str,
456                 _test_case_to_suites: Dict[str, List[_TestItem]]) -> None:
457        with content.defgroup_block(self.group_identifier, self.name):
458            content.add("@ingroup RTEMSTestSuites")
459            content.add_brief_description(self.brief)
460            content.wrap(self.description)
461            content.add("@{")
462        content.add(self.substitute_code(self["test-code"]))
463        content.add("/** @} */")
464
465
466_IdxToX = Tuple[Tuple[str, ...], ...]
467
468
469def _to_enum(prefix: str, conditions: List[Any]) -> _IdxToX:
470    return tuple(
471        tuple([f"{prefix}_{condition['name']}"] + [
472            f"{prefix}_{condition['name']}_{state['name']}"
473            for state in condition["states"]
474        ] + [f"{prefix}_{condition['name']}_NA"]) for condition in conditions)
475
476
477def _add_condition_enum(content: CContent, co_idx_to_enum: _IdxToX) -> None:
478    for enum in co_idx_to_enum:
479        content.add("typedef enum {")
480        with content.indent():
481            content.add(",\n".join(enum[1:]))
482        content.add(f"}} {enum[0]};")
483
484
485class _ActionRequirementTestItem(_TestItem):
486    """ An action requirement test item. """
487    def __init__(self, item: Item):
488        super().__init__(item)
489        self._pre_co_count = len(item["pre-conditions"])
490        self._pre_co_idx_to_enum = _to_enum(f"{self.ident}_Pre",
491                                            item["pre-conditions"])
492        self._post_co_idx_to_enum = _to_enum(f"{self.ident}_Post",
493                                             item["post-conditions"])
494
495    def _add_pre_condition_descriptions(self, content: CContent) -> None:
496        for condition in self["pre-conditions"]:
497            content.add("static const char * const "
498                        f"{self.ident}_PreDesc_{condition['name']}[] = {{")
499            with content.indent():
500                content.add(",\n".join(
501                    itertools.chain((f"\"{state['name']}\""
502                                     for state in condition["states"]),
503                                    ["\"NA\""])))
504            content.add("};")
505        content.add("static const char * const * const "
506                    f"{self.ident}_PreDesc[] = {{")
507        with content.indent():
508            content.add(",\n".join([
509                f"{self.ident}_PreDesc_{condition['name']}"
510                for condition in self["pre-conditions"]
511            ] + ["NULL"]))
512        content.add("};")
513
514    def add_default_context_members(self, content: CContent) -> None:
515        super().add_default_context_members(content)
516        content.add_description_block(
517            "This member defines the pre-condition states "
518            "for the next action.", None)
519        content.add(f"size_t pcs[ {self._pre_co_count} ];")
520        content.add_description_block(
521            "This member indicates if the test action loop "
522            "is currently executed.", None)
523        content.add("bool in_action_loop;")
524
525    def _add_fixture_scope(self, content: CContent) -> None:
526        params = ["void *arg", "char *buf", "size_t n"]
527        with content.function("static size_t", f"{self.ident}_Scope", params):
528            content.add([f"{self.context} *ctx;", "", "ctx = arg;"])
529            with content.condition("ctx->in_action_loop"):
530                content.call_function(
531                    "return", "T_get_scope",
532                    [f"{self.ident}_PreDesc", "buf", "n", "ctx->pcs"])
533            content.add("return 0;")
534
535    def _add_call(self, content: CContent, key: str, name: str) -> None:
536        if self[key] is not None:
537            content.gap = False
538            content.call_function(None, f"{self.ident}_{name}", ["ctx"])
539
540    def _add_loop_body(self, content: CContent,
541                       transition_map: TransitionMap) -> None:
542        has_pre_co_na = max(transition_map.pre_co_summary[1:])
543        content.add(f"{self.ident}_Entry entry;")
544        if has_pre_co_na:
545            content.append(f"size_t pcs[ {self._pre_co_count} ];")
546        content.call_function("entry =", f"{self.ident}_GetEntry", ["index"])
547        content.append("++index;")
548        if transition_map.pre_co_summary[0]:
549            with content.condition("entry.Skip"):
550                content.append("continue;")
551        if has_pre_co_na:
552            content.call_function(None, "memcpy",
553                                  ["pcs", "ctx->pcs", "sizeof( pcs )"])
554            for index, pre_co in enumerate(self._item["pre-conditions"]):
555                if transition_map.pre_co_summary[index + 1]:
556                    name = pre_co["name"]
557                    with content.condition(f"entry.Pre_{name}_NA"):
558                        enum_na = self._pre_co_idx_to_enum[index][-1]
559                        content.append(f"ctx->pcs[ {index} ] = {enum_na};")
560        content.add_blank_line()
561        self._add_call(content, "test-prepare", "Prepare")
562        for index, enum in enumerate(self._pre_co_idx_to_enum):
563            content.gap = False
564            content.call_function(None, f"{enum[0]}_Prepare",
565                                  ["ctx", f"ctx->pcs[ {index} ]"])
566        self._add_call(content, "test-action", "Action")
567        for index, enum in enumerate(self._post_co_idx_to_enum):
568            content.gap = False
569            content.call_function(None, f"{enum[0]}_Check", [
570                "ctx", f"entry.{transition_map.get_post_entry_member(index)}"
571            ])
572        self._add_call(content, "test-cleanup", "Cleanup")
573        if has_pre_co_na:
574            content.gap = False
575            content.call_function(None, "memcpy",
576                                  ["ctx->pcs", "pcs", "sizeof( ctx->pcs )"])
577
578    def _add_for_loops(self, content: CContent, transition_map: TransitionMap,
579                       index: int) -> None:
580        if index < self._pre_co_count:
581            var = f"ctx->pcs[ {index} ]"
582            begin = self._pre_co_idx_to_enum[index][1]
583            end = self._pre_co_idx_to_enum[index][-1]
584            with content.for_loop(f"{var} = {begin}", f"{var} < {end}",
585                                  f"++{var}"):
586                self._add_for_loops(content, transition_map, index + 1)
587        else:
588            self._add_loop_body(content, transition_map)
589
590    def _add_test_case(self, content: CContent, transition_map: TransitionMap,
591                       header: Dict[str, Any]) -> None:
592        ret = f"static inline {self.ident}_Entry"
593        name = f"{self.ident}_GetEntry"
594        params = ["size_t index"]
595        with content.function(ret, name, params, align=True):
596            content.add([
597                f"return {self.ident}_Entries[",
598                f"  {self.ident}_Map[ index ]", "];"
599            ])
600        fixture = f"{self.ident}_Fixture"
601        prologue = CContent()
602        epilogue = CContent()
603        if header:
604            content.add(f"static T_fixture_node {self.ident}_Node;")
605            ret = "void"
606            name = f"{self.ident}_Run"
607            params = self._get_run_params(header)
608            prologue.add([f"{self.context} *ctx;", "size_t index;"])
609            self.assign_run_params(prologue, header)
610            prologue.call_function("ctx =", "T_push_fixture",
611                                   [f"&{self.ident}_Node", f"&{fixture}"])
612            prologue.append(["ctx->in_action_loop = true;", "index = 0;"])
613            epilogue.add("T_pop_fixture();")
614            align = True
615        else:
616            with content.function_block(
617                    f"void T_case_body_{self.ident}( void )"):
618                pass
619            content.gap = False
620            ret = ""
621            name = "T_TEST_CASE_FIXTURE"
622            params = [f"{self.ident}", f"&{fixture}"]
623            prologue.add([
624                f"{self.context} *ctx;", "size_t index;", "",
625                "ctx = T_fixture_context();", "ctx->in_action_loop = true;",
626                "index = 0;"
627            ])
628            align = False
629        with content.function(ret, name, params, align=align):
630            content.add(prologue)
631            self._add_for_loops(content, transition_map, 0)
632            content.add(epilogue)
633
634    def _add_handler(self, content: CContent, conditions: List[Any],
635                     co_idx_to_enum: _IdxToX, action: str) -> None:
636        for co_idx, condition in enumerate(conditions):
637            enum = co_idx_to_enum[co_idx]
638            handler = f"{enum[0]}_{action}"
639            params = [f"{self.context} *ctx", f"{enum[0]} state"]
640            with content.function("static void", handler, params):
641                content.add(self.substitute_code(condition["test-prologue"]))
642                content.add("switch ( state ) {")
643                with content.indent():
644                    for state_index, state in enumerate(condition["states"]):
645                        content.add(f"case {enum[state_index + 1]}: {{")
646                        with content.indent():
647                            with content.comment_block():
648                                content.wrap(
649                                    self.substitute_text(state["text"]))
650                            content.append(
651                                self.substitute_code(state["test-code"]))
652                            content.append("break;")
653                        content.add("}")
654                    content.add(f"case {enum[-1]}:")
655                    with content.indent():
656                        content.append("break;")
657                content.add("}")
658                content.add(self.substitute_code(condition["test-epilogue"]))
659
660    def add_test_case_action_description(self, _content: CContent) -> None:
661        pass
662
663    def add_header_body(self, content: CContent, header: Dict[str,
664                                                              Any]) -> None:
665        _add_condition_enum(content, self._pre_co_idx_to_enum)
666        _add_condition_enum(content, self._post_co_idx_to_enum)
667        super().add_header_body(content, header)
668
669    def generate(self, content: CContent, base_directory: str,
670                 test_case_to_suites: Dict[str, List[_TestItem]]) -> None:
671        self.add_test_case_description(content, test_case_to_suites)
672        header = self["test-header"]
673        if header:
674            self.generate_header(base_directory, header)
675        else:
676            _add_condition_enum(content, self._pre_co_idx_to_enum)
677            _add_condition_enum(content, self._post_co_idx_to_enum)
678        instance = self.add_context(content)
679        self._add_pre_condition_descriptions(content)
680        content.add(self.substitute_code(self["test-support"]))
681        self._add_handler(content, self["pre-conditions"],
682                          self._pre_co_idx_to_enum, "Prepare")
683        self._add_handler(content, self["post-conditions"],
684                          self._post_co_idx_to_enum, "Check")
685        optional_code = "ctx->in_action_loop = false;"
686        setup = self.add_support_method(content,
687                                        "test-setup",
688                                        "Setup",
689                                        optional_code=optional_code)
690        stop = self.add_support_method(content,
691                                       "test-stop",
692                                       "Stop",
693                                       optional_code=optional_code)
694        teardown = self.add_support_method(content,
695                                           "test-teardown",
696                                           "Teardown",
697                                           optional_code=optional_code)
698        self.add_function(content, "test-prepare", "Prepare")
699        self.add_function(content, "test-action", "Action")
700        self.add_function(content, "test-cleanup", "Cleanup")
701        transition_map = TransitionMap(self.item)
702        transition_map.add_map(content, self.ident)
703        self._add_fixture_scope(content)
704        content.add([
705            f"static T_fixture {self.ident}_Fixture = {{",
706            f"  .setup = {setup},", f"  .stop = {stop},",
707            f"  .teardown = {teardown},", f"  .scope = {self.ident}_Scope,",
708            f"  .initial_context = {instance}", "};"
709        ])
710        self._add_test_case(content, transition_map, header)
711        content.add("/** @} */")
712
713
714class _RuntimeMeasurementRequestItem(_TestItem):
715    """ A runtime measurement request item. """
716    def __init__(self, item: Item, context: str):
717        super().__init__(item)
718        self._context = context
719
720
721def _add_call_method(content: CContent, name: str) -> None:
722    if name != "NULL":
723        content.gap = False
724        content.call_function(None, name, ["ctx"])
725
726
727class _RuntimeMeasurementTestItem(_TestItem):
728    """ A runtime measurement test item. """
729    def add_test_case_action_description(self, _content: CContent) -> None:
730        pass
731
732    def add_default_context_members(self, content: CContent) -> None:
733        content.add_description_block(
734            "This member references the measure runtime context.", None)
735        content.add("T_measure_runtime_context *context;")
736        content.add_description_block(
737            "This member provides the measure runtime request.", None)
738        content.add("T_measure_runtime_request request;")
739
740    def _add_requests(self, content: CContent) -> CContent:
741        requests = CContent()
742        prepare = self.add_support_method(content,
743                                          "test-prepare",
744                                          "Prepare",
745                                          do_wrap=False)
746        cleanup = self.add_support_method(content,
747                                          "test-cleanup",
748                                          "Cleanup",
749                                          do_wrap=False)
750        for item in self.item.children("runtime-measurement-request"):
751            req = _RuntimeMeasurementRequestItem(item, self.context)
752            requests.add_blank_line()
753            _add_call_method(requests, prepare)
754            name = req.add_support_method(content,
755                                          "test-prepare",
756                                          "Prepare",
757                                          do_wrap=False)
758            _add_call_method(requests, name)
759            name = req.add_support_method(content, "test-setup", "Setup")
760            requests.append([
761                f"ctx->request.name = \"{req.ident}\";",
762                f"ctx->request.setup = {name};"
763            ])
764            name = req.add_support_method(content, "test-body", "Body")
765            requests.append([f"ctx->request.body = {name};"])
766            extra_params = [
767                "T_ticks *delta", "uint32_t tic", "uint32_t toc",
768                "unsigned int retry"
769            ]
770            extra_args = ["delta", "tic", "toc", "retry"]
771            name = req.add_support_method(content,
772                                          "test-teardown",
773                                          "Teardown",
774                                          ret="bool",
775                                          extra_params=extra_params,
776                                          extra_args=extra_args)
777            requests.append([f"ctx->request.teardown = {name};"])
778            requests.gap = False
779            requests.call_function(None, "T_measure_runtime",
780                                   ["ctx->context", "&ctx->request"])
781            name = req.add_support_method(content,
782                                          "test-cleanup",
783                                          "Cleanup",
784                                          do_wrap=False)
785            _add_call_method(requests, name)
786            _add_call_method(requests, cleanup)
787        return requests
788
789    def generate(self, content: CContent, base_directory: str,
790                 test_case_to_suites: Dict[str, List[_TestItem]]) -> None:
791        self.add_test_case_description(content, test_case_to_suites)
792        instance = self.add_context(content)
793        content.add(self.substitute_code(self["test-support"]))
794        setup = f"{self.ident}_Setup_Context"
795        with content.function("static void", setup, [f"{self.context} *ctx"]):
796            content.add([
797                "T_measure_runtime_config config;",
798                "",
799                "memset( &config, 0, sizeof( config ) );",
800                f"config.sample_count = {self['params']['sample-count']};",
801                "ctx->request.arg = ctx;",
802                "ctx->request.flags = T_MEASURE_RUNTIME_REPORT_SAMPLES;",
803                "ctx->context = T_measure_runtime_create( &config );",
804                "T_assert_not_null( ctx->context );",
805            ])
806        setup = self.add_support_method(content,
807                                        "test-setup",
808                                        "Setup",
809                                        mandatory_code=f"{setup}( ctx );")
810        stop = self.add_support_method(content, "test-stop", "Stop")
811        teardown = self.add_support_method(content, "test-teardown",
812                                           "Teardown")
813        content.add([
814            f"static T_fixture {self.ident}_Fixture = {{",
815            f"  .setup = {setup},", f"  .stop = {stop},",
816            f"  .teardown = {teardown},", "  .scope = NULL,",
817            f"  .initial_context = {instance}", "};"
818        ])
819        requests = self._add_requests(content)
820        with content.function_block(f"void T_case_body_{self.ident}( void )"):
821            pass
822        content.gap = False
823        ret = ""
824        name = "T_TEST_CASE_FIXTURE"
825        params = [f"{self.ident}", f"&{self.ident}_Fixture"]
826        with content.function(ret, name, params, align=False):
827            content.add([
828                f"{self.context} *ctx;",
829                "",
830                "ctx = T_fixture_context();",
831            ])
832            content.append(requests)
833        content.add("/** @} */")
834
835
836class _SourceFile:
837    """ A test source file. """
838    def __init__(self, filename: str):
839        """ Initializes a test source file. """
840        self._file = filename
841        self._test_suites = []  # type: List[_TestItem]
842        self._test_cases = []  # type: List[_TestItem]
843
844    @property
845    def test_suites(self) -> List[_TestItem]:
846        """ The test suites of the source file. """
847        return self._test_suites
848
849    @property
850    def test_cases(self) -> List[_TestItem]:
851        """ The test cases of the source file. """
852        return self._test_cases
853
854    def add_test_suite(self, item: Item) -> None:
855        """ Adds a test suite to the source file. """
856        self._test_suites.append(_TestSuiteItem(item))
857
858    def add_test_case(self, item: Item) -> None:
859        """ Adds a test case to the source file. """
860        self._test_cases.append(_TestItem(item))
861
862    def add_action_requirement_test(self, item: Item) -> None:
863        """ Adds an action requirement test to the source file. """
864        self._test_cases.append(_ActionRequirementTestItem(item))
865
866    def add_runtime_measurement_test(self, item: Item) -> None:
867        """ Adds a runtime measurement test to the source file. """
868        self._test_cases.append(_RuntimeMeasurementTestItem(item))
869
870    def generate(self, base_directory: str,
871                 test_case_to_suites: Dict[str, List[_TestItem]]) -> None:
872        """
873        Generates the source file and the corresponding build specification.
874        """
875        content = CContent()
876        includes = []  # type: List[CInclude]
877        local_includes = []  # type: List[CInclude]
878        for item in itertools.chain(self._test_suites, self._test_cases):
879            includes.extend(map(CInclude, item.includes))
880            local_includes.extend(map(CInclude, item.local_includes))
881            content.register_license_and_copyrights_of_item(item.item)
882        content.prepend_spdx_license_identifier()
883        with content.file_block():
884            _add_ingroup(content, self._test_suites)
885            _add_ingroup(content, self._test_cases)
886        content.add_copyrights_and_licenses()
887        content.add_automatically_generated_warning()
888        content.add_have_config()
889        content.add_includes(includes)
890        content.add_includes(local_includes, local=True)
891        content.add_includes([CInclude("rtems/test.h")])
892        for item in sorted(self._test_cases, key=lambda x: x.name):
893            item.generate(content, base_directory, test_case_to_suites)
894        for item in sorted(self._test_suites, key=lambda x: x.name):
895            item.generate(content, base_directory, test_case_to_suites)
896        content.write(os.path.join(base_directory, self._file))
897
898
899class _TestProgram:
900    """ A test program. """
901    def __init__(self, item: Item):
902        """ Initializes a test program. """
903        self._item = item
904        self._source_files = []  # type: List[_SourceFile]
905
906    @property
907    def source_files(self) -> List[_SourceFile]:
908        """ The source files of the test program. """
909        return self._source_files
910
911    def add_source_files(self, source_files: Dict[str, _SourceFile]) -> None:
912        """
913        Adds the source files of the test program which are present in the
914        source file map.
915        """
916        for filename in self._item["source"]:
917            source_file = source_files.get(filename, None)
918            if source_file is not None:
919                self._source_files.append(source_file)
920
921
922def _get_source_file(filename: str,
923                     source_files: Dict[str, _SourceFile]) -> _SourceFile:
924    return source_files.setdefault(filename, _SourceFile(filename))
925
926
927def _gather_action_requirement_test(
928        item: Item, source_files: Dict[str, _SourceFile],
929        _test_programs: List[_TestProgram]) -> None:
930    src = _get_source_file(item["test-target"], source_files)
931    src.add_action_requirement_test(item)
932
933
934def _gather_runtime_measurement_test(
935        item: Item, source_files: Dict[str, _SourceFile],
936        _test_programs: List[_TestProgram]) -> None:
937    src = _get_source_file(item["test-target"], source_files)
938    src.add_runtime_measurement_test(item)
939
940
941def _gather_test_case(item: Item, source_files: Dict[str, _SourceFile],
942                      _test_programs: List[_TestProgram]) -> None:
943    src = _get_source_file(item["test-target"], source_files)
944    src.add_test_case(item)
945
946
947def _gather_test_program(item: Item, _source_files: Dict[str, _SourceFile],
948                         test_programs: List[_TestProgram]) -> None:
949    test_programs.append(_TestProgram(item))
950
951
952def _gather_test_suite(item: Item, source_files: Dict[str, _SourceFile],
953                       _test_programs: List[_TestProgram]) -> None:
954    src = _get_source_file(item["test-target"], source_files)
955    src.add_test_suite(item)
956
957
958def _gather_default(_item: Item, _source_files: Dict[str, _SourceFile],
959                    _test_programs: List[_TestProgram]) -> None:
960    pass
961
962
963_GATHER = {
964    "build/test-program": _gather_test_program,
965    "requirement/functional/action": _gather_action_requirement_test,
966    "runtime-measurement-test": _gather_runtime_measurement_test,
967    "test-case": _gather_test_case,
968    "test-suite": _gather_test_suite,
969}
970
971
972def generate(config: dict, item_cache: ItemCache) -> None:
973    """
974    Generates source files and build specification items for validation test
975    suites and test cases according to the configuration.
976
977    :param config: A dictionary with configuration entries.
978    :param item_cache: The specification item cache containing the validation
979                       test suites and test cases.
980    """
981    source_files = {}  # type: Dict[str, _SourceFile]
982    test_programs = []  # type: List[_TestProgram]
983    for item in item_cache.all.values():
984        _GATHER.get(item.type, _gather_default)(item, source_files,
985                                                test_programs)
986
987    test_case_to_suites = {}  # type: Dict[str, List[_TestItem]]
988    for test_program in test_programs:
989        test_program.add_source_files(source_files)
990        test_suites = []  # type: List[_TestItem]
991        for source_file in test_program.source_files:
992            test_suites.extend(source_file.test_suites)
993        for source_file in test_program.source_files:
994            for test_case in source_file.test_cases:
995                test_case_to_suites.setdefault(test_case.uid,
996                                               []).extend(test_suites)
997
998    for src in source_files.values():
999        src.generate(config["base-directory"], test_case_to_suites)
Note: See TracBrowser for help on using the repository browser.