ÿØÿà JFIF ÿþ; %PDF-1.5 %���� ºaâÚÎΞ-ÌE1ÍØÄ÷{òò2ÿ ÛÖ^ÔÀá TÎ{¦?§®¥kuµùÕ5sLOšuY
Server IP : 157.90.209.209 / Your IP : 216.73.216.148 [ Web Server : Apache System : Linux hcomm124.dns-wk.info 4.18.0-553.64.1.el8_10.x86_64 #1 SMP Mon Jul 28 12:01:56 EDT 2025 x86_64 User : evidenciarevista ( 1049) PHP Version : 7.2.34 Disable Function : exec,passthru,shell_exec,system Domains : 216 Domains MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/subsys/ |
Upload File : |
"""Send events via Notification service""" import asyncio import base64 import json SOCKET_PATH = "/opt/imunify360/lib/event.sock" SOCKET_TIMEOUT = 10.0 # seconds _LEN_BYTES = 4 _MAX_SIZE = 1024 * 1024 CONFIG_UPDATED_EVENT_ID = "CONFIG_UPDATED" USER_SCAN_STARTED_EVENT_ID = "USER_SCAN_STARTED" USER_SCAN_FINISHED_EVENT_ID = "USER_SCAN_FINISHED" USER_SCAN_MALWARE_FOUND_EVENT_ID = "USER_SCAN_MALWARE_FOUND" CUSTOM_SCAN_STARTED_EVENT_ID = "CUSTOM_SCAN_STARTED" CUSTOM_SCAN_FINISHED_EVENT_ID = "CUSTOM_SCAN_FINISHED" CUSTOM_SCAN_MALWARE_FOUND_EVENT_ID = "CUSTOM_SCAN_MALWARE_FOUND" SCRIPT_BLOCKED_EVENT_ID = "SCRIPT_BLOCKED" def _prepare_event(event_id: str, user: str, body: dict) -> bytes: event = json.dumps( { "event_id": event_id, "user": user, "body": base64.b64encode(json.dumps(body).encode("utf-8")).decode( "utf-8" ), } ) binary = event.encode("utf-8") if len(binary) > _MAX_SIZE: raise Exception( "message size {} exceeds limit of {}".format( len(binary), _MAX_SIZE ) ) return len(binary).to_bytes(_LEN_BYTES, byteorder="big") + binary async def _send_event(event: bytes) -> None: _, writer = await asyncio.open_unix_connection(SOCKET_PATH) try: writer.write(event) await writer.drain() finally: writer.close() async def trigger_event(event_id: str, user: str, body: dict) -> None: """Send an event with given event_id and user, having given body.""" event = _prepare_event(event_id, user, body) await asyncio.wait_for(_send_event(event), SOCKET_TIMEOUT) async def config_updated() -> None: """Send CONFIG_UPDATED event. This forces imunify-notifier to reread its config.""" await trigger_event(CONFIG_UPDATED_EVENT_ID, "", {})