Skip to content

storage

storage

In-memory storage for reports, batch jobs, and remediations.

InMemoryStorage

Simple in-memory storage for reports, batch jobs, and remediations.

Source code in hier_config_api/utils/storage.py
class InMemoryStorage:
    """Simple in-memory storage for reports and jobs."""

    def __init__(self) -> None:
        """Initialize storage."""
        self._reports: dict[str, dict[str, Any]] = {}
        self._jobs: dict[str, dict[str, Any]] = {}
        self._remediations: dict[str, dict[str, Any]] = {}

    def store_report(self, report_data: dict[str, Any]) -> str:
        """Store a report and return its ID."""
        report_id = str(uuid.uuid4())
        self._reports[report_id] = report_data
        return report_id

    def get_report(self, report_id: str) -> dict[str, Any] | None:
        """Retrieve a report by ID."""
        return self._reports.get(report_id)

    def store_job(self, job_data: dict[str, Any]) -> str:
        """Store a batch job and return its ID."""
        job_id = str(uuid.uuid4())
        self._jobs[job_id] = job_data
        return job_id

    def get_job(self, job_id: str) -> dict[str, Any] | None:
        """Retrieve a batch job by ID."""
        return self._jobs.get(job_id)

    def update_job(self, job_id: str, updates: dict[str, Any]) -> bool:
        """Update a batch job."""
        if job_id in self._jobs:
            self._jobs[job_id].update(updates)
            return True
        return False

    def store_remediation(self, remediation_data: dict[str, Any]) -> str:
        """Store a remediation and return its ID."""
        remediation_id = str(uuid.uuid4())
        self._remediations[remediation_id] = remediation_data
        return remediation_id

    def get_remediation(self, remediation_id: str) -> dict[str, Any] | None:
        """Retrieve a remediation by ID."""
        return self._remediations.get(remediation_id)

    def update_remediation(self, remediation_id: str, updates: dict[str, Any]) -> bool:
        """Update a remediation."""
        if remediation_id in self._remediations:
            self._remediations[remediation_id].update(updates)
            return True
        return False

__init__()

Initialize storage.

Source code in hier_config_api/utils/storage.py
def __init__(self) -> None:
    """Create the three empty record tables on the new instance."""
    # Reports, keyed by generated report ID.
    self._reports: dict[str, dict[str, Any]] = {}
    # Batch jobs, keyed by generated job ID.
    self._jobs: dict[str, dict[str, Any]] = {}
    # Remediations, keyed by generated remediation ID.
    self._remediations: dict[str, dict[str, Any]] = {}

store_report(report_data)

Store a report and return its ID.

Source code in hier_config_api/utils/storage.py
def store_report(self, report_data: dict[str, Any]) -> str:
    """Save ``report_data`` under a new UUID4 key and return that key.

    The payload is stored by reference; no copy is made.
    """
    generated = uuid.uuid4()
    new_key = str(generated)
    self._reports[new_key] = report_data
    return new_key

get_report(report_id)

Retrieve a report by ID.

Source code in hier_config_api/utils/storage.py
def get_report(self, report_id: str) -> dict[str, Any] | None:
    """Retrieve a report by ID."""
    return self._reports.get(report_id)

store_job(job_data)

Store a batch job and return its ID.

Source code in hier_config_api/utils/storage.py
def store_job(self, job_data: dict[str, Any]) -> str:
    """Save ``job_data`` under a new UUID4 key and return that key.

    The payload is stored by reference; no copy is made.
    """
    generated = uuid.uuid4()
    new_key = str(generated)
    self._jobs[new_key] = job_data
    return new_key

get_job(job_id)

Retrieve a batch job by ID.

Source code in hier_config_api/utils/storage.py
def get_job(self, job_id: str) -> dict[str, Any] | None:
    """Retrieve a batch job by ID."""
    return self._jobs.get(job_id)

update_job(job_id, updates)

Update a batch job.

Source code in hier_config_api/utils/storage.py
def update_job(self, job_id: str, updates: dict[str, Any]) -> bool:
    """Merge ``updates`` into an existing batch job.

    Returns:
        ``True`` if ``job_id`` was found and its record was updated in
        place, ``False`` if no such job exists (nothing is created).
    """
    if job_id not in self._jobs:
        return False
    self._jobs[job_id].update(updates)
    return True

store_remediation(remediation_data)

Store a remediation and return its ID.

Source code in hier_config_api/utils/storage.py
def store_remediation(self, remediation_data: dict[str, Any]) -> str:
    """Save ``remediation_data`` under a new UUID4 key and return that key.

    The payload is stored by reference; no copy is made.
    """
    generated = uuid.uuid4()
    new_key = str(generated)
    self._remediations[new_key] = remediation_data
    return new_key

get_remediation(remediation_id)

Retrieve a remediation by ID.

Source code in hier_config_api/utils/storage.py
def get_remediation(self, remediation_id: str) -> dict[str, Any] | None:
    """Retrieve a remediation by ID."""
    return self._remediations.get(remediation_id)

update_remediation(remediation_id, updates)

Update a remediation.

Source code in hier_config_api/utils/storage.py
def update_remediation(self, remediation_id: str, updates: dict[str, Any]) -> bool:
    """Merge ``updates`` into an existing remediation.

    Returns:
        ``True`` if ``remediation_id`` was found and its record was updated
        in place, ``False`` if no such remediation exists (nothing is
        created).
    """
    if remediation_id not in self._remediations:
        return False
    self._remediations[remediation_id].update(updates)
    return True