118 lines
3.7 KiB
Python
118 lines
3.7 KiB
Python
"""Tests for the event bus pub/sub system."""
|
|
import pytest
|
|
|
|
from ayn_antivirus.core.event_bus import EventBus, EventType
|
|
|
|
|
|
def test_subscribe_and_publish():
|
|
bus = EventBus()
|
|
received = []
|
|
bus.subscribe(EventType.THREAT_FOUND, lambda et, data: received.append(data))
|
|
bus.publish(EventType.THREAT_FOUND, {"test": True})
|
|
assert len(received) == 1
|
|
assert received[0]["test"] is True
|
|
|
|
|
|
def test_multiple_subscribers():
|
|
bus = EventBus()
|
|
r1, r2 = [], []
|
|
bus.subscribe(EventType.SCAN_STARTED, lambda et, d: r1.append(d))
|
|
bus.subscribe(EventType.SCAN_STARTED, lambda et, d: r2.append(d))
|
|
bus.publish(EventType.SCAN_STARTED, "go")
|
|
assert len(r1) == 1
|
|
assert len(r2) == 1
|
|
|
|
|
|
def test_unsubscribe():
|
|
bus = EventBus()
|
|
received = []
|
|
cb = lambda et, d: received.append(d)
|
|
bus.subscribe(EventType.FILE_SCANNED, cb)
|
|
bus.unsubscribe(EventType.FILE_SCANNED, cb)
|
|
bus.publish(EventType.FILE_SCANNED, "data")
|
|
assert len(received) == 0
|
|
|
|
|
|
def test_unsubscribe_nonexistent():
|
|
"""Unsubscribing a callback that was never registered should not crash."""
|
|
bus = EventBus()
|
|
bus.unsubscribe(EventType.FILE_SCANNED, lambda et, d: None)
|
|
|
|
|
|
def test_publish_no_subscribers():
|
|
"""Publishing with no subscribers should not crash."""
|
|
bus = EventBus()
|
|
bus.publish(EventType.SCAN_COMPLETED, "no crash")
|
|
|
|
|
|
def test_subscriber_exception_isolated():
|
|
"""A failing subscriber must not prevent other subscribers from running."""
|
|
bus = EventBus()
|
|
received = []
|
|
bus.subscribe(EventType.THREAT_FOUND, lambda et, d: 1 / 0) # will raise
|
|
bus.subscribe(EventType.THREAT_FOUND, lambda et, d: received.append(d))
|
|
bus.publish(EventType.THREAT_FOUND, "data")
|
|
assert len(received) == 1
|
|
|
|
|
|
def test_all_event_types():
|
|
"""Every EventType value can be published without error."""
|
|
bus = EventBus()
|
|
for et in EventType:
|
|
bus.publish(et, None)
|
|
|
|
|
|
def test_clear_all():
|
|
bus = EventBus()
|
|
received = []
|
|
bus.subscribe(EventType.THREAT_FOUND, lambda et, d: received.append(d))
|
|
bus.subscribe(EventType.SCAN_STARTED, lambda et, d: received.append(d))
|
|
bus.clear()
|
|
bus.publish(EventType.THREAT_FOUND, "a")
|
|
bus.publish(EventType.SCAN_STARTED, "b")
|
|
assert len(received) == 0
|
|
|
|
|
|
def test_clear_single_event():
|
|
bus = EventBus()
|
|
r1, r2 = [], []
|
|
bus.subscribe(EventType.THREAT_FOUND, lambda et, d: r1.append(d))
|
|
bus.subscribe(EventType.SCAN_STARTED, lambda et, d: r2.append(d))
|
|
bus.clear(EventType.THREAT_FOUND)
|
|
bus.publish(EventType.THREAT_FOUND, "a")
|
|
bus.publish(EventType.SCAN_STARTED, "b")
|
|
assert len(r1) == 0 # cleared
|
|
assert len(r2) == 1 # still active
|
|
|
|
|
|
def test_callback_receives_event_type():
|
|
"""Callback receives (event_type, data) — verify event_type is correct."""
|
|
bus = EventBus()
|
|
calls = []
|
|
bus.subscribe(EventType.QUARANTINE_ACTION, lambda et, d: calls.append((et, d)))
|
|
bus.publish(EventType.QUARANTINE_ACTION, "payload")
|
|
assert calls[0][0] is EventType.QUARANTINE_ACTION
|
|
assert calls[0][1] == "payload"
|
|
|
|
|
|
def test_duplicate_subscribe():
|
|
"""Subscribing the same callback twice should only register it once."""
|
|
bus = EventBus()
|
|
received = []
|
|
cb = lambda et, d: received.append(d)
|
|
bus.subscribe(EventType.SCAN_COMPLETED, cb)
|
|
bus.subscribe(EventType.SCAN_COMPLETED, cb)
|
|
bus.publish(EventType.SCAN_COMPLETED, "x")
|
|
assert len(received) == 1
|
|
|
|
|
|
def test_event_type_values():
|
|
"""All expected event types exist."""
|
|
expected = {
|
|
"THREAT_FOUND", "SCAN_STARTED", "SCAN_COMPLETED", "FILE_SCANNED",
|
|
"SIGNATURE_UPDATED", "QUARANTINE_ACTION", "REMEDIATION_ACTION",
|
|
"DASHBOARD_METRIC",
|
|
}
|
|
actual = {et.name for et in EventType}
|
|
assert expected == actual
|