Skip to content

State Manager API

core.engine.state_manager

Functions

get_local_state(exporters_dir=EXPORTERS_DIR)

Reads all local manifest.yaml files to build the current desired state.

Source code in core/engine/state_manager.py
def get_local_state(exporters_dir=EXPORTERS_DIR):
    """
    Reads all local manifest.yaml files to build the current desired state.

    Each entry of ``exporters_dir`` that contains a ``manifest.yaml`` is
    loaded and its ``version`` field recorded.

    Args:
        exporters_dir: Directory whose entries are checked for a
            ``manifest.yaml`` file.

    Returns:
        A dict mapping each entry name to its version string with any leading
        ``v`` characters removed. Returns an empty dict when ``exporters_dir``
        is not a directory. Manifests that cannot be read or parsed, or that
        lack a usable ``version``, are reported on stderr and skipped.
    """
    local_state = {}
    if not os.path.isdir(exporters_dir):
        return local_state

    for exporter_name in os.listdir(exporters_dir):
        manifest_path = os.path.join(exporters_dir, exporter_name, "manifest.yaml")
        if not os.path.exists(manifest_path):
            continue
        try:
            # Explicit encoding so decoding does not depend on the platform locale.
            with open(manifest_path, encoding="utf-8") as f:
                data = yaml.safe_load(f)
            raw_version = data["version"]
            if raw_version is None:
                raise ValueError("'version' is empty")
            # YAML parses an unquoted `version: 1.8` as a number; coerce to str
            # so such manifests are not dropped. NOTE: an unquoted 1.10 is
            # already 1.1 by the time YAML hands it over -- quote versions in
            # manifests to avoid that.
            # Normalize version (strip 'v' prefix if present to match catalog standard)
            local_state[exporter_name] = str(raw_version).lstrip("v")
        except Exception as e:
            # Deliberate best-effort: one bad manifest must not abort the scan.
            print(f"Error reading {manifest_path}: {e}", file=sys.stderr)
    return local_state

get_remote_catalog(catalog_url=DEFAULT_CATALOG_URL)

Fetches the current state of the repository from the deployed catalog.json. Returns a dictionary keyed by exporter name with version info.

Source code in core/engine/state_manager.py
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type(requests.exceptions.RequestException),
    reraise=True,
)
def _fetch_remote_catalog(catalog_url):
    """
    Performs the catalog request, letting request errors propagate.

    Kept separate from get_remote_catalog so that the retry policy actually
    sees requests.exceptions.RequestException; once the attempts are
    exhausted the last exception is re-raised to the caller.
    """
    r = requests.get(catalog_url, timeout=10)
    if r.status_code != 200:
        print(
            f"Warning: Remote catalog not found (Status {r.status_code}). Assuming empty state.",
            file=sys.stderr,
        )
        return {}
    data = r.json()
    # Convert list to dict for easier lookup: {'node_exporter': '1.8.1', ...}
    return {item["name"]: item["version"] for item in data.get("exporters", [])}


def get_remote_catalog(catalog_url=DEFAULT_CATALOG_URL):
    """
    Fetches the current state of the repository from the deployed catalog.json.
    Returns a dictionary keyed by exporter name with version info.

    Request failures are retried by the helper (up to 3 attempts with
    exponential backoff). This function never raises: a non-200 response, or
    any error that remains after the retries, is reported on stderr and an
    empty dict is returned.

    Args:
        catalog_url: URL of the deployed catalog.json.

    Returns:
        A dict mapping exporter name to version, or an empty dict when the
        catalog could not be fetched or parsed.
    """
    try:
        print(f"Fetching remote catalog from {catalog_url}...", file=sys.stderr)
        return _fetch_remote_catalog(catalog_url)
    except Exception as e:
        # Best-effort boundary: callers treat an unreachable catalog as empty.
        print(
            f"Warning: Could not fetch remote catalog: {e}. Assuming empty state.",
            file=sys.stderr,
        )
        return {}