"""File-system scanner for AYN Antivirus. Walks directories, gathers file metadata, hashes files, and classifies them by type (ELF binary, script, suspicious extension) so that downstream detectors can focus on high-value targets. """ from __future__ import annotations import grp import logging import os import pwd import stat from datetime import datetime from pathlib import Path from typing import Any, Dict, Generator, List, Optional from ayn_antivirus.constants import ( MAX_FILE_SIZE, SUSPICIOUS_EXTENSIONS, ) from ayn_antivirus.scanners.base import BaseScanner logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Well-known magic bytes # --------------------------------------------------------------------------- _ELF_MAGIC = b"\x7fELF" _SCRIPT_SHEBANGS = (b"#!", b"#!/") _PE_MAGIC = b"MZ" class FileScanner(BaseScanner): """Enumerates, classifies, and hashes files on disk. This scanner does **not** perform threat detection itself — it prepares the metadata that detectors (YARA, hash-lookup, heuristic) consume. Parameters ---------- max_file_size: Skip files larger than this (bytes). Defaults to :pydata:`constants.MAX_FILE_SIZE`. """ def __init__(self, max_file_size: int = MAX_FILE_SIZE) -> None: self.max_file_size = max_file_size # ------------------------------------------------------------------ # BaseScanner interface # ------------------------------------------------------------------ @property def name(self) -> str: return "file_scanner" @property def description(self) -> str: return "Enumerates and classifies files on disk" def scan(self, target: Any) -> Dict[str, Any]: """Scan a single file and return its metadata + hash. Parameters ---------- target: A path (``str`` or ``Path``) to the file. Returns ------- dict Keys: ``path``, ``size``, ``hash``, ``is_elf``, ``is_script``, ``suspicious_ext``, ``info``, ``header``, ``error``. """ filepath = Path(target) result: Dict[str, Any] = { "path": str(filepath), "size": 0, "hash": "", "is_elf": False, "is_script": False, "suspicious_ext": False, "info": {}, "header": b"", "error": None, } try: info = self.get_file_info(filepath) result["info"] = info result["size"] = info.get("size", 0) except OSError as exc: result["error"] = str(exc) return result if result["size"] > self.max_file_size: result["error"] = f"Exceeds max size ({result['size']} > {self.max_file_size})" return result try: result["hash"] = self.compute_hash(filepath) except OSError as exc: result["error"] = f"Hash failed: {exc}" return result try: result["header"] = self.read_file_header(filepath) except OSError: pass # non-fatal result["is_elf"] = self.is_elf_binary(filepath) result["is_script"] = self.is_script(filepath) result["suspicious_ext"] = self.is_suspicious_extension(filepath) return result # ------------------------------------------------------------------ # Directory walking # ------------------------------------------------------------------ @staticmethod def walk_directory( path: str | Path, recursive: bool = True, exclude_patterns: Optional[List[str]] = None, ) -> Generator[Path, None, None]: """Yield every regular file under *path*. Parameters ---------- path: Root directory to walk. recursive: If ``False``, only yield files in the top-level directory. exclude_patterns: Path prefixes or glob-style patterns to skip. A file is skipped if its absolute path starts with any pattern string. """ root = Path(path).resolve() exclude = [str(Path(p).resolve()) for p in (exclude_patterns or [])] if root.is_file(): yield root return iterator = root.rglob("*") if recursive else root.iterdir() try: for entry in iterator: if not entry.is_file(): continue entry_str = str(entry) if any(entry_str.startswith(ex) for ex in exclude): continue yield entry except PermissionError: logger.warning("Permission denied walking: %s", root) # ------------------------------------------------------------------ # File metadata # ------------------------------------------------------------------ @staticmethod def get_file_info(path: str | Path) -> Dict[str, Any]: """Return a metadata dict for the file at *path*. Keys ---- size, permissions, permissions_octal, owner, group, modified_time, created_time, is_symlink, is_suid, is_sgid. Raises ------ OSError If the file cannot be stat'd. """ p = Path(path) st = p.stat() mode = st.st_mode # Owner / group — fall back gracefully on systems without the user. try: owner = pwd.getpwuid(st.st_uid).pw_name except (KeyError, ImportError): owner = str(st.st_uid) try: group = grp.getgrgid(st.st_gid).gr_name except (KeyError, ImportError): group = str(st.st_gid) return { "size": st.st_size, "permissions": stat.filemode(mode), "permissions_octal": oct(mode & 0o7777), "owner": owner, "group": group, "modified_time": datetime.utcfromtimestamp(st.st_mtime).isoformat(), "created_time": datetime.utcfromtimestamp(st.st_ctime).isoformat(), "is_symlink": p.is_symlink(), "is_suid": bool(mode & stat.S_ISUID), "is_sgid": bool(mode & stat.S_ISGID), } # ------------------------------------------------------------------ # Hashing # ------------------------------------------------------------------ @staticmethod def compute_hash(path: str | Path, algorithm: str = "sha256") -> str: """Compute file hash. Delegates to canonical implementation.""" from ayn_antivirus.utils.helpers import hash_file return hash_file(str(path), algo=algorithm) # ------------------------------------------------------------------ # Header / magic number # ------------------------------------------------------------------ @staticmethod def read_file_header(path: str | Path, size: int = 8192) -> bytes: """Read the first *size* bytes of a file (for magic-number checks). Raises ------ OSError If the file cannot be opened. """ with open(path, "rb") as fh: return fh.read(size) # ------------------------------------------------------------------ # Type classification # ------------------------------------------------------------------ @staticmethod def is_elf_binary(path: str | Path) -> bool: """Return ``True`` if *path* begins with the ELF magic number.""" try: with open(path, "rb") as fh: return fh.read(4) == _ELF_MAGIC except OSError: return False @staticmethod def is_script(path: str | Path) -> bool: """Return ``True`` if *path* starts with a shebang (``#!``).""" try: with open(path, "rb") as fh: head = fh.read(3) return any(head.startswith(s) for s in _SCRIPT_SHEBANGS) except OSError: return False @staticmethod def is_suspicious_extension(path: str | Path) -> bool: """Return ``True`` if the file suffix is in :pydata:`SUSPICIOUS_EXTENSIONS`.""" return Path(path).suffix.lower() in SUSPICIOUS_EXTENSIONS