Files
calvana/ayn-antivirus/ayn_antivirus/signatures/feeds/virusshare.py

115 lines
3.5 KiB
Python

"""VirusShare feed for AYN Antivirus.
Downloads MD5 hash lists from VirusShare.com — one of the largest
free malware hash databases. Each list contains 65,536 MD5 hashes
of known malware samples (.exe, .dll, .rar, .doc, .pdf, .app, etc).
https://virusshare.com/hashes
"""
from __future__ import annotations
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
import requests
from ayn_antivirus.signatures.feeds.base_feed import BaseFeed
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# URL template for a single hash list; it is formatted with the integer list
# number, which ``{:05d}`` zero-pads to five digits.
_BASE_URL = "https://virusshare.com/hashfiles/VirusShare_{:05d}.md5"
# Value passed as ``timeout`` to ``requests.get`` for every list download.
_TIMEOUT = 30
# File holding the number of the last downloaded list between runs.
_STATE_FILE = "/var/lib/ayn-antivirus/.virusshare_last"
class VirusShareFeed(BaseFeed):
    """Fetch malware MD5 hashes from VirusShare.

    Tracks the number of the last downloaded list so incremental updates
    only fetch new lists.  That number is kept in memory in ``_last_list``
    and persisted to ``_STATE_FILE`` after every successfully fetched list.
    """

    # List number treated as "already downloaded" when no usable state file
    # exists, i.e. the first incremental fetch starts after list 480.
    _DEFAULT_LAST_LIST = 480
    # Number of characters in a hex-encoded MD5 digest.
    _MD5_HEX_LEN = 32
    # Characters allowed in a hex-encoded digest (either case).
    _HEX_DIGITS = frozenset("0123456789abcdefABCDEF")

    def __init__(self, **kwargs: Any) -> None:
        """Initialise the base feed and restore the last downloaded list number.

        Args:
            **kwargs: Forwarded unchanged to ``BaseFeed.__init__``.
        """
        super().__init__(**kwargs)
        self._last_list = self._load_state()

    def get_name(self) -> str:
        """Return the identifier of this feed."""
        return "virusshare"

    def fetch(self) -> List[Dict[str, Any]]:
        """Fetch new hash lists since the last update (at most three)."""
        return self.fetch_new_lists(max_lists=3)

    def fetch_new_lists(self, max_lists: int = 3) -> List[Dict[str, Any]]:
        """Download up to ``max_lists`` new VirusShare hash files.

        Lists are requested in ascending order starting right after
        ``_last_list``.  Downloading stops at the first 404 response (no such
        list yet) or at the first request error.  After each successfully
        fetched list the position is advanced and persisted, so a later
        failure does not lose the lists already retrieved.

        Args:
            max_lists: Maximum number of consecutive lists to request.

        Returns:
            One signature record (dict) per hash found, across all lists
            fetched by this call; empty when nothing could be fetched.
        """
        results: List[Dict[str, Any]] = []
        first = self._last_list + 1
        fetched = 0

        for i in range(first, first + max_lists):
            # Throttling helper provided by the base class.
            self._rate_limit_wait()
            url = _BASE_URL.format(i)
            self._log("Fetching VirusShare_%05d", i)
            try:
                resp = requests.get(url, timeout=_TIMEOUT)
                if resp.status_code == 404:
                    # A missing list means we have caught up with the newest one.
                    self._log("VirusShare_%05d not found — at latest", i)
                    break
                resp.raise_for_status()
            except requests.RequestException as exc:
                self._error("Failed to fetch list %d: %s", i, exc)
                break

            hashes = self._parse_hashes(resp.text)
            # Identical for every hash of this list, so build it once.
            details = f"md5,list={i:05d}"
            results.extend(
                {
                    "hash": digest,
                    "threat_name": "Malware.VirusShare",
                    "threat_type": "MALWARE",
                    "severity": "HIGH",
                    "source": "virusshare",
                    "details": details,
                }
                for digest in hashes
            )

            self._last_list = i
            self._save_state(i)
            fetched += 1
            self._log("VirusShare_%05d: %d hashes", i, len(hashes))

        self._log("Fetched %d list(s), %d total hashes", fetched, len(results))
        if results:
            self._mark_updated()
        return results

    def fetch_initial(self, start_list: int = 470, count: int = 11) -> List[Dict[str, Any]]:
        """Bulk download for initial setup.

        Rewinds the in-memory position to just before ``start_list`` and then
        fetches up to ``count`` lists from there.  If that yields no records,
        the previous in-memory position is restored.

        Args:
            start_list: Number of the first list to download.
            count: Maximum number of consecutive lists to download.

        Returns:
            The signature records produced by ``fetch_new_lists``.
        """
        previous = self._last_list
        self._last_list = start_list - 1
        results = self.fetch_new_lists(max_lists=count)
        if not results:
            self._last_list = previous
        return results

    @staticmethod
    def _parse_hashes(text: str) -> List[str]:
        """Extract the MD5 digests from the body of a hash-list file.

        A line is accepted only if, once stripped, it is exactly 32
        hexadecimal digits.  This skips blank lines and ``#`` header lines
        and also rejects arbitrary 32-character text (for example from an
        HTML page served with status 200), which must never be stored as a
        malware signature.

        Args:
            text: Decoded response body.

        Returns:
            The accepted digests, lower-cased, in file order.
        """
        expected_len = VirusShareFeed._MD5_HEX_LEN
        is_hex = VirusShareFeed._HEX_DIGITS.issuperset
        digests: List[str] = []
        for line in text.splitlines():
            candidate = line.strip()
            if len(candidate) == expected_len and is_hex(candidate):
                digests.append(candidate.lower())
        return digests

    @staticmethod
    def _load_state() -> int:
        """Return the persisted last-list number, or the default if unavailable.

        A missing or unreadable state file, or one whose content is not an
        integer, yields ``_DEFAULT_LAST_LIST``.
        """
        try:
            return int(Path(_STATE_FILE).read_text().strip())
        except (OSError, ValueError):
            # OSError: file missing/unreadable.  ValueError: content is not an
            # integer (also covers undecodable bytes via UnicodeDecodeError).
            return VirusShareFeed._DEFAULT_LAST_LIST

    @staticmethod
    def _save_state(n: int) -> None:
        """Persist ``n`` as the last downloaded list number (best effort).

        The parent directory is created when missing.  Failures are logged
        rather than raised so that a read-only or unprivileged environment
        does not abort a fetch; in that case the position is simply not
        remembered across runs.
        """
        state_path = Path(_STATE_FILE)
        try:
            state_path.parent.mkdir(parents=True, exist_ok=True)
            state_path.write_text(str(n))
        except OSError as exc:
            logger.warning(
                "Could not persist VirusShare state to %s: %s", state_path, exc
            )