"""File-system scanner for AYN Antivirus.

Walks directories, gathers file metadata, hashes files, and classifies
them by type (ELF binary, script, suspicious extension) so that downstream
detectors can focus on high-value targets.
"""
|
|
|
|
from __future__ import annotations

import grp
import logging
import os
import pwd
import stat
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional

from ayn_antivirus.constants import (
    MAX_FILE_SIZE,
    SUSPICIOUS_EXTENSIONS,
)
from ayn_antivirus.scanners.base import BaseScanner
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Well-known magic bytes
# ---------------------------------------------------------------------------
# Leading bytes compared against the start of a file to classify it.
_ELF_MAGIC = b"\x7fELF"  # first four bytes of an ELF object
_SCRIPT_SHEBANGS = (b"#!", b"#!/")  # prefixes treated as a script shebang
_PE_MAGIC = b"MZ"  # "MZ" signature; not referenced in this part of the file
|
|
|
|
|
|
class FileScanner(BaseScanner):
    """Enumerates, classifies, and hashes files on disk.

    This scanner does **not** perform threat detection itself — it prepares
    the metadata that detectors (YARA, hash-lookup, heuristic) consume.

    Parameters
    ----------
    max_file_size:
        Skip files larger than this (bytes).  Defaults to
        :py:data:`constants.MAX_FILE_SIZE`.
    """

    def __init__(self, max_file_size: int = MAX_FILE_SIZE) -> None:
        # NOTE(review): BaseScanner.__init__ is not invoked here (unchanged
        # from the original) — confirm the base class needs no initialisation.
        self.max_file_size = max_file_size

    # ------------------------------------------------------------------
    # BaseScanner interface
    # ------------------------------------------------------------------

    @property
    def name(self) -> str:
        """Short machine-readable identifier of this scanner."""
        return "file_scanner"

    @property
    def description(self) -> str:
        """One-line human-readable summary of this scanner."""
        return "Enumerates and classifies files on disk"

    def scan(self, target: Any) -> Dict[str, Any]:
        """Scan a single file and return its metadata + hash.

        The steps run in order and stop at the first fatal problem: stat the
        file, enforce ``max_file_size``, hash it, then read its header and
        classify it.  Failures are reported through the ``error`` key rather
        than raised.

        Parameters
        ----------
        target:
            A path (``str`` or ``Path``) to the file.

        Returns
        -------
        dict
            Keys: ``path``, ``size``, ``hash``, ``is_elf``, ``is_script``,
            ``suspicious_ext``, ``info``, ``header``, ``error``.
            ``error`` is ``None`` on success.  When it is set, the keys for
            steps that were not reached keep their defaults (``""``,
            ``False``, ``{}``, ``b""``).

        Notes
        -----
        NOTE(review): no regular-file check is made before hashing; whether
        FIFOs or device nodes are safe to pass depends on ``hash_file`` —
        confirm against callers.
        """
        filepath = Path(target)
        result: Dict[str, Any] = {
            "path": str(filepath),
            "size": 0,
            "hash": "",
            "is_elf": False,
            "is_script": False,
            "suspicious_ext": False,
            "info": {},
            "header": b"",
            "error": None,
        }

        # Step 1: metadata.  A file that cannot be stat'd cannot be scanned.
        try:
            info = self.get_file_info(filepath)
        except OSError as exc:
            result["error"] = str(exc)
            return result
        result["info"] = info
        result["size"] = info.get("size", 0)

        # Step 2: size gate, applied before any content is read.
        if result["size"] > self.max_file_size:
            result["error"] = f"Exceeds max size ({result['size']} > {self.max_file_size})"
            return result

        # Step 3: content hash.
        try:
            result["hash"] = self.compute_hash(filepath)
        except OSError as exc:
            result["error"] = f"Hash failed: {exc}"
            return result

        # Step 4: header + type classification.  The magic-number checks only
        # need the first few bytes, so reuse the header that was just read
        # instead of reopening the file once per check.
        try:
            header = self.read_file_header(filepath)
        except OSError:
            # Non-fatal: leave ``header`` empty and fall back to the
            # per-file probes, which return False when the file is unreadable.
            result["is_elf"] = self.is_elf_binary(filepath)
            result["is_script"] = self.is_script(filepath)
        else:
            result["header"] = header
            result["is_elf"] = header.startswith(_ELF_MAGIC)
            result["is_script"] = header.startswith(_SCRIPT_SHEBANGS)

        result["suspicious_ext"] = self.is_suspicious_extension(filepath)

        return result

    # ------------------------------------------------------------------
    # Directory walking
    # ------------------------------------------------------------------

    @staticmethod
    def walk_directory(
        path: str | Path,
        recursive: bool = True,
        exclude_patterns: Optional[List[str]] = None,
    ) -> Generator[Path, None, None]:
        """Yield every regular file under *path*.

        Parameters
        ----------
        path:
            Root directory to walk.  If it is itself a regular file, that
            file is yielded and the walk ends (exclusions are not applied).
        recursive:
            If ``False``, only yield files in the top-level directory.
        exclude_patterns:
            Paths to skip.  Each one is resolved to an absolute path; a file
            is skipped when it *is* that path or lies underneath it.
            Matching is done on whole path components, so excluding
            ``/var/log`` does not exclude ``/var/logs2``.  Glob wildcards
            are not expanded.

        Notes
        -----
        Entries that cannot be inspected (e.g. permission denied on a single
        file) are skipped individually.  A ``PermissionError`` raised by the
        directory iteration itself ends the walk with a warning.
        """
        root = Path(path).resolve()

        # Pre-compute, per exclusion, the exact path and the "directory
        # prefix" form (path + separator).  Stripping a trailing separator
        # first keeps the filesystem root ("/") from becoming "//".
        excluded = []
        for pattern in exclude_patterns or []:
            resolved = str(Path(pattern).resolve())
            excluded.append((resolved, resolved.rstrip(os.sep) + os.sep))

        if root.is_file():
            yield root
            return

        iterator = root.rglob("*") if recursive else root.iterdir()
        try:
            for entry in iterator:
                # ``is_file`` can raise (e.g. EACCES); one unreadable entry
                # must not abort the whole walk.
                try:
                    if not entry.is_file():
                        continue
                except OSError as exc:
                    logger.debug("Skipping unreadable entry %s: %s", entry, exc)
                    continue

                entry_str = str(entry)
                if any(
                    entry_str == exact or entry_str.startswith(prefix)
                    for exact, prefix in excluded
                ):
                    continue
                yield entry
        except PermissionError:
            logger.warning("Permission denied walking: %s", root)

    # ------------------------------------------------------------------
    # File metadata
    # ------------------------------------------------------------------

    @staticmethod
    def get_file_info(path: str | Path) -> Dict[str, Any]:
        """Return a metadata dict for the file at *path*.

        The file is stat'd with ``Path.stat()``, which follows symlinks, so
        size, mode and timestamps describe the link target; ``is_symlink``
        reports whether *path* itself is a link.

        Keys
        ----
        size, permissions, permissions_octal, owner, group, modified_time,
        created_time, is_symlink, is_suid, is_sgid.

        ``modified_time`` and ``created_time`` are naive ISO-8601 strings in
        UTC.  ``created_time`` is taken from ``st_ctime``, which on Unix is
        the inode-change time rather than a true creation time.

        Raises
        ------
        OSError
            If the file cannot be stat'd.
        """
        p = Path(path)
        st = p.stat()
        mode = st.st_mode

        def _utc_iso(timestamp: float) -> str:
            # Same naive-UTC text the deprecated ``utcfromtimestamp`` gave:
            # convert as an aware UTC datetime, then drop the tzinfo.
            aware = datetime.fromtimestamp(timestamp, tz=timezone.utc)
            return aware.replace(tzinfo=None).isoformat()

        # Owner / group — fall back to the numeric id when it has no name.
        try:
            owner = pwd.getpwuid(st.st_uid).pw_name
        except (KeyError, ImportError):
            owner = str(st.st_uid)

        try:
            group = grp.getgrgid(st.st_gid).gr_name
        except (KeyError, ImportError):
            group = str(st.st_gid)

        return {
            "size": st.st_size,
            "permissions": stat.filemode(mode),
            "permissions_octal": oct(mode & 0o7777),
            "owner": owner,
            "group": group,
            "modified_time": _utc_iso(st.st_mtime),
            "created_time": _utc_iso(st.st_ctime),
            "is_symlink": p.is_symlink(),
            "is_suid": bool(mode & stat.S_ISUID),
            "is_sgid": bool(mode & stat.S_ISGID),
        }

    # ------------------------------------------------------------------
    # Hashing
    # ------------------------------------------------------------------

    @staticmethod
    def compute_hash(path: str | Path, algorithm: str = "sha256") -> str:
        """Compute file hash.  Delegates to canonical implementation.

        The work is done by ``ayn_antivirus.utils.helpers.hash_file``, which
        is imported at call time; *path* is passed as a string and
        *algorithm* as its ``algo`` argument.
        """
        from ayn_antivirus.utils.helpers import hash_file

        return hash_file(str(path), algo=algorithm)

    # ------------------------------------------------------------------
    # Header / magic number
    # ------------------------------------------------------------------

    @staticmethod
    def read_file_header(path: str | Path, size: int = 8192) -> bytes:
        """Read the first *size* bytes of a file (for magic-number checks).

        Returns fewer than *size* bytes when the file is shorter.

        Raises
        ------
        OSError
            If the file cannot be opened.
        """
        with open(path, "rb") as fh:
            return fh.read(size)

    # ------------------------------------------------------------------
    # Type classification
    # ------------------------------------------------------------------

    @staticmethod
    def is_elf_binary(path: str | Path) -> bool:
        """Return ``True`` if *path* begins with the ELF magic number.

        Returns ``False`` when the file cannot be opened or read.
        """
        try:
            with open(path, "rb") as fh:
                return fh.read(4) == _ELF_MAGIC
        except OSError:
            return False

    @staticmethod
    def is_script(path: str | Path) -> bool:
        """Return ``True`` if *path* starts with a shebang (``#!``).

        Returns ``False`` when the file cannot be opened or read.
        """
        try:
            with open(path, "rb") as fh:
                # ``startswith`` accepts the tuple of prefixes directly.
                return fh.read(3).startswith(_SCRIPT_SHEBANGS)
        except OSError:
            return False

    @staticmethod
    def is_suspicious_extension(path: str | Path) -> bool:
        """Return ``True`` if the file suffix is in :py:data:`SUSPICIOUS_EXTENSIONS`.

        The suffix is lower-cased before the lookup.
        """
        return Path(path).suffix.lower() in SUSPICIOUS_EXTENSIONS
|