#!/usr/bin/env python3 import Milter import dkim import authres import sys import configparser import os import re import logging from io import BytesIO CONFIG_PATH = "/etc/pyarc/milter.conf" config = configparser.ConfigParser() if not os.path.exists(CONFIG_PATH): print(f"Fehler: Konfigurationsdatei nicht gefunden unter {CONFIG_PATH}", file=sys.stderr) sys.exit(1) config.read(CONFIG_PATH) # --- Logging Setup --- LOG_ENABLED = config.getboolean("logging", "file_logging", fallback=False) LOG_FILE = config.get("logging", "log_file", fallback="/var/log/pyarc/pyarc.log") logger = logging.getLogger("pyarc_milter") logger.setLevel(logging.INFO) stdout_handler = logging.StreamHandler(sys.stdout) stdout_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') stdout_handler.setFormatter(stdout_formatter) logger.addHandler(stdout_handler) if LOG_ENABLED: log_dir = os.path.dirname(LOG_FILE) if not os.path.exists(log_dir): try: os.makedirs(log_dir, exist_ok=True) except Exception as e: logger.error(f"Konnte Log-Verzeichnis {log_dir} nicht erstellen: {e}") if os.path.exists(log_dir): try: file_handler = logging.FileHandler(LOG_FILE) file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - [%(process)d] - %(message)s') file_handler.setFormatter(file_formatter) logger.addHandler(file_handler) except Exception as e: logger.error(f"Konnte Log-Datei {LOG_FILE} nicht öffnen: {e}") # --- Allgemeine Config laden --- AUTH_SERV_ID = config.get("general", "auth_serv_id", fallback="mx01.domain.tld") LISTEN_SOCKET = config.get("general", "listen_socket", fallback="inet:8899@127.0.0.1") REJECT_ON_FAIL = config.getboolean("validation", "reject_on_fail", fallback=False) DOMAIN_REGEX = re.compile(r'@([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})') class ArcMilter(Milter.Base): def __init__(self): self.id = Milter.uniqueID() self.headers = [] self.body_buffer = BytesIO() self.from_domain = None def header(self, name, hval): self.headers.append((name.encode('utf-8'), hval.encode('utf-8'))) if name.lower() == 'from': match = DOMAIN_REGEX.search(hval) if match: self.from_domain = match.group(1).lower().strip() return Milter.CONTINUE def body(self, chunk): self.body_buffer.write(chunk) return Milter.CONTINUE def eom(self): self.body_buffer.seek(0) full_mail = b"".join([b"%s: %s\r\n" % (k, v) for k, v in self.headers]) + b"\r\n" + self.body_buffer.read() try: # --- 1. Eingehend: ARC Validierung --- arc_verifier = dkim.ARC(full_mail) cv, results, comment = arc_verifier.verify() cv_str = cv.decode('utf-8') if isinstance(cv, bytes) else str(cv) logger.info(f"[{self.id}] Validierung für Domain '{self.from_domain}': CV={cv_str} ({comment})") self.addheader('Authentication-Results', f"{AUTH_SERV_ID}; arc={cv_str}") is_arc_valid = (cv_str == "pass") if cv_str == "fail" and REJECT_ON_FAIL: logger.warning(f"[{self.id}] Mail abgewiesen: ARC Validation fehlgeschlagen.") self.setreply('550', '5.7.1', 'ARC validation failed') return Milter.REJECT # --- 2. Ausgehend: Dynamische ARC Signierung --- if self.from_domain and config.has_section(self.from_domain): selector = config.get(self.from_domain, "selector") key_path = config.get(self.from_domain, "private_key_path") if os.path.exists(key_path): with open(key_path, "rb") as f: private_key = f.read() sig_headers = dkim.arc_sign( full_mail, selector=selector.encode('utf-8'), domain=self.from_domain.encode('utf-8'), privkey=private_key, srv_id=AUTH_SERV_ID.encode('utf-8') ) for header_line in sig_headers: if b':' in header_line: name, val = header_line.split(b':', 1) self.addheader(name.decode('utf-8').strip(), val.decode('utf-8').strip()) logger.info(f"[{self.id}] Mail für {self.from_domain} erfolgreich signiert (Selector: {selector}).") is_arc_valid = True else: logger.error(f"[{self.id}] Key für {self.from_domain} nicht gefunden unter {key_path}") self.addheader('X-ARC', 'TRUE' if is_arc_valid else 'FALSE') except Exception as e: logger.error(f"[{self.id}] Fehler beim ARC-Processing: {e}", exc_info=True) return Milter.CONTINUE def main(): Milter.factory = ArcMilter logger.info(f"Multidomain ARC Milter startet auf {LISTEN_SOCKET}...") Milter.runmilter("ArcMilter", LISTEN_SOCKET, timeout=30) if __name__ == "__main__": main()