0x1998 - MANAGER
Düzenlenen Dosya: __init__.py
import os from functools import lru_cache from typing import AbstractSet, Tuple from defence360agent.utils import subprocess from defence360agent.utils.common import LooseVersion from im360.contracts.config import Firewall, UnifiedAccessLogger from defence360agent.utils.validate import IP, IPVersion from .exe import ( # noqa: F401 _find_executable, get_ip6tables_exe, get_ip6tables_restore_exe, get_iptables_exe, get_iptables_restore_exe, ) from .iptables import Iptables #: Firewall rule definition represented as iptables command-line args RuleDef = Tuple[str, ...] # TypeAlias 3.10+ @lru_cache(maxsize=1) def iptables_version(): cmd = [get_iptables_exe(), "-V"] out = subprocess.check_output(cmd, stderr=subprocess.PIPE) _, vstring = out.decode().split("v") return vstring.strip() @lru_cache() def is_nat_available(ip_version: IPVersion): """ip6tables nat table correctly works only with kernel version >= 3.8 and iptables version >= 1.4.18 https://sector7g.be/posts/ipv6-nat-pre-routing-with-iptables """ if ip_version == IP.V4: return True else: return iptables_version() >= LooseVersion("1.4.18") async def get_firewall(ip_version: IPVersion): return Iptables(version=iptables_version(), ip_version=ip_version) @lru_cache(maxsize=1) def firewall_logging_enabled(): return not os.path.exists(Firewall.LOGGING_DISABLE_FLAG) def rule_logging_enabled(rule_id: str) -> bool: """Return True if *rule_id* is present in the UAL config ``rules`` section. When a rule is absent (e.g. ``im360-whitelist`` is commented out by default), the firewall should not emit NFLOG for that rule type because UAL would receive and immediately discard those packets. Falls back to True (old behaviour) when the config is missing or unreadable so that NFLOG is never silently suppressed by accident. """ try: ual_rules = UnifiedAccessLogger.RULES if ual_rules is None: return True return rule_id in ual_rules except Exception: return True class FirewallRules: # The lower number the higher is the priority. # 0 is the highest priority # priority is used to create rules in correct order HIGHEST_PRIORITY = 0 # Priority for remote-proxy rules REMOTE_PROXY_PRIORITY = 2 # Priority for rule for whitelisted ips with full access FULL_ACCESS_PRIORITY = 4 # Priority for blocked ports rules PORT_PROTO_PRIORITY = 6 # priority for ipset with ip of current host HOST_IPS_PRIORITY = 6 # Common whitelist WHITELIST_PRIORITY = 8 # Black list (both country and user-defined) BLACKLIST_PRIORITY = 10 # Cloud-synced whitelist (below blacklist so clients can override) WHITE_SYNC_PRIORITY = 11 # Static whitelist STATIC_WHITELIST_PRIORITY = 12 # drop.sync DROP_SYNC_PRIORITY = 14 # default priority for rules DEFAULT_PRIORITY = 20 LOWEST_PRIORITY = 30 ACCEPT = "ACCEPT" DROP = "DROP" RETURN = "RETURN" REJECT = "REJECT" REDIRECT = "REDIRECT" LOG = "LOG" FILTER, NAT, MANGLE = "filter", "nat", "mangle" IMUNIFY_INPUT_CHAIN = "INPUT_imunify360" IMUNIFY_OUTPUT_CHAIN = "OUTPUT_imunify360" COUNTRY_BLACKLIST_CHAIN = "imunify360_country_blacklist" COUNTRY_WHITELIST_CHAIN = "imunify360_country_whitelist" BP_INPUT_CHAIN = "INPUT_imunify360_bp" BP_OUTPUT_CHAIN = "OUTPUT_imunify360_bp" LOG_BLACKLIST_CHAIN = "imunify360_log_bl" LOG_BLACKLISTED_COUNTRY_CHAIN = "imunify360_log_bl_country" LOG_GRAYLIST_CHAIN = "imunify360_log_gl" LOG_BLOCK_PORT_CHAIN = "imunify360_log_port_block" WEBSHIELD_PORTS_INPUT_CHAIN = "imunify360_webshield_ports" OSSEC_INPUT_CHAIN = "imunify360_ossec" DEFAULT_LOGLEVEL = "info" @classmethod def compose_rule(cls, *filters, action) -> RuleDef: return sum(filters, tuple()) + action @classmethod def compose_action(cls, action, **kwargs) -> RuleDef: args = ["-j", action] for k in sorted(kwargs.keys()): args.append("--" + k.replace("_", "-")) args.append(kwargs[k]) return tuple(args) @classmethod def interface(cls, interface): assert interface, 'Network interface "%s" is not valid!' % interface return ("-i", interface) @classmethod def block_dst_port_list(cls, ports, policy=DROP) -> tuple: return ( "-p", "tcp", "-m", "multiport", "--dport", ",".join(map(str, sorted(ports))), "-j", policy, ) @classmethod def protected_by_webshield(cls, dst_port, target_port) -> tuple: return ( "-p", "tcp", "-m", "multiport", "--dport", str(dst_port), "-j", "DNAT", "--to-destination", ":" + str(target_port), ) @classmethod def open_all_for_src_net(cls, net: str) -> RuleDef: """Return a rule to open traffic with source IP address""" return ("-s", net, "-j", cls.ACCEPT) @classmethod def open_dst_ports_for_src_list( cls, listname: str, ports: AbstractSet[int], policy=ACCEPT ) -> RuleDef: """Return a rule to open traffic with TCP destination `ports` and source addresses in ipset `listname`.""" return ( "-m", "set", "--match-set", listname, "src", "-m", "multiport", "-p", "tcp", "--dport", ",".join([str(p) for p in sorted(ports)]), "-j", policy, ) @staticmethod def redirect_to_captcha( listname: str, dest_port: int, target: int ) -> RuleDef: """Returns iptables command parameters to redirect traffic destined to tcp port `dest_port` to local port `target`""" return ( "-m", "set", "--match-set", listname, "src", "-p", "tcp", "--dport", str(dest_port), "-j", "DNAT", "--to-destination", ":" + str(target), ) @staticmethod def stop_redirection(listname: str) -> RuleDef: """Returns iptables command parameters to do not redirect traffic destined to webshield port""" return ( "-m", "set", "--match-set", listname, "dst", "-p", "tcp", "-j", "RETURN", ) @staticmethod def redirect_to_captcha_via_tproxy( listname: str, dest_port: int, target: int ) -> RuleDef: """Returns iptables command parameters to redirect traffic destined to tcp port `dest_port` to local port `target`, using TPROXY""" return ( "-m", "set", "--match-set", listname, "src", "-p", "tcp", "--dport", str(dest_port), "-j", "TPROXY", "--tproxy-mark", "0x1/0x1", # mark traffic for TPROXY "--on-port", str(target), ) @staticmethod def traffic_not_from_tproxy(set_name: str, policy: str = DROP) -> RuleDef: return ( "-m", "set", "--match-set", set_name, "src", "-m", "mark", "!", "--mark", "0x1/0x1", # mark traffic for TPROXY "-j", policy, ) @staticmethod def ipset(set_name): return ("-m", "set", "--match-set", set_name, "src") @staticmethod def ipset_rule(set_name, policy) -> RuleDef: """ RuleDef for ipset :param policy: ACCEPT, RETURN or DROP :param set_name: ipset collection :return: """ return FirewallRules.ipset(set_name) + FirewallRules.compose_action( policy ) @staticmethod def port_rule(set_name, port, proto, policy=REJECT) -> RuleDef: return ( "-m", "set", "!", "--match-set", set_name, "src", "-p", proto, "--dport", str(port), "-j", policy, ) @classmethod def smtp_test_rule(cls) -> RuleDef: return ( "-p", "tcp", "--dport", "9999", "-m", "owner", "--uid-owner", "0", "-j", cls.ACCEPT, ) @staticmethod def nflog_group(ip_version: IPVersion): return str(UnifiedAccessLogger.NFLOG_GROUPS[ip_version]) @staticmethod def nflog_action(group, prefix): return FirewallRules.compose_action( "NFLOG", nflog_group=group, nflog_prefix=prefix )
geri dön