115 lines
3.5 KiB
Python
115 lines
3.5 KiB
Python
"""VirusShare feed for AYN Antivirus.
|
|
|
|
Downloads MD5 hash lists from VirusShare.com — one of the largest
|
|
free malware hash databases. Each list contains 65,536 MD5 hashes
|
|
of known malware samples (.exe, .dll, .rar, .doc, .pdf, .app, etc).
|
|
|
|
https://virusshare.com/hashes
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import requests
|
|
|
|
from ayn_antivirus.signatures.feeds.base_feed import BaseFeed
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_BASE_URL = "https://virusshare.com/hashfiles/VirusShare_{:05d}.md5"
|
|
_TIMEOUT = 30
|
|
_STATE_FILE = "/var/lib/ayn-antivirus/.virusshare_last"
|
|
|
|
|
|
class VirusShareFeed(BaseFeed):
|
|
"""Fetch malware MD5 hashes from VirusShare.
|
|
|
|
Tracks the last downloaded list number so incremental updates
|
|
only fetch new lists.
|
|
"""
|
|
|
|
def __init__(self, **kwargs: Any) -> None:
|
|
super().__init__(**kwargs)
|
|
self._last_list = self._load_state()
|
|
|
|
def get_name(self) -> str:
|
|
return "virusshare"
|
|
|
|
def fetch(self) -> List[Dict[str, Any]]:
|
|
"""Fetch new hash lists since last update."""
|
|
return self.fetch_new_lists(max_lists=3)
|
|
|
|
def fetch_new_lists(self, max_lists: int = 3) -> List[Dict[str, Any]]:
|
|
"""Download up to max_lists new VirusShare hash files."""
|
|
results: List[Dict[str, Any]] = []
|
|
start = self._last_list + 1
|
|
fetched = 0
|
|
|
|
for i in range(start, start + max_lists):
|
|
self._rate_limit_wait()
|
|
url = _BASE_URL.format(i)
|
|
self._log("Fetching VirusShare_%05d", i)
|
|
|
|
try:
|
|
resp = requests.get(url, timeout=_TIMEOUT)
|
|
if resp.status_code == 404:
|
|
self._log("VirusShare_%05d not found — at latest", i)
|
|
break
|
|
resp.raise_for_status()
|
|
except requests.RequestException as exc:
|
|
self._error("Failed to fetch list %d: %s", i, exc)
|
|
break
|
|
|
|
hashes = [
|
|
line.strip()
|
|
for line in resp.text.splitlines()
|
|
if line.strip() and not line.startswith("#") and len(line.strip()) == 32
|
|
]
|
|
|
|
for h in hashes:
|
|
results.append({
|
|
"hash": h.lower(),
|
|
"threat_name": "Malware.VirusShare",
|
|
"threat_type": "MALWARE",
|
|
"severity": "HIGH",
|
|
"source": "virusshare",
|
|
"details": f"md5,list={i:05d}",
|
|
})
|
|
|
|
self._last_list = i
|
|
self._save_state(i)
|
|
fetched += 1
|
|
self._log("VirusShare_%05d: %d hashes", i, len(hashes))
|
|
|
|
self._log("Fetched %d list(s), %d total hashes", fetched, len(results))
|
|
if results:
|
|
self._mark_updated()
|
|
return results
|
|
|
|
def fetch_initial(self, start_list: int = 470, count: int = 11) -> List[Dict[str, Any]]:
|
|
"""Bulk download for initial setup."""
|
|
old = self._last_list
|
|
self._last_list = start_list - 1
|
|
results = self.fetch_new_lists(max_lists=count)
|
|
if not results:
|
|
self._last_list = old
|
|
return results
|
|
|
|
@staticmethod
|
|
def _load_state() -> int:
|
|
try:
|
|
return int(Path(_STATE_FILE).read_text().strip())
|
|
except Exception:
|
|
return 480 # Default: start after list 480
|
|
|
|
@staticmethod
|
|
def _save_state(n: int) -> None:
|
|
try:
|
|
Path(_STATE_FILE).write_text(str(n))
|
|
except Exception:
|
|
pass
|