Files
calvana/ayn-antivirus/ayn_antivirus/signatures/feeds/malwarebazaar.py

175 lines
6.0 KiB
Python

"""MalwareBazaar feed for AYN Antivirus.
Fetches recent malware sample hashes from the abuse.ch MalwareBazaar
CSV export (free, no API key required).
CSV export: https://bazaar.abuse.ch/export/
"""
from __future__ import annotations
import csv
import io
import logging
from typing import Any, Dict, List, Optional
import requests
from ayn_antivirus.signatures.feeds.base_feed import BaseFeed
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# CSV export endpoint downloaded by MalwareBazaarFeed.fetch() and
# MalwareBazaarFeed.fetch_recent().
_CSV_RECENT_URL = "https://bazaar.abuse.ch/export/csv/recent/"
# Full CSV export endpoint. NOTE(review): not referenced anywhere in the
# visible part of this module - confirm whether it is still needed.
_CSV_FULL_URL = "https://bazaar.abuse.ch/export/csv/full/"
# JSON API endpoint that MalwareBazaarFeed.fetch_by_tag() POSTs to.
_API_URL = "https://mb-api.abuse.ch/api/v1/"
# Passed as ``timeout=`` to every ``requests`` call in this module (seconds).
_TIMEOUT = 60
class MalwareBazaarFeed(BaseFeed):
    """Fetch malware SHA-256 hashes from MalwareBazaar.

    :meth:`fetch` and :meth:`fetch_recent` download the free CSV export.
    :meth:`fetch_by_tag` queries the JSON API and is only attempted when an
    ``api_key`` was supplied.

    Every public fetch method returns a list of signature dicts with the
    keys ``hash``, ``threat_name``, ``threat_type``, ``severity``,
    ``source`` and ``details``.  Network and decoding failures are logged
    and reported as an empty list rather than raised.
    """

    def __init__(self, api_key: Optional[str] = None, **kwargs: Any) -> None:
        """Store the optional API key; all other kwargs go to the base feed."""
        super().__init__(**kwargs)
        self.api_key = api_key

    def get_name(self) -> str:
        """Return the identifier of this feed."""
        return "malwarebazaar"

    def fetch(self) -> List[Dict[str, Any]]:
        """Fetch recent malware hashes from the CSV export."""
        return self._fetch_csv(_CSV_RECENT_URL)

    def fetch_recent(self, hours: int = 24) -> List[Dict[str, Any]]:
        """Fetch recent samples from the CSV export.

        ``hours`` is accepted for interface compatibility but is not applied:
        the same "recent" CSV export is downloaded whatever value is passed.
        """
        return self._fetch_csv(_CSV_RECENT_URL)

    @staticmethod
    def _clean(value: str) -> str:
        """Strip surrounding whitespace and any stray double quotes."""
        return value.strip().strip('"')

    @staticmethod
    def _is_sha256(value: str) -> bool:
        """Return True when *value* is exactly 64 hexadecimal characters."""
        return len(value) == 64 and all(
            ch in "0123456789abcdefABCDEF" for ch in value
        )

    def _fetch_csv(self, url: str) -> List[Dict[str, Any]]:
        """Download and parse the MalwareBazaar CSV export at *url*.

        Returns one signature dict per row that has at least eight columns
        and a well-formed SHA-256 in column 1.  Returns ``[]`` if the
        download fails.
        """
        self._rate_limit_wait()
        self._log("Fetching hashes from %s", url)
        try:
            resp = requests.get(url, timeout=_TIMEOUT)
            resp.raise_for_status()
        except requests.RequestException as exc:
            self._error("CSV download failed: %s", exc)
            return []

        # Drop blank lines and '#' comment lines before CSV parsing.
        lines = [
            line for line in resp.text.splitlines()
            if line.strip() and not line.lstrip().startswith("#")
        ]
        # Fields are separated by ', ' (comma + space).  Without
        # skipinitialspace the csv module does not treat the opening quote of
        # each field as a quote character, so a comma inside a quoted value
        # (e.g. a filename) would shift every following column.
        reader = csv.reader(
            io.StringIO("\n".join(lines)), skipinitialspace=True
        )

        no_signature = ("null", "n/a", "None", "")
        results: List[Dict[str, Any]] = []
        for row in reader:
            if len(row) < 8:
                continue
            # CSV columns:
            # 0: first_seen, 1: sha256, 2: md5, 3: sha1,
            # 4: reporter, 5: filename, 6: file_type, 7: mime_type,
            # 8+: signature, ...
            sha256 = self._clean(row[1])
            if not self._is_sha256(sha256):
                continue
            reporter = self._clean(row[4])
            filename = self._clean(row[5])
            file_type = self._clean(row[6])
            signature = self._clean(row[8]) if len(row) > 8 else ""

            # Fall back to a generic name derived from the file type when the
            # feed carries no usable signature.
            if signature not in no_signature:
                threat_name = signature
            else:
                threat_name = f"Malware.{_map_type_name(file_type)}"

            results.append({
                "hash": sha256.lower(),
                "threat_name": threat_name,
                "threat_type": _map_type(file_type),
                "severity": "HIGH",
                "source": "malwarebazaar",
                "details": (
                    f"file={filename}, type={file_type}, reporter={reporter}"
                ),
            })

        self._log("Parsed %d hash signature(s) from CSV", len(results))
        self._mark_updated()
        return results

    def fetch_by_tag(self, tag: str) -> List[Dict[str, Any]]:
        """Fetch up to 100 samples carrying *tag* via the JSON API.

        Requires an API key; without one a warning is logged and ``[]`` is
        returned.  ``[]`` is also returned when the request fails, the
        response is not valid JSON, or ``query_status`` is not ``"ok"``.
        """
        if not self.api_key:
            self._warn("fetch_by_tag requires API key")
            return []

        self._rate_limit_wait()
        payload = {
            "query": "get_taginfo",
            "tag": tag,
            "limit": 100,
            "api_key": self.api_key,
        }
        # NOTE(review): abuse.ch documents authentication via the Auth-Key
        # HTTP header; the form field above is kept for compatibility.
        headers = {"Auth-Key": self.api_key}
        try:
            resp = requests.post(
                _API_URL, data=payload, headers=headers, timeout=_TIMEOUT
            )
            resp.raise_for_status()
            data = resp.json()
        except (requests.RequestException, ValueError) as exc:
            # ValueError covers JSON decode errors on requests versions where
            # they are not a RequestException subclass.
            self._error("API request failed: %s", exc)
            return []

        if not isinstance(data, dict) or data.get("query_status") != "ok":
            return []

        results: List[Dict[str, Any]] = []
        # "data" may be absent or JSON null; treat both as no entries.
        for entry in data.get("data") or []:
            if not isinstance(entry, dict):
                continue
            sha256 = entry.get("sha256_hash")
            if not isinstance(sha256, str) or not sha256:
                continue
            file_type = entry.get("file_type") or ""
            results.append({
                "hash": sha256.lower(),
                "threat_name": entry.get("signature") or f"Malware.{tag}",
                "threat_type": _map_type(file_type),
                "severity": "HIGH",
                "source": "malwarebazaar",
                "details": f"tag={tag}, file_type={file_type}",
            })

        self._mark_updated()
        return results
