379 lines
13 KiB
Python
379 lines
13 KiB
Python
"""Encrypted quarantine vault for AYN Antivirus.
|
|
|
|
Isolates malicious files by encrypting them with Fernet (AES-128-CBC +
HMAC-SHA256) and storing them alongside JSON metadata in a dedicated
vault directory.  Files can be restored, inspected, or permanently deleted.
|
|
"""
|
|
|
|
from __future__ import annotations

import fcntl
import json
import logging
import os
import re
import shutil
import stat
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
from uuid import uuid4

from cryptography.fernet import Fernet

from ayn_antivirus.constants import (
    DEFAULT_QUARANTINE_PATH,
    QUARANTINE_ENCRYPTION_KEY_FILE,
    SCAN_CHUNK_SIZE,
)
from ayn_antivirus.core.event_bus import EventType, event_bus
|
|
|
|
# Module-level logger, named after this module so that log output can be
# filtered per module by the application's logging configuration.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class QuarantineVault:
    """Encrypted file quarantine vault.

    Every quarantined file is stored in the vault directory as a
    Fernet-encrypted ``<id>.enc`` blob next to an ``<id>.json`` metadata
    file, where ``<id>`` is a 32-character lowercase hexadecimal UUID.

    Parameters
    ----------
    quarantine_dir:
        Directory where encrypted files and metadata are stored.
    key_file_path:
        Path to the Fernet key file.  Generated automatically on first use.
    """

    _VALID_QID_PATTERN = re.compile(r'^[a-f0-9]{32}$')

    # Directories that should never be a restore destination.
    _BLOCKED_DIRS = frozenset({
        Path("/etc"), Path("/usr/bin"), Path("/usr/sbin"), Path("/sbin"),
        Path("/bin"), Path("/boot"), Path("/root/.ssh"), Path("/proc"),
        Path("/sys"), Path("/dev"), Path("/var/run"),
    })

    # Directories used for scheduled tasks — never restore into these.
    _CRON_DIRS = frozenset({
        Path("/etc/cron.d"), Path("/etc/cron.daily"),
        Path("/etc/cron.hourly"), Path("/var/spool/cron"),
        Path("/etc/systemd"),
    })

    def __init__(
        self,
        quarantine_dir: str | Path = DEFAULT_QUARANTINE_PATH,
        key_file_path: str | Path = QUARANTINE_ENCRYPTION_KEY_FILE,
    ) -> None:
        """Remember the vault locations and create the directories."""
        self.vault_dir = Path(quarantine_dir)
        self.key_file = Path(key_file_path)
        # Loaded lazily by ``_get_fernet`` so that constructing a vault never
        # touches the key material.
        self._fernet: Optional[Fernet] = None

        # Ensure directories exist.
        self.vault_dir.mkdir(parents=True, exist_ok=True)
        self.key_file.parent.mkdir(parents=True, exist_ok=True)

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a *naive* ``datetime``.

        Naive values keep the stored ``quarantine_date`` format identical
        to what ``datetime.utcnow().isoformat()`` produced, without relying
        on the deprecated ``utcnow``.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _is_at_or_below(path: Path, root: Path) -> bool:
        """Return ``True`` if *path* is *root* itself or lies beneath it."""
        return path == root or root in path.parents

    @staticmethod
    def _remove_if_present(path: Path) -> bool:
        """Unlink *path*; return ``False`` instead of raising if it is gone."""
        try:
            path.unlink()
        except FileNotFoundError:
            return False
        return True

    # ------------------------------------------------------------------
    # Input validation
    # ------------------------------------------------------------------

    def _validate_qid(self, quarantine_id: str) -> str:
        """Validate quarantine ID is a hex UUID (no path traversal).

        Surrounding whitespace is stripped before matching.

        Returns
        -------
        str
            The stripped, validated ID.

        Raises
        ------
        ValueError
            If the ID is not a string or does not match the expected
            32-character lowercase hexadecimal format.
        """
        # Non-string input (e.g. a number read from tampered metadata) must
        # surface as ValueError like every other malformed ID.
        if isinstance(quarantine_id, str):
            qid = quarantine_id.strip()
            if self._VALID_QID_PATTERN.fullmatch(qid):
                return qid
        raise ValueError(
            f"Invalid quarantine ID format: {quarantine_id!r} "
            f"(must be 32 hex chars)"
        )

    def _validate_restore_path(self, path_str: str) -> Path:
        """Validate restore path to prevent directory traversal.

        Blocks restoring to sensitive system directories and scheduled-
        task directories.  Both the destination and the blocked
        directories are resolved before comparison, to handle symlinks
        like ``/etc`` → ``/private/etc`` on macOS.

        Returns
        -------
        Path
            The resolved destination path.

        Raises
        ------
        ValueError
            If the destination is, or lies beneath, a blocked directory.
        """
        dest = Path(path_str).resolve()
        for blocked in self._BLOCKED_DIRS:
            if self._is_at_or_below(dest, blocked.resolve()):
                raise ValueError(f"Refusing to restore to protected path: {dest}")
        for cron_dir in self._CRON_DIRS:
            if self._is_at_or_below(dest, cron_dir.resolve()):
                raise ValueError(
                    f"Refusing to restore to scheduled task directory: {dest}"
                )
        return dest

    # ------------------------------------------------------------------
    # Key management
    # ------------------------------------------------------------------

    def _create_key_file(self) -> bytes:
        """Generate a new Fernet key, persist it and return it.

        The file is created with ``O_EXCL`` so that an existing key is
        never truncated: if another process created the key file first,
        that key is read and returned instead of the freshly generated one.
        """
        key = Fernet.generate_key()
        try:
            # Write key with restricted permissions.
            fd = os.open(
                str(self.key_file),
                os.O_WRONLY | os.O_CREAT | os.O_EXCL,
                0o600,
            )
        except FileExistsError:
            # Lost the creation race — use the key that is already on disk.
            return self.key_file.read_bytes().strip()
        try:
            os.write(fd, key + b"\n")
        finally:
            os.close(fd)
        logger.info("Generated new quarantine encryption key: %s", self.key_file)
        return key

    def _get_fernet(self) -> Fernet:
        """Return the cached Fernet instance, loading or generating the key."""
        if self._fernet is None:
            try:
                key = self.key_file.read_bytes().strip()
            except FileNotFoundError:
                key = self._create_key_file()
            self._fernet = Fernet(key)
        return self._fernet

    # ------------------------------------------------------------------
    # Quarantine
    # ------------------------------------------------------------------

    def quarantine_file(
        self,
        file_path: str | Path,
        threat_info: Dict[str, Any],
    ) -> str:
        """Encrypt and move a file into the vault.

        The encrypted copy and its metadata are written first; only then
        is the original removed.  If removing the original fails, a
        warning is logged and the quarantine entry is kept.

        Parameters
        ----------
        file_path:
            Path to the file to quarantine.
        threat_info:
            Metadata dict (typically from a detector result).  Expected keys:
            ``threat_name``, ``threat_type``, ``severity``, ``file_hash``.

        Returns
        -------
        str
            The quarantine ID (UUID) for this entry.

        Raises
        ------
        FileNotFoundError
            If *file_path* does not exist or is not a regular file.
        TypeError
            If a value taken from *threat_info* cannot be serialised to
            JSON.  Nothing has been written to the vault in that case.
        OSError
            If the vault files cannot be written.  Partially written vault
            files are removed and the original file is left in place.
        """
        src = Path(file_path).resolve()
        if not src.is_file():
            raise FileNotFoundError(f"Cannot quarantine: {src} does not exist or is not a file")

        qid = uuid4().hex
        fernet = self._get_fernet()

        # Read under an exclusive advisory lock so cooperating writers cannot
        # change the file mid-read; the lock is released even if reading fails.
        with open(src, "rb") as f:
            fcntl.flock(f.fileno(), fcntl.LOCK_EX)
            try:
                plaintext = f.read()
                st = os.fstat(f.fileno())
            finally:
                fcntl.flock(f.fileno(), fcntl.LOCK_UN)
        ciphertext = fernet.encrypt(plaintext)

        # Gather metadata.
        meta = {
            "id": qid,
            "original_path": str(src),
            "original_permissions": oct(st.st_mode & 0o7777),
            "threat_name": threat_info.get("threat_name", "Unknown"),
            "threat_type": threat_info.get("threat_type", "MALWARE"),
            "severity": threat_info.get("severity", "HIGH"),
            "quarantine_date": self._utcnow().isoformat(),
            "file_hash": threat_info.get("file_hash", ""),
            "file_size": st.st_size,
        }
        # Serialise before touching the vault: a non-serialisable value must
        # not leave an orphaned ``.enc`` file behind.
        meta_json = json.dumps(meta, indent=2)

        # Write encrypted file + metadata.
        enc_path = self.vault_dir / f"{qid}.enc"
        meta_path = self.vault_dir / f"{qid}.json"
        try:
            enc_path.write_bytes(ciphertext)
            meta_path.write_text(meta_json, encoding="utf-8")
        except OSError:
            # Roll back the half-written entry (best effort) and report the
            # original write failure to the caller.
            for partial in (enc_path, meta_path):
                try:
                    self._remove_if_present(partial)
                except OSError:
                    pass
            raise

        # Remove original.
        try:
            src.unlink()
            logger.info("Quarantined %s → %s (threat: %s)", src, qid, meta["threat_name"])
        except OSError as exc:
            logger.warning("Encrypted copy saved but failed to remove original %s: %s", src, exc)

        event_bus.publish(EventType.QUARANTINE_ACTION, {
            "action": "quarantine",
            "quarantine_id": qid,
            "original_path": str(src),
            "threat_name": meta["threat_name"],
        })

        return qid

    # ------------------------------------------------------------------
    # Restore
    # ------------------------------------------------------------------

    def restore_file(
        self,
        quarantine_id: str,
        restore_path: Optional[str | Path] = None,
    ) -> str:
        """Decrypt and restore a quarantined file.

        The vault entry itself is left in place; use :meth:`delete_file`
        to remove it afterwards.

        Parameters
        ----------
        quarantine_id:
            UUID returned by :meth:`quarantine_file`.
        restore_path:
            Where to write the restored file.  Defaults to the original path.

        Returns
        -------
        str
            Absolute path of the restored file.

        Raises
        ------
        ValueError
            If the quarantine ID is malformed, the metadata records no
            original path and none was given, or the restore path points
            to a protected system directory.
        FileNotFoundError
            If the metadata or the encrypted file is missing.
        """
        qid = self._validate_qid(quarantine_id)
        meta = self._load_meta(qid)
        enc_path = self.vault_dir / f"{qid}.enc"

        if not enc_path.exists():
            raise FileNotFoundError(f"Encrypted file not found for quarantine ID {qid}")

        # Validate restore destination.
        target = restore_path if restore_path else meta.get("original_path")
        if not target:
            raise ValueError(f"Quarantine metadata for {qid} has no original path")
        dest = self._validate_restore_path(str(target))

        plaintext = self._get_fernet().decrypt(enc_path.read_bytes())

        dest.parent.mkdir(parents=True, exist_ok=True)
        dest.write_bytes(plaintext)

        # Restore original permissions, stripping SUID/SGID/sticky bits.
        # Best effort: the file content is already restored, so a bad or
        # unappliable permission value is logged rather than raised.
        raw_perms = meta.get("original_permissions", "0o644")
        try:
            dest.chmod(int(raw_perms, 8) & 0o0777)  # Keep only rwx bits
        except (TypeError, ValueError, OSError) as exc:
            logger.warning(
                "Could not restore permissions %r on %s: %s", raw_perms, dest, exc
            )

        logger.info("Restored quarantined file %s → %s", qid, dest)

        event_bus.publish(EventType.QUARANTINE_ACTION, {
            "action": "restore",
            "quarantine_id": qid,
            "restored_path": str(dest),
        })

        return str(dest.resolve())

    # ------------------------------------------------------------------
    # Delete
    # ------------------------------------------------------------------

    def delete_file(self, quarantine_id: str) -> bool:
        """Permanently remove a quarantined entry (encrypted file + metadata).

        Returns ``True`` if files were deleted.

        Raises
        ------
        ValueError
            If the quarantine ID is malformed.
        """
        qid = self._validate_qid(quarantine_id)
        candidates = (
            self.vault_dir / f"{qid}.enc",
            self.vault_dir / f"{qid}.json",
        )

        # A list (not a generator) so that both files are always attempted.
        deleted = any([self._remove_if_present(p) for p in candidates])

        if deleted:
            logger.info("Permanently deleted quarantine entry: %s", qid)
            event_bus.publish(EventType.QUARANTINE_ACTION, {
                "action": "delete",
                "quarantine_id": qid,
            })

        return deleted

    # ------------------------------------------------------------------
    # Listing / info
    # ------------------------------------------------------------------

    def list_quarantined(self) -> List[Dict[str, Any]]:
        """Return a summary list of all quarantined items.

        Metadata files that cannot be read, are not valid JSON, or do not
        contain a JSON object are skipped.
        """
        items: List[Dict[str, Any]] = []
        for meta_file in sorted(self.vault_dir.glob("*.json")):
            try:
                meta = json.loads(meta_file.read_text(encoding="utf-8"))
            except (ValueError, OSError):
                # ValueError covers both malformed JSON and undecodable bytes.
                continue
            if not isinstance(meta, dict):
                continue
            items.append({
                "id": meta.get("id", meta_file.stem),
                "original_path": meta.get("original_path", "?"),
                "threat_name": meta.get("threat_name", "?"),
                "quarantine_date": meta.get("quarantine_date", "?"),
                "size": meta.get("file_size", 0),
            })
        return items

    def get_info(self, quarantine_id: str) -> Dict[str, Any]:
        """Return full metadata for a quarantine entry.

        Raises ``FileNotFoundError`` if the ID is unknown and
        ``ValueError`` if the ID is malformed.
        """
        qid = self._validate_qid(quarantine_id)
        return self._load_meta(qid)

    def count(self) -> int:
        """Number of items currently in the vault.

        Counts the ``*.json`` metadata files in the vault directory.
        """
        return sum(1 for _ in self.vault_dir.glob("*.json"))

    # ------------------------------------------------------------------
    # Maintenance
    # ------------------------------------------------------------------

    def clean_old(self, days: int = 30) -> int:
        """Delete quarantine entries older than *days*.

        Entries whose metadata is unreadable, malformed, or lacks a
        parseable ``quarantine_date`` are left untouched.

        Returns the number of entries removed.
        """
        cutoff = self._utcnow() - timedelta(days=days)
        removed = 0

        # Snapshot the listing first: entries are deleted during the loop.
        for meta_file in list(self.vault_dir.glob("*.json")):
            try:
                meta = json.loads(meta_file.read_text(encoding="utf-8"))
                if not isinstance(meta, dict):
                    continue
                qdate = datetime.fromisoformat(meta.get("quarantine_date", ""))
            except (ValueError, TypeError, OSError):
                continue

            # Stored dates are naive UTC; normalise any timezone-aware value
            # so the comparison with the naive cutoff cannot raise.
            if qdate.tzinfo is not None:
                qdate = qdate.astimezone(timezone.utc).replace(tzinfo=None)
            if qdate >= cutoff:
                continue

            # Delete by the file actually inspected, not by the ``id`` field
            # inside it, so corrupt metadata cannot target another entry.
            try:
                if self.delete_file(meta_file.stem):
                    removed += 1
            except (ValueError, OSError):
                continue

        if removed:
            logger.info("Cleaned %d quarantine entries older than %d days", removed, days)
        return removed

    # ------------------------------------------------------------------
    # Internal
    # ------------------------------------------------------------------

    def _load_meta(self, quarantine_id: str) -> Dict[str, Any]:
        """Load and return the metadata dict for *quarantine_id*.

        Raises
        ------
        ValueError
            If the ID is malformed, or the metadata file is not valid JSON
            or does not contain a JSON object.
        FileNotFoundError
            If no metadata file exists for the ID.
        """
        qid = self._validate_qid(quarantine_id)
        meta_path = self.vault_dir / f"{qid}.json"
        try:
            raw = meta_path.read_text(encoding="utf-8")
        except FileNotFoundError:
            raise FileNotFoundError(f"Quarantine metadata not found: {qid}") from None
        meta = json.loads(raw)
        if not isinstance(meta, dict):
            raise ValueError(f"Quarantine metadata is not a JSON object: {qid}")
        return meta
|