"""Encrypted quarantine vault for AYN Antivirus. Isolates malicious files by encrypting them with Fernet (AES-128-CBC + HMAC-SHA256) and storing them alongside JSON metadata in a dedicated vault directory. Files can be restored, inspected, or permanently deleted. """ from __future__ import annotations import fcntl import json import logging import os import re import shutil import stat from datetime import datetime, timedelta from pathlib import Path from typing import Any, Dict, List, Optional from uuid import uuid4 from cryptography.fernet import Fernet from ayn_antivirus.constants import ( DEFAULT_QUARANTINE_PATH, QUARANTINE_ENCRYPTION_KEY_FILE, SCAN_CHUNK_SIZE, ) from ayn_antivirus.core.event_bus import EventType, event_bus logger = logging.getLogger(__name__) class QuarantineVault: """Encrypted file quarantine vault. Parameters ---------- quarantine_dir: Directory where encrypted files and metadata are stored. key_file_path: Path to the Fernet key file. Generated automatically on first use. """ _VALID_QID_PATTERN = re.compile(r'^[a-f0-9]{32}$') # Directories that should never be a restore destination. _BLOCKED_DIRS = frozenset({ Path("/etc"), Path("/usr/bin"), Path("/usr/sbin"), Path("/sbin"), Path("/bin"), Path("/boot"), Path("/root/.ssh"), Path("/proc"), Path("/sys"), Path("/dev"), Path("/var/run"), }) # Directories used for scheduled tasks — never restore into these. _CRON_DIRS = frozenset({ Path("/etc/cron.d"), Path("/etc/cron.daily"), Path("/etc/cron.hourly"), Path("/var/spool/cron"), Path("/etc/systemd"), }) def __init__( self, quarantine_dir: str | Path = DEFAULT_QUARANTINE_PATH, key_file_path: str | Path = QUARANTINE_ENCRYPTION_KEY_FILE, ) -> None: self.vault_dir = Path(quarantine_dir) self.key_file = Path(key_file_path) self._fernet: Optional[Fernet] = None # Ensure directories exist. self.vault_dir.mkdir(parents=True, exist_ok=True) self.key_file.parent.mkdir(parents=True, exist_ok=True) # ------------------------------------------------------------------ # Input validation # ------------------------------------------------------------------ def _validate_qid(self, quarantine_id: str) -> str: """Validate quarantine ID is a hex UUID (no path traversal). Raises :class:`ValueError` if the ID does not match the expected 32-character hexadecimal format. """ qid = quarantine_id.strip() if not self._VALID_QID_PATTERN.match(qid): raise ValueError( f"Invalid quarantine ID format: {quarantine_id!r} " f"(must be 32 hex chars)" ) return qid def _validate_restore_path(self, path_str: str) -> Path: """Validate restore path to prevent directory traversal. Blocks restoring to sensitive system directories and scheduled- task directories. Resolves all paths to handle symlinks like ``/etc`` → ``/private/etc`` on macOS. """ dest = Path(path_str).resolve() for blocked in self._BLOCKED_DIRS: resolved = blocked.resolve() if dest == resolved or resolved in dest.parents or dest.parent == resolved: raise ValueError(f"Refusing to restore to protected path: {dest}") for cron_dir in self._CRON_DIRS: resolved = cron_dir.resolve() if resolved in dest.parents or dest.parent == resolved: raise ValueError( f"Refusing to restore to scheduled task directory: {dest}" ) return dest # ------------------------------------------------------------------ # Key management # ------------------------------------------------------------------ def _get_fernet(self) -> Fernet: """Return the cached Fernet instance, loading or generating the key.""" if self._fernet is not None: return self._fernet if self.key_file.exists(): key = self.key_file.read_bytes().strip() else: key = Fernet.generate_key() # Write key with restricted permissions. fd = os.open( str(self.key_file), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600, ) try: os.write(fd, key + b"\n") finally: os.close(fd) logger.info("Generated new quarantine encryption key: %s", self.key_file) self._fernet = Fernet(key) return self._fernet # ------------------------------------------------------------------ # Quarantine # ------------------------------------------------------------------ def quarantine_file( self, file_path: str | Path, threat_info: Dict[str, Any], ) -> str: """Encrypt and move a file into the vault. Parameters ---------- file_path: Path to the file to quarantine. threat_info: Metadata dict (typically from a detector result). Expected keys: ``threat_name``, ``threat_type``, ``severity``, ``file_hash``. Returns ------- str The quarantine ID (UUID) for this entry. """ src = Path(file_path).resolve() if not src.is_file(): raise FileNotFoundError(f"Cannot quarantine: {src} does not exist or is not a file") qid = uuid4().hex fernet = self._get_fernet() # Lock, read, and encrypt (prevents TOCTOU races). with open(src, "rb") as f: fcntl.flock(f.fileno(), fcntl.LOCK_EX) plaintext = f.read() st = os.fstat(f.fileno()) fcntl.flock(f.fileno(), fcntl.LOCK_UN) ciphertext = fernet.encrypt(plaintext) # Gather metadata. meta = { "id": qid, "original_path": str(src), "original_permissions": oct(st.st_mode & 0o7777), "threat_name": threat_info.get("threat_name", "Unknown"), "threat_type": threat_info.get("threat_type", "MALWARE"), "severity": threat_info.get("severity", "HIGH"), "quarantine_date": datetime.utcnow().isoformat(), "file_hash": threat_info.get("file_hash", ""), "file_size": st.st_size, } # Write encrypted file + metadata. enc_path = self.vault_dir / f"{qid}.enc" meta_path = self.vault_dir / f"{qid}.json" enc_path.write_bytes(ciphertext) meta_path.write_text(json.dumps(meta, indent=2)) # Remove original. try: src.unlink() logger.info("Quarantined %s → %s (threat: %s)", src, qid, meta["threat_name"]) except OSError as exc: logger.warning("Encrypted copy saved but failed to remove original %s: %s", src, exc) event_bus.publish(EventType.QUARANTINE_ACTION, { "action": "quarantine", "quarantine_id": qid, "original_path": str(src), "threat_name": meta["threat_name"], }) return qid # ------------------------------------------------------------------ # Restore # ------------------------------------------------------------------ def restore_file( self, quarantine_id: str, restore_path: Optional[str | Path] = None, ) -> str: """Decrypt and restore a quarantined file. Parameters ---------- quarantine_id: UUID returned by :meth:`quarantine_file`. restore_path: Where to write the restored file. Defaults to the original path. Returns ------- str Absolute path of the restored file. Raises ------ ValueError If the quarantine ID is malformed or the restore path points to a protected system directory. """ qid = self._validate_qid(quarantine_id) meta = self._load_meta(qid) enc_path = self.vault_dir / f"{qid}.enc" if not enc_path.exists(): raise FileNotFoundError(f"Encrypted file not found for quarantine ID {qid}") # Validate restore destination. if restore_path: dest = self._validate_restore_path(str(restore_path)) else: dest = self._validate_restore_path(str(meta["original_path"])) fernet = self._get_fernet() ciphertext = enc_path.read_bytes() plaintext = fernet.decrypt(ciphertext) dest.parent.mkdir(parents=True, exist_ok=True) dest.write_bytes(plaintext) # Restore original permissions, stripping SUID/SGID/sticky bits. try: perms = int(meta.get("original_permissions", "0o644"), 8) perms = perms & 0o0777 # Keep only rwx bits dest.chmod(perms) except (ValueError, OSError): pass logger.info("Restored quarantined file %s → %s", qid, dest) event_bus.publish(EventType.QUARANTINE_ACTION, { "action": "restore", "quarantine_id": qid, "restored_path": str(dest), }) return str(dest.resolve()) # ------------------------------------------------------------------ # Delete # ------------------------------------------------------------------ def delete_file(self, quarantine_id: str) -> bool: """Permanently remove a quarantined entry (encrypted file + metadata). Returns ``True`` if files were deleted. """ qid = self._validate_qid(quarantine_id) enc_path = self.vault_dir / f"{qid}.enc" meta_path = self.vault_dir / f"{qid}.json" deleted = False for p in (enc_path, meta_path): if p.exists(): p.unlink() deleted = True if deleted: logger.info("Permanently deleted quarantine entry: %s", qid) event_bus.publish(EventType.QUARANTINE_ACTION, { "action": "delete", "quarantine_id": qid, }) return deleted # ------------------------------------------------------------------ # Listing / info # ------------------------------------------------------------------ def list_quarantined(self) -> List[Dict[str, Any]]: """Return a summary list of all quarantined items.""" items: List[Dict[str, Any]] = [] for meta_file in sorted(self.vault_dir.glob("*.json")): try: meta = json.loads(meta_file.read_text()) items.append({ "id": meta.get("id", meta_file.stem), "original_path": meta.get("original_path", "?"), "threat_name": meta.get("threat_name", "?"), "quarantine_date": meta.get("quarantine_date", "?"), "size": meta.get("file_size", 0), }) except (json.JSONDecodeError, OSError): continue return items def get_info(self, quarantine_id: str) -> Dict[str, Any]: """Return full metadata for a quarantine entry. Raises ``FileNotFoundError`` if the ID is unknown. """ qid = self._validate_qid(quarantine_id) return self._load_meta(qid) def count(self) -> int: """Number of items currently in the vault.""" return len(list(self.vault_dir.glob("*.json"))) # ------------------------------------------------------------------ # Maintenance # ------------------------------------------------------------------ def clean_old(self, days: int = 30) -> int: """Delete quarantine entries older than *days*. Returns the number of entries removed. """ cutoff = datetime.utcnow() - timedelta(days=days) removed = 0 for meta_file in self.vault_dir.glob("*.json"): try: meta = json.loads(meta_file.read_text()) qdate = datetime.fromisoformat(meta.get("quarantine_date", "")) if qdate < cutoff: qid = meta.get("id", meta_file.stem) self.delete_file(qid) removed += 1 except (json.JSONDecodeError, ValueError, OSError): continue if removed: logger.info("Cleaned %d quarantine entries older than %d days", removed, days) return removed # ------------------------------------------------------------------ # Internal # ------------------------------------------------------------------ def _load_meta(self, quarantine_id: str) -> Dict[str, Any]: qid = self._validate_qid(quarantine_id) meta_path = self.vault_dir / f"{qid}.json" if not meta_path.exists(): raise FileNotFoundError(f"Quarantine metadata not found: {qid}") return json.loads(meta_path.read_text())