"""Emerging Threats (ET Open) feed for AYN Antivirus. Parses community Suricata / Snort rules from Proofpoint's ET Open project to extract IOCs (IP addresses and domains) referenced in active detection rules. Source: https://rules.emergingthreats.net/open/suricata/rules/ """ from __future__ import annotations import logging import re from typing import Any, Dict, List, Set import requests from ayn_antivirus.signatures.feeds.base_feed import BaseFeed logger = logging.getLogger(__name__) # We focus on the compromised-IP and C2 rule files. _RULE_URLS = [ "https://rules.emergingthreats.net/open/suricata/rules/compromised-ips.txt", "https://rules.emergingthreats.net/open/suricata/rules/botcc.rules", "https://rules.emergingthreats.net/open/suricata/rules/ciarmy.rules", "https://rules.emergingthreats.net/open/suricata/rules/emerging-malware.rules", ] _TIMEOUT = 30 # Regex patterns to extract IPs and domains from rule bodies. _RE_IPV4 = re.compile(r"\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b") _RE_DOMAIN = re.compile( r'content:"([a-zA-Z0-9](?:[a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?' r'(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?)*' r'\.[a-zA-Z]{2,})"' ) # Private / non-routable ranges to exclude from IP results. _PRIVATE_PREFIXES = ( "10.", "127.", "172.16.", "172.17.", "172.18.", "172.19.", "172.20.", "172.21.", "172.22.", "172.23.", "172.24.", "172.25.", "172.26.", "172.27.", "172.28.", "172.29.", "172.30.", "172.31.", "192.168.", "0.", "255.", "224.", ) class EmergingThreatsFeed(BaseFeed): """Parse ET Open rule files to extract malicious IPs and domains.""" def get_name(self) -> str: return "emergingthreats" def fetch(self) -> List[Dict[str, Any]]: """Download and parse ET Open rules, returning IOC dicts. Each dict has: ``ioc_type`` (``"ip"`` or ``"domain"``), ``value``, ``threat_name``, ``type``, ``source``. """ self._log("Downloading ET Open rule files") all_ips: Set[str] = set() all_domains: Set[str] = set() for url in _RULE_URLS: self._rate_limit_wait() try: resp = requests.get(url, timeout=_TIMEOUT) resp.raise_for_status() text = resp.text except requests.RequestException as exc: self._warn("Failed to fetch %s: %s", url, exc) continue # Extract IPs. if url.endswith(".txt"): # Plain text IP list (one per line). for line in text.splitlines(): line = line.strip() if not line or line.startswith("#"): continue match = _RE_IPV4.match(line) if match: ip = match.group(1) if not ip.startswith(_PRIVATE_PREFIXES): all_ips.add(ip) else: # Suricata rule file — extract IPs from rule body. for ip_match in _RE_IPV4.finditer(text): ip = ip_match.group(1) if not ip.startswith(_PRIVATE_PREFIXES): all_ips.add(ip) # Extract domains from content matches. for domain_match in _RE_DOMAIN.finditer(text): domain = domain_match.group(1).lower() # Filter out very short or generic patterns. if "." in domain and len(domain) > 4: all_domains.add(domain) # Build result list. results: List[Dict[str, Any]] = [] for ip in all_ips: results.append({ "ioc_type": "ip", "value": ip, "threat_name": "ET.Compromised", "type": "C2", "source": "emergingthreats", "details": "IP from Emerging Threats ET Open rules", }) for domain in all_domains: results.append({ "ioc_type": "domain", "value": domain, "threat_name": "ET.MaliciousDomain", "type": "C2", "source": "emergingthreats", "details": "Domain extracted from ET Open Suricata rules", }) self._log("Extracted %d IP(s) and %d domain(s)", len(all_ips), len(all_domains)) self._mark_updated() return results