def _map_type(file_type: str) -> str:
ft = file_type.lower()
if any(x in ft for x in ("exe", "dll", "elf", "pe32")):
return "MALWARE"
if any(x in ft for x in ("doc", "xls", "pdf", "rtf")):
return "MALWARE"
if any(x in ft for x in ("script", "js", "vbs", "ps1", "bat", "sh")):
return "MALWARE"
return "MALWARE"
def _map_type_name(file_type: str) -> str:
"""Map file type to a readable threat name suffix."""
ft = file_type.lower().strip()
m = {
"exe": "Win32.Executable", "dll": "Win32.DLL", "msi": "Win32.Installer",
"elf": "Linux.ELF", "so": "Linux.SharedLib",
"doc": "Office.Document", "docx": "Office.Document",
"xls": "Office.Spreadsheet", "xlsx": "Office.Spreadsheet",
"pdf": "PDF.Document", "rtf": "Office.RTF",
"js": "Script.JavaScript", "vbs": "Script.VBScript",
"ps1": "Script.PowerShell", "bat": "Script.Batch",
"sh": "Script.Shell", "py": "Script.Python",
"apk": "Android.APK", "ipa": "iOS.IPA",
"app": "macOS.App", "pkg": "macOS.Pkg", "dmg": "macOS.DMG",
"rar": "Archive.RAR", "zip": "Archive.ZIP",
"7z": "Archive.7Z", "tar": "Archive.TAR", "gz": "Archive.GZ",
"iso": "DiskImage.ISO", "img": "DiskImage.IMG",
}
return m.get(ft, "Generic")