How to use the pybuilder.execution.TaskDependency class in pybuilder

To help you get started, we’ve selected a few pybuilder examples that show popular ways TaskDependency is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github pybuilder / pybuilder / src / unittest / python / execution_tests.py View on Github external
def test_ensure_that_dependencies_are_resolved_when_simple_dependency_is_found(self):
        # Register one task without dependencies and a second task that refers
        # to the first by its string name, then inspect what resolution stored
        # for each of the two names.
        manager = self.execution_manager
        independent = Mock(name="one", dependencies=[])
        dependent = Mock(name="two", dependencies=[TaskDependency("one")])

        manager.register_task(independent, dependent)
        manager.resolve_dependencies()

        resolved = manager._task_dependencies
        # "one" declared nothing, so its resolved list is expected to be empty.
        self.assertEqual([], resolved.get("one"))
        # "two" is expected to hold a dependency built from the task object
        # itself rather than from the original string.
        self.assertEqual([TaskDependency(independent)], resolved.get("two"))
github pybuilder / pybuilder / src / unittest / python / execution_tests.py View on Github external
def test_should_collect_all_tasks_when_there_is_a_transitive_dependency(self):
        # Dependency chain: "one" -> "two" -> "three". Collecting from "one"
        # alone is expected to return all three registered tasks.
        manager = self.execution_manager
        head = Mock(name="one", dependencies=[TaskDependency("two")])
        middle = Mock(name="two", dependencies=[TaskDependency("three")])
        tail = Mock(name="three", dependencies=[])
        manager.register_task(head, middle, tail)
        manager.resolve_dependencies()

        collected = manager.collect_all_transitive_tasks(["one"])
        self.assertEqual(collected, {head, middle, tail})
github pybuilder / pybuilder / src / unittest / python / execution_tests.py View on Github external
def test_is_task_in_current_execution_plan(self):
        # Build a plan for "three" (which depends on "one" and "two"), store it
        # as the current plan, and query membership by task name.
        manager = self.execution_manager
        base = Mock(name="one", dependencies=[])
        mid = Mock(name="two", dependencies=[TaskDependency("one")])
        top = Mock(name="three", dependencies=[TaskDependency("one"), TaskDependency("two")])

        manager.register_task(base, mid, top)
        manager.resolve_dependencies(exclude_all_optional=True)
        manager._current_execution_plan = manager.build_execution_plan("three")

        # Every task reachable from "three" is expected to be reported as planned.
        for planned_name in ("three", "two", "one"):
            self.assertTrue(manager.is_task_in_current_execution_plan(planned_name))
        # "four" was never registered.
        self.assertFalse(manager.is_task_in_current_execution_plan("four"))
github pybuilder / pybuilder / src / unittest / python / execution_tests.py View on Github external
def test_shortest_execution_plan_reruns_on_demand(self):
        # Linear chain "one" <- "two" <- "three", with "one" and "two" recorded
        # as already executed before the shortest plans are requested.
        manager = self.execution_manager
        first = Mock(name="one", dependencies=[])
        second = Mock(name="two", dependencies=[TaskDependency("one")])
        third = Mock(name="three", dependencies=[TaskDependency("two")])

        manager.register_task(first, second, third)
        manager.resolve_dependencies()

        for already_run in (first, second):
            manager._tasks_executed.append(already_run)

        # (expected plan, requested task names) pairs, checked in this order.
        scenarios = (
            ([second, third], ("two", "three")),
            ([second, third], ("three", "two")),
            ([first, second, third], ("three", "one")),
        )
        for expected_plan, requested in scenarios:
            self.assertEqual(expected_plan, manager.build_shortest_execution_plan(requested))
github pybuilder / pybuilder / src / unittest / python / execution_tests.py View on Github external
def test_verify_error_unresolved_late_dependency(self):
        # A late dependency is attached to the name "four", which is never
        # registered as a task; resolving is expected to raise
        # NoSuchTaskException.
        manager = self.execution_manager
        base = Mock(name="one", dependencies=[])
        second = Mock(name="two", dependencies=[TaskDependency("one")])
        third = Mock(name="three", dependencies=[TaskDependency("one")])

        manager.register_task(base, second)
        manager.register_task(third)

        late_dependencies = {"four": [TaskDependency("three")]}
        manager.register_late_task_dependencies(late_dependencies)

        self.assertRaises(NoSuchTaskException, manager.resolve_dependencies)
github pybuilder / pybuilder / src / unittest / python / execution_tests.py View on Github external
def test_verify_duplicate_late_dependency_removed(self):
        # "three" declares the same dependency on "one" twice, and "two" gets
        # the same late dependency on "three" twice. After resolution each
        # dependency is expected to appear only once.
        manager = self.execution_manager
        base = Mock(name="one", dependencies=[])
        second = Mock(name="two", dependencies=[TaskDependency("one")])
        third = Mock(name="three", dependencies=[TaskDependency("one"), TaskDependency("one")])

        manager.register_task(base, second)
        manager.register_task(third)

        late_dependencies = {"two": [TaskDependency("three"), TaskDependency("three")]}
        manager.register_late_task_dependencies(late_dependencies)
        manager.resolve_dependencies()

        resolved = manager._task_dependencies
        self.assertEqual([], resolved.get("one"))
        # Declared dependency first, then the late one.
        self.assertEqual([TaskDependency(base), TaskDependency(third)],
                         resolved.get("two"))
        self.assertEqual([TaskDependency(base)],
                         resolved.get("three"))
github pybuilder / pybuilder / src / unittest / python / execution_tests.py View on Github external
def test_ensure_that_required_branch_is_force_excluded(self):
        # Chain "one" <- "two" <- "three"; "two" is passed in exclude_tasks at
        # resolution time, and the plan for "three" is then expected to hold
        # only "three" itself.
        manager = self.execution_manager
        base = Mock(name="one", dependencies=[])
        excluded = Mock(name="two", dependencies=[TaskDependency("one")])
        target = Mock(name="three", dependencies=[TaskDependency("two")])

        manager.register_task(base, excluded, target)
        manager.resolve_dependencies(exclude_tasks=["two"])

        plan = manager.build_execution_plan("three")

        self.assertEqual([target], plan)
github pybuilder / pybuilder / src / unittest / python / reactor_tests.py View on Github external
module.task1 = task1
            module.task2 = task2
            module.task3 = task3
            module.task4 = task4
            module.task5 = task5
            module.task6 = task6

            self.reactor.collect_tasks_and_actions_and_initializers(module)

            pybuilder.reactor.Task.assert_has_calls([call("task1", task1, [], ''),
                                                     call("task2", task2, [TaskDependency(task1)], ''),
                                                     call("task3", task3, [TaskDependency(task5, True)], ''),
                                                     call("task4", task4, [TaskDependency(task3, True)], ''),
                                                     call("task5", task5, [], ''),
                                                     call("task6", task6,
                                                          [TaskDependency(task1), TaskDependency(task2, True),
                                                           TaskDependency(task4), TaskDependency(task5)], '')])
github pybuilder / pybuilder / src / unittest / python / execution_tests.py View on Github external
def test_verify_error_unresolved_late_dependency(self):
        # Tasks "one", "two" and "three" are registered, but the late
        # dependency below is keyed on "four", a name with no registered task.
        manager = self.execution_manager
        root = Mock(name="one", dependencies=[])
        left = Mock(name="two", dependencies=[TaskDependency("one")])
        right = Mock(name="three", dependencies=[TaskDependency("one")])

        manager.register_task(root, left)
        manager.register_task(right)

        unresolved_target = {"four": [TaskDependency("three")]}
        manager.register_late_task_dependencies(unresolved_target)

        # Resolution is expected to fail on the unknown task name.
        self.assertRaises(NoSuchTaskException, manager.resolve_dependencies)
github pybuilder / pybuilder / src / main / python / pybuilder / reactor.py View on Github external
for name in dir(project_module):
            candidate = getattr(project_module, name)
            name = normalize_candidate_name(candidate)

            description = getattr(candidate, DESCRIPTION_ATTRIBUTE, "")

            if getattr(candidate, TASK_ATTRIBUTE, None):
                dependencies = getattr(candidate, DEPENDS_ATTRIBUTE, None)

                task_dependencies = list()
                if dependencies:
                    dependencies = list(as_list(dependencies))
                    for d in dependencies:
                        if isinstance(d, optional):
                            d = as_list(d())
                            task_dependencies.extend([TaskDependency(item, True) for item in d])
                        else:
                            task_dependencies.append(TaskDependency(d))

                # Add injected
                if name in injected_task_dependencies:
                    task_dependencies.extend(injected_task_dependencies[name])
                    del injected_task_dependencies[name]

                self.logger.debug("Found task '%s' with dependencies %s", name, task_dependencies)
                self.execution_manager.register_task(
                    Task(name, candidate, task_dependencies, description))

            elif getattr(candidate, ACTION_ATTRIBUTE, None):
                before = getattr(candidate, BEFORE_ATTRIBUTE, None)
                after = getattr(candidate, AFTER_ATTRIBUTE, None)