0x1998 - MANAGER
Düzenlenen Dosya: export_wblist.py
import asyncio
import logging
import os
from pathlib import Path

from defence360agent.utils import ensure_line_in_file_bytes
from im360 import files
from im360.contracts.plugins import IDSAwareMessageSink
from im360.internals import strategy
from im360.simple_rpc.resident_socket import send_to_socket
from im360.utils.lazy_init import RULES_CHECK_IN_PROGRESS
from im360.subsys import csf

logger = logging.getLogger(__name__)


class ExportWBList(IDSAwareMessageSink):
    """
    Plugin that hooks the imunify360.txt allow list and the ipset restore
    command into the CSF configuration when it is activated.
    """

    STRATEGY = strategy.Strategy.CSF_COOP_STRATEGY
    AVAILABLE_ON_FREEMIUM = False

    async def create_sink(self, loop):
        """Remember the event loop the plugin was created with."""
        self._loop = loop

    def _determine_csf_post_hook_script(self):
        """
        Determine which CSF post-hook script to use based on priority rules:
        - If /usr/local/csf/bin/csfpost.sh exists, use it (higher priority)
        - Otherwise use /etc/csf/csfpost.sh

        Returns path to the script that should be used.
        """
        if Path(csf.CSF_POST_HOOK_SCRIPT_USR_LOCAL).exists():
            logger.info(
                "Using /usr/local/csf/bin/csfpost.sh as post-hook script"
            )
            return csf.CSF_POST_HOOK_SCRIPT_USR_LOCAL

        logger.info("Using /etc/csf/csfpost.sh as post-hook script")
        return csf.CSF_POST_HOOK_SCRIPT_ETC

    def _preserve_custom_content(self, script_path: str) -> bool:
        """
        Make sure the CSF post-hook script runs IPSET_RESTORE_SCRIPT without
        removing any other content already present in the script.

        Three cases are handled:
        - the script does not exist: it is created (with its parent
          directories) containing a shebang and the restore command;
        - the script contains the legacy restore command: every occurrence
          is replaced in place with the current one;
        - otherwise the restore command is added only if it is missing.

        Returns True when the script was created or modified.
        """
        script = Path(script_path)
        script.parent.mkdir(parents=True, exist_ok=True)

        if not script.exists():
            # Write raw bytes: the command is stored as bytes and every other
            # code path here treats the file as bytes, so avoid a
            # decode/encode round trip through the locale encoding.
            script.write_bytes(
                b"#!/bin/sh\n" + csf.IPSET_RESTORE_SCRIPT + b"\n"
            )
            script.chmod(0o750)
            return True

        content = script.read_bytes()
        if csf.IPSET_RESTORE_SCRIPT_LEGACY in content:
            # Upgrade path: swap the legacy command for the current one,
            # leaving the rest of the script untouched.
            script.write_bytes(
                content.replace(
                    csf.IPSET_RESTORE_SCRIPT_LEGACY,
                    csf.IPSET_RESTORE_SCRIPT,
                )
            )
            logger.info(
                "Replaced legacy ipset restore script in %s", script_path
            )
            return True

        ipset_line_added = ensure_line_in_file_bytes(
            script_path,
            csf.IPSET_RESTORE_SCRIPT,
        )
        if ipset_line_added:
            logger.info("Added command to restore ipsets to %s", script_path)
        return ipset_line_added

    async def activate(self) -> None:
        """
        When switching to CSF mode, the imunify360.txt allow list is included
        into the CSF allow file and the post-hook script is configured.

        CSF is restarted (and a rules re-creation is requested) only if one
        of those files was actually changed. Failures are logged rather than
        raised; the plugin is marked active only when no step failed.
        """
        prefix = files.Index.files_path(files.WHITELISTS)
        allow_list = os.path.join(prefix, "imunify360.txt")
        try:
            # NOTE: it assumes ascii-based locale encoding/fs (very likely)
            include_added = ensure_line_in_file_bytes(
                csf.CSF_ALLOW_FILE, b"Include " + os.fsencode(allow_list)
            )
            if include_added:
                logger.info("Need to restart CSF to include imunify360.txt")

            script_path = self._determine_csf_post_hook_script()
            script_updated = self._preserve_custom_content(script_path)

            # Restart CSF if necessary
            if include_added or script_updated:
                logger.info("CSF config was changed, restarting CSF")
                # Do not restart CSF while a rules check is still running.
                while RULES_CHECK_IN_PROGRESS.exists():
                    await asyncio.sleep(1)
                await csf.restart_all()
                # on CSF restart we need to recheck rules immediately
                await send_to_socket(
                    msg={
                        "method": "RECREATE_RULES",
                    },
                    wait_for_response=False,
                )
            self._mark_as_active()
        except asyncio.CancelledError:
            # Cancellation is not reported as a failure: stop quietly and
            # leave the plugin not marked as active.
            pass
        except Exception:
            logger.exception("Failed to activate %r plugin", self)
geri dön