"""MalwareBazaar feed for AYN Antivirus. Fetches recent malware sample hashes from the abuse.ch MalwareBazaar CSV export (free, no API key required). CSV export: https://bazaar.abuse.ch/export/ """ from __future__ import annotations import csv import io import logging from typing import Any, Dict, List, Optional import requests from ayn_antivirus.signatures.feeds.base_feed import BaseFeed logger = logging.getLogger(__name__) _CSV_RECENT_URL = "https://bazaar.abuse.ch/export/csv/recent/" _CSV_FULL_URL = "https://bazaar.abuse.ch/export/csv/full/" _API_URL = "https://mb-api.abuse.ch/api/v1/" _TIMEOUT = 60 class MalwareBazaarFeed(BaseFeed): """Fetch malware SHA-256 hashes from MalwareBazaar. Uses the free CSV export by default. Falls back to JSON API if an api_key is provided. """ def __init__(self, api_key: Optional[str] = None, **kwargs: Any) -> None: super().__init__(**kwargs) self.api_key = api_key def get_name(self) -> str: return "malwarebazaar" def fetch(self) -> List[Dict[str, Any]]: """Fetch recent malware hashes from CSV export.""" return self._fetch_csv(_CSV_RECENT_URL) def fetch_recent(self, hours: int = 24) -> List[Dict[str, Any]]: """Fetch recent samples. CSV export returns last ~1000 samples.""" return self._fetch_csv(_CSV_RECENT_URL) def _fetch_csv(self, url: str) -> List[Dict[str, Any]]: """Download and parse the MalwareBazaar CSV export.""" self._rate_limit_wait() self._log("Fetching hashes from %s", url) try: resp = requests.get(url, timeout=_TIMEOUT) resp.raise_for_status() except requests.RequestException as exc: self._error("CSV download failed: %s", exc) return [] results: List[Dict[str, Any]] = [] lines = [ line for line in resp.text.splitlines() if line.strip() and not line.startswith("#") ] reader = csv.reader(io.StringIO("\n".join(lines))) for row in reader: if len(row) < 8: continue # CSV columns: # 0: first_seen, 1: sha256, 2: md5, 3: sha1, # 4: reporter, 5: filename, 6: file_type, 7: mime_type, # 8+: signature, ... sha256 = row[1].strip().strip('"') if not sha256 or len(sha256) != 64: continue filename = row[5].strip().strip('"') if len(row) > 5 else "" file_type = row[6].strip().strip('"') if len(row) > 6 else "" signature = row[8].strip().strip('"') if len(row) > 8 else "" reporter = row[4].strip().strip('"') if len(row) > 4 else "" threat_name = ( signature if signature and signature not in ("null", "n/a", "None", "") else f"Malware.{_map_type_name(file_type)}" ) results.append({ "hash": sha256.lower(), "threat_name": threat_name, "threat_type": _map_type(file_type), "severity": "HIGH", "source": "malwarebazaar", "details": ( f"file={filename}, type={file_type}, reporter={reporter}" ), }) self._log("Parsed %d hash signature(s) from CSV", len(results)) self._mark_updated() return results def fetch_by_tag(self, tag: str) -> List[Dict[str, Any]]: """Fetch samples by tag (requires API key, falls back to empty).""" if not self.api_key: self._warn("fetch_by_tag requires API key") return [] self._rate_limit_wait() payload = {"query": "get_taginfo", "tag": tag, "limit": 100} if self.api_key: payload["api_key"] = self.api_key try: resp = requests.post(_API_URL, data=payload, timeout=_TIMEOUT) resp.raise_for_status() data = resp.json() except requests.RequestException as exc: self._error("API request failed: %s", exc) return [] if data.get("query_status") != "ok": return [] results = [] for entry in data.get("data", []): sha256 = entry.get("sha256_hash", "") if not sha256: continue results.append({ "hash": sha256.lower(), "threat_name": entry.get("signature") or f"Malware.{tag}", "threat_type": _map_type(entry.get("file_type", "")), "severity": "HIGH", "source": "malwarebazaar", "details": f"tag={tag}, file_type={entry.get('file_type', '')}", }) self._mark_updated() return results def _map_type(file_type: str) -> str: ft = file_type.lower() if any(x in ft for x in ("exe", "dll", "elf", "pe32")): return "MALWARE" if any(x in ft for x in ("doc", "xls", "pdf", "rtf")): return "MALWARE" if any(x in ft for x in ("script", "js", "vbs", "ps1", "bat", "sh")): return "MALWARE" return "MALWARE" def _map_type_name(file_type: str) -> str: """Map file type to a readable threat name suffix.""" ft = file_type.lower().strip() m = { "exe": "Win32.Executable", "dll": "Win32.DLL", "msi": "Win32.Installer", "elf": "Linux.ELF", "so": "Linux.SharedLib", "doc": "Office.Document", "docx": "Office.Document", "xls": "Office.Spreadsheet", "xlsx": "Office.Spreadsheet", "pdf": "PDF.Document", "rtf": "Office.RTF", "js": "Script.JavaScript", "vbs": "Script.VBScript", "ps1": "Script.PowerShell", "bat": "Script.Batch", "sh": "Script.Shell", "py": "Script.Python", "apk": "Android.APK", "ipa": "iOS.IPA", "app": "macOS.App", "pkg": "macOS.Pkg", "dmg": "macOS.DMG", "rar": "Archive.RAR", "zip": "Archive.ZIP", "7z": "Archive.7Z", "tar": "Archive.TAR", "gz": "Archive.GZ", "iso": "DiskImage.ISO", "img": "DiskImage.IMG", } return m.get(ft, "Generic")