cardano_node_tests.cluster_management package
Submodules
cardano_node_tests.cluster_management.cache module
- class cardano_node_tests.cluster_management.cache.CacheManager[source]
Bases:
objectSet of cache management methods.
- cache: ClassVar[dict[int, ClusterManagerCache]] = {}[source]
- classmethod get_cache() dict[int, ClusterManagerCache][source]
- classmethod get_instance_cache(instance_num: int) ClusterManagerCache[source]
- class cardano_node_tests.cluster_management.cache.ClusterManagerCache(cluster_obj: ~cardano_clusterlib.clusterlib_klass.ClusterLib | None = None, test_data: dict = <factory>, addrs_data: dict = <factory>, last_checksum: str = '')[source]
Bases:
objectCache for a single cluster instance.
Here goes only data that makes sense to reuse in multiple tests.
cardano_node_tests.cluster_management.cluster_getter module
Functionality for obtaining and setting up a cluster instance for parallel test execution.
The ClusterGetter class is responsible for managing a pool of cluster instances and assigning them to tests running in parallel on different pytest workers. It ensures that tests get a suitable, properly configured, and healthy cluster instance to run on.
Coordination between workers is achieved through a system of status files created in a shared temporary directory. These files signal the state of each cluster instance (e.g., running, needs respin), which tests are running on which instance, and what resources are locked or in use.
The core logic is implemented in the get_cluster_instance method. It enters a loop where it evaluates the state of all available cluster instances against the requirements of the current test (e.g., resource needs, custom scripts, priority). It will wait and retry until a suitable instance is found and all conditions for starting the test are met. This includes handling cluster restarts (respins), resource allocation, and synchronization for tests that share expensive setups (marked tests).
- class cardano_node_tests.cluster_management.cluster_getter.ClusterGetter(worker_id: str, pytest_config: Config, num_of_instances: int, log_func: Callable)[source]
Bases:
objectInternal class that encapsulate functionality for getting a cluster instance.
- get_cluster_instance(mark: str = '', lock_resources: Iterable[str | BaseFilter] = (), use_resources: Iterable[str | BaseFilter] = (), prio: bool = False, cleanup: bool = False, scriptsdir: str | Path = '') int[source]
Return a number of initialized cluster instance once we can start the test.
It checks current conditions and waits if the conditions don’t allow to start the test right away.
- Parameters:
mark – A string marking group of tests. Useful when group of tests need the same expensive setup. The mark will make sure the marked tests run on the same cluster instance.
lock_resources – An iterable of resources (names of resources) that will be used exclusively by the test (or marked group of tests). A locked resource cannot be used by other tests.
use_resources – An iterable of resources (names of resources) that will be used by the test (or marked group of tests). The resources can be shared with other tests, however resources in use cannot be locked by other tests.
prio – A boolean indicating that the test has priority in obtaining cluster instance. All other tests that also want to get a cluster instance need to wait.
cleanup – A boolean indicating if the cluster will be respun after the test (or marked group of tests) is finished. Can be used only for tests that locked whole cluster (“singleton” tests).
scriptsdir – Path to custom scripts for the cluster.
cardano_node_tests.cluster_management.cluster_management module
Module for exposing useful components of cluster management.
The cluster management system is designed to manage a pool of Cardano cluster instances for running tests in parallel using pytest-xdist. It coordinates access to these shared cluster instances by multiple test workers.
- Key concepts:
Pool of Instances: Multiple cluster instances can be running concurrently. Each test worker requests a cluster instance to run a test on.
Coordination via File System: Workers communicate and coordinate through a system of status files created on a shared file system. These files act as locks and signals to indicate the state of cluster instances (e.g., which test is running, if a respin is needed, which resources are locked). The status_files module manages the creation and lookup of these files.
Resource Management: Tests can declare what resources they need. A resource can be, for example, a specific feature of a cluster that cannot be used by multiple tests at the same time. The ClusterManager handles locking of these resources so that only one test can use them at a time.
Cluster Respin: Some tests can modify the state of a cluster in a way that it cannot be used by subsequent tests. These tests can request a “respin” of the cluster instance, which re-initializes it to a clean state.
`ClusterManager`: This is the main class that test fixtures interact with. Its get() method is used to acquire a suitable cluster instance for a test, taking into account available instances, resource requirements, and scheduling priority.
This system allows for efficient parallel execution of tests that require a running Cardano cluster, by reusing cluster instances and managing contention for shared resources.
cardano_node_tests.cluster_management.common module
cardano_node_tests.cluster_management.manager module
High-level management of cluster instances.
This module provides the ClusterManager class, which is the main interface for tests to get a fully initialized cluster instance. The ClusterManager is responsible for selecting an available cluster instance that meets the test’s resource requirements, preparing the clusterlib object, and performing cleanup actions after the test has finished.
The ClusterManager is instantiated by the cluster_manager fixture for each test worker and is used by the cluster fixture to get a cluster instance for a test.
- class cardano_node_tests.cluster_management.manager.ClusterManager(worker_id: str, pytest_config: Config)[source]
Bases:
objectSet of management methods for cluster instances.
- property cache: ClusterManagerCache[source]
- cache_fixture(key: str = '') Iterator[FixtureCache[Any]][source]
Cache fixture value - context manager.
- get(mark: str = '', lock_resources: Iterable[str | BaseFilter] = (), use_resources: Iterable[str | BaseFilter] = (), prio: bool = False, cleanup: bool = False, scriptsdir: str | Path = '', check_initialized: bool = True) ClusterLib[source]
Get cardano_clusterlib.ClusterLib object on an initialized cluster instance.
Convenience method that calls init.
- get_locked_resources(from_set: Iterable[str] | None = None, worker_id: str | None = None) list[str][source]
Get resources locked by worker.
It is possible to use glob patterns for worker_id (e.g. worker_id=”*”).
- get_used_resources(from_set: Iterable[str] | None = None, worker_id: str | None = None) list[str][source]
Get resources used by worker.
It is possible to use glob patterns for worker_id (e.g. worker_id=”*”).
- init(mark: str = '', lock_resources: Iterable[str | BaseFilter] = (), use_resources: Iterable[str | BaseFilter] = (), prio: bool = False, cleanup: bool = False, scriptsdir: str | Path = '') None[source]
Get an initialized cluster instance.
This method will wait until a cluster instance is ready to be used.
IMPORTANT: This method must be called before any other method of this class.
- respin_on_failure() Iterator[None][source]
Indicate that the cluster instance needs respin if command failed - context manager.
cardano_node_tests.cluster_management.netstat_tools module
Functions based on netstat.
- cardano_node_tests.cluster_management.netstat_tools.get_netstat_conn() str[source]
Get listing of connections from the netstat command.
cardano_node_tests.cluster_management.resources module
- class cardano_node_tests.cluster_management.resources.Resources[source]
Bases:
objectResources that can be used for lock_resources or use_resources.
cardano_node_tests.cluster_management.resources_management module
Functionality for selecting a cluster instance that has required resources available.
Resources can be requested by name (string).
It is also possible to use filters. A filter is a class that gets a list of all unavailable resources and returns a list of resources that should be used. An example is OneOf, which returns one usable resource from a given list of resources. The unavailable resources passed to the filter include resources that are unavailable because they are locked, and also resources that were already selected by preceding filters in the same request.
It is possible to use multiple OneOf filters in a single request. For example, using OneOf filter with the same set of resources twice will result in selecting two different resources from that set.
- class cardano_node_tests.cluster_management.resources_management.BaseFilter(resources: Iterable[str])[source]
Bases:
objectBase class for resource filters.
- class cardano_node_tests.cluster_management.resources_management.OneOf(resources: Iterable[str])[source]
Bases:
BaseFilterFilter that returns one usable resource out of list of resources.
- cardano_node_tests.cluster_management.resources_management.get_resources(resources: Iterable[str | BaseFilter], unavailable: Iterable[str]) list[str][source]
Get resources that can be used or locked.
cardano_node_tests.cluster_management.status_files module
Cluster instance status files.
Status files are used for communication and synchronization between pytest workers.
All status files are created in the single temp directory shared by all workers (the directory returned by temptools.get_pytest_root_tmp()). This allows all workers to see status files created by other workers.
Common components of status file names: * _@@<resource_name>@@_: resource name * _%%<mark>%%_: test mark * _<worker_id>: pytest worker ID
- cardano_node_tests.cluster_management.status_files.create_cluster_dead_file(instance_num: int) Path[source]
Create the status file that indicates that the cluster instance is in broken state.
- cardano_node_tests.cluster_management.status_files.create_cluster_stopped_file(instance_num: int) Path[source]
Create the status file that indicates that the cluster instance is stopped.
- cardano_node_tests.cluster_management.status_files.create_curr_mark_file(instance_num: int, worker_id: str, mark: str) Path[source]
Create the status file that indicates presence of marked test on a pytest worker.
- cardano_node_tests.cluster_management.status_files.create_prio_in_progress_file(worker_id: str) Path[source]
Create the status file that indicates that priority test is in progress.
- cardano_node_tests.cluster_management.status_files.create_resource_locked_files(instance_num: int, worker_id: str, lock_names: Iterable[str], mark: str = '') list[Path][source]
Create status files that indicate that the given resources are locked.
- cardano_node_tests.cluster_management.status_files.create_resource_used_files(instance_num: int, worker_id: str, use_names: Iterable[str], mark: str = '') list[Path][source]
Create status files that indicate that the given resources are in use.
- cardano_node_tests.cluster_management.status_files.create_respin_after_mark_file(instance_num: int, worker_id: str, mark: str) Path[source]
Create the status file that indicates that the cluster instance needs respin.
The respin will happen after marked tests are finished on the dedicated cluster instance.
- cardano_node_tests.cluster_management.status_files.create_respin_needed_file(instance_num: int, worker_id: str) Path[source]
Create the status file that indicates that the cluster instance needs respin.
- cardano_node_tests.cluster_management.status_files.create_respin_progress_file(instance_num: int, worker_id: str) Path[source]
Create the status file that indicates that respin is in progress.
- cardano_node_tests.cluster_management.status_files.create_started_by_framework_file(state_dir: Path) Path[source]
Create the status file that indicates the cluster instance was started by test framework.
- cardano_node_tests.cluster_management.status_files.create_test_running_file(instance_num: int, worker_id: str, test_id: str, mark: str = '') Path[source]
Create the status file that indicates that a test is running on a pytest worker.
Save the test name in the status file.
- cardano_node_tests.cluster_management.status_files.get_cluster_dead_file(instance_num: int) Path[source]
Return the status file that indicates that the cluster instance is in broken state.
- cardano_node_tests.cluster_management.status_files.get_cluster_running_file(instance_num: int) Path[source]
Return the status file that indicates that the cluster instance is running.
- cardano_node_tests.cluster_management.status_files.get_cluster_stopped_file(instance_num: int) Path[source]
Return the status file that indicates that the cluster instance is stopped.
- cardano_node_tests.cluster_management.status_files.get_curr_mark_file(instance_num: int, worker_id: str, mark: str) Path[source]
Return the status file that indicates presence of marked test on a pytest worker.
- cardano_node_tests.cluster_management.status_files.get_instance_dir(instance_num: int) Path[source]
Return cluster instance directory for the given instance number.
- cardano_node_tests.cluster_management.status_files.get_marks_in_progress(instance_num: int | None = None, worker_id: str = '*') list[str][source]
Return list of marks that are in progress.
- cardano_node_tests.cluster_management.status_files.get_prio_in_progress_file(worker_id: str) Path[source]
Return the status file that indicates that priority test is in progress.
- cardano_node_tests.cluster_management.status_files.get_resources_from_path(paths: Iterable[Path]) list[str][source]
Get resources names from status files path.
- cardano_node_tests.cluster_management.status_files.get_respin_after_mark_file(instance_num: int, worker_id: str, mark: str) Path[source]
Return the status file that indicates that the cluster instance needs respin.
The respin will happen after marked tests are finished on the dedicated cluster instance.
- cardano_node_tests.cluster_management.status_files.get_respin_needed_file(instance_num: int, worker_id: str) Path[source]
Return the status file that indicates that the cluster instance needs respin.
- cardano_node_tests.cluster_management.status_files.get_respin_progress_file(instance_num: int, worker_id: str) Path[source]
Return the status file that indicates that respin is in progress.
- cardano_node_tests.cluster_management.status_files.get_started_by_framework_file(state_dir: Path) Path[source]
Return the status file that indicates the cluster instance was started by test framework.
- cardano_node_tests.cluster_management.status_files.get_test_names(instance_num: int | None = None, worker_id: str = '*', mark: str | None = None) list[str][source]
Return list of test names that are currently running.
If mark is None, list all status files regarless whether any mark is present or not. If mark is *, list all status files that have any mark. If mark is an empty string, list all status files that don’t have mark.
- cardano_node_tests.cluster_management.status_files.get_test_running_file(instance_num: int, worker_id: str, mark: str = '') Path[source]
Return the status file that indicates that a test is running on a pytest worker.
- cardano_node_tests.cluster_management.status_files.list_cluster_dead_files(instance_num: int | None = None) list[Path][source]
List all “cluster dead” status files.
- cardano_node_tests.cluster_management.status_files.list_curr_mark_files(instance_num: int | None = None, worker_id: str = '*', mark: str = '*') list[Path][source]
List all “current mark” status files.
- cardano_node_tests.cluster_management.status_files.list_prio_in_progress_files(worker_id: str = '*') list[Path][source]
List all “priority test in progress” status files.
- cardano_node_tests.cluster_management.status_files.list_resource_locked_files(instance_num: int | None = None, worker_id: str = '*', mark: str | None = None) list[Path][source]
List all “resource locked” status files.
If mark is None, list all status files regarless whether any mark is present or not. If mark is *, list all status files that have any mark. If mark is an empty string, list all status files that don’t have mark.
- cardano_node_tests.cluster_management.status_files.list_resource_used_files(instance_num: int | None = None, worker_id: str = '*', mark: str | None = None) list[Path][source]
List all “resource used” status files.
If mark is None, list all status files regarless whether any mark is present or not. If mark is *, list all status files that have any mark. If mark is an empty string, list all status files that don’t have mark.
- cardano_node_tests.cluster_management.status_files.list_respin_after_mark_files(instance_num: int | None = None, worker_id: str = '*', mark: str = '*') list[Path][source]
List all “respin after mark” status files.
- cardano_node_tests.cluster_management.status_files.list_respin_needed_files(instance_num: int | None = None, worker_id: str = '*') list[Path][source]
List all “needs respin” status files.
- cardano_node_tests.cluster_management.status_files.list_respin_progress_files(instance_num: int | None = None, worker_id: str = '*') list[Path][source]
List all “respin in progress” status files.
- cardano_node_tests.cluster_management.status_files.list_test_running_files(instance_num: int | None = None, worker_id: str = '*', mark: str | None = None) list[Path][source]
List all “test running” status files.
If mark is None, list all status files regarless whether any mark is present or not. If mark is *, list all status files that have any mark. If mark is an empty string, list all status files that don’t have mark.
- cardano_node_tests.cluster_management.status_files.rm_curr_mark_files(instance_num: int | None = None, worker_id: str = '*', mark: str = '*') list[Path][source]
Delete all “current mark” status files.
- cardano_node_tests.cluster_management.status_files.rm_prio_in_progress_files(worker_id: str = '*') list[Path][source]
Delete all “priority test in progress” status files.
- cardano_node_tests.cluster_management.status_files.rm_resource_locked_files(instance_num: int | None = None, worker_id: str = '*', mark: str | None = None) list[Path][source]
Delete all “resource locked” status files.
If mark is None, list all status files regarless whether any mark is present or not. If mark is *, list all status files that have any mark. If mark is an empty string, list all status files that don’t have mark.
- cardano_node_tests.cluster_management.status_files.rm_resource_used_files(instance_num: int | None = None, worker_id: str = '*', mark: str | None = None) list[Path][source]
Delete all “resource used” status files.
If mark is None, list all status files regarless whether any mark is present or not. If mark is *, list all status files that have any mark. If mark is an empty string, list all status files that don’t have mark.
- cardano_node_tests.cluster_management.status_files.rm_respin_after_mark_files(instance_num: int | None = None, worker_id: str = '*', mark: str = '*') list[Path][source]
Delete all “respin after mark” status files.
- cardano_node_tests.cluster_management.status_files.rm_respin_needed_files(instance_num: int | None = None, worker_id: str = '*') list[Path][source]
Delete all “needs respin” status files.
- cardano_node_tests.cluster_management.status_files.rm_respin_progress_files(instance_num: int | None = None, worker_id: str = '*') list[Path][source]
Delete all “respin in progress” status files.
- cardano_node_tests.cluster_management.status_files.rm_test_running_files(instance_num: int | None = None, worker_id: str = '*', mark: str | None = None) list[Path][source]
Delete all “test running” status files.
If mark is None, list all status files regarless whether any mark is present or not. If mark is *, list all status files that have any mark. If mark is an empty string, list all status files that don’t have mark.
Module contents
Functionality for parallel execution of tests on multiple cluster instances.