refactor: Clean up and enhance cross-platform compatibility

This commit is contained in:
Pascal Bouquet 2025-12-03 15:28:22 +01:00
parent a768db1478
commit 71acc0f1f0
3 changed files with 162 additions and 87 deletions

59
inwx_cli.py Normal file → Executable file
View File

@ -3,61 +3,72 @@
import os
import getpass
from inwx_config import login, logout # Importiere Login/Logout aus der Konfigurationsdatei
from inwx_dns_functions import get_dns_info, add_record, update_record, delete_record # Importiere DNS-Funktionen
# Local imports from the modular structure
from inwx_config import login, logout
from inwx_dns_functions import get_dns_info, add_record, update_record, delete_record
# 1. Login-Daten abfragen (kann auch über Umgebungsvariablen INWX_USER/INWX_PASS gesetzt werden)
def main_menu():
"""Hauptmenü der CLI-Anwendung."""
"""
Main loop for the INWX DNS CLI application.
Handles user login, displays the main action menu, and routes input.
"""
# 1. Login-Daten abfragen
# 1. AUTHENTICATION & LOGIN
# Try to get credentials from environment variables first
INWX_USER = os.getenv('INWX_USER')
INWX_PASS = os.getenv('INWX_PASS')
if not INWX_USER:
INWX_USER = input("Gib deinen INWX-Benutzernamen ein: ")
INWX_USER = input("Enter your INWX username: ")
if not INWX_PASS:
# **WICHTIGE KORREKTUR HIER:** Verwende getpass.getpass()
INWX_PASS = getpass.getpass("Gib dein INWX-Passwort ein: ")
# Use getpass to securely hide the password input in the console
INWX_PASS = getpass.getpass("Enter your INWX password: ")
# Attempt to log in to the INWX API
if not login(INWX_USER, INWX_PASS):
return
# Exit if login fails (login function handles error messages)
return
# 2. Hauptschleife
# 2. MAIN APPLICATION LOOP
try:
while True:
# Das Menü im MC-Stil
# Display the main action menu (Emojis replaced for maximum compatibility)
print("\n" + "=" * 40)
print(" INWX DNS-CLI (MC-Style) - DOMAIN-VERWALTUNG")
print(" INWX DNS-CLI (MC-Style) - DOMAIN MANAGEMENT")
print("=" * 40)
print("1: 🌐 **Anzeigen** (nameserver.info)")
print("1: [i] **View Records** (nameserver.info)")
print("---")
print("2: **Hinzufügen** (nameserver.addRecord)")
print("3: ✏️ **Ändern** (nameserver.updateRecord)")
print("4: 🗑️ **Löschen** (nameserver.deleteRecord)")
print("2: [+] **Add Record** (nameserver.createRecord)")
print("3: [~] **Modify Record** (nameserver.updateRecord)")
print("4: [x] **Delete Record** (nameserver.deleteRecord)")
print("---")
print("9: 🚪 Logout & Beenden")
print("9: [>] Logout & Exit")
print("-" * 40)
choice = input("Wähle eine Aktion (1-4 oder 9): ").strip()
choice = input("Select an action (1-4 or 9): ").strip()
if choice == '1':
domain = input("Gib die Domain ein, deren Einträge du sehen möchtest: ").strip()
domain = input("Enter the domain whose records you want to view: ").strip()
if domain:
get_dns_info(domain)
elif choice == '2':
# nameserver.createRecord
add_record()
elif choice == '3':
# nameserver.updateRecord
update_record()
elif choice == '4':
# nameserver.deleteRecord
delete_record()
elif choice == '9':
break
else:
print("Ungültige Auswahl. Bitte versuche es erneut.")
print("Invalid selection. Please try again.")
finally:
# Stelle sicher, dass immer ausgeloggt wird
# Ensure logout is always performed, even if an error occurs
logout()
if __name__ == "__main__":

View File

@ -3,22 +3,23 @@
import requests
import json
import getpass # Needed for secure password input in 2FA prompt
# --- Konfiguration ---
# --- Global Configuration ---
API_ENDPOINT = "https://api.domrobot.com/jsonrpc/"
SESSION_ID = None
def api_call(method, params={}):
"""
Führt einen API-Aufruf aus (Post-Login) und gibt die JSON-Antwort zurück.
Verwendet die globale SESSION_ID im Cookie-Header.
Executes an API call (Post-Login) and returns the JSON response.
Uses the global SESSION_ID in the Cookie header for authentication.
"""
global SESSION_ID
# Stelle sicher, dass die Session-ID für alle Anfragen nach dem Login im Cookie-Header ist
# Ensure the session ID is included in the Cookie header
headers = {
"Content-Type": "application/json",
"Cookie": f"domrobot={SESSION_ID}"
"Cookie": f"domrobot={SESSION_ID}" if SESSION_ID else ""
}
payload = {
@ -33,14 +34,17 @@ def api_call(method, params={}):
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"\n❌ API-Fehler bei {method}: {e}")
print(f"\n[ERROR] API call failed for {method}: {e}")
return None
def login(user, password):
"""Loggt sich in die INWX API ein und speichert die Session-ID."""
"""
Logs into the INWX API and stores the session ID.
Includes a check for required Two-Factor Authentication (2FA).
"""
global SESSION_ID
print("⏳ Versuche, mich einzuloggen...")
print("--- Attempting standard login...")
headers = {"Content-Type": "application/json"}
payload = {
@ -53,32 +57,78 @@ def login(user, password):
try:
response = requests.post(API_ENDPOINT, headers=headers, data=json.dumps(payload), timeout=10)
response.raise_for_status()
result_json = response.json()
# 1. Standard Login Successful
if result_json.get('code') == 1000:
if 'domrobot' in response.cookies:
SESSION_ID = response.cookies['domrobot']
print("✅ Login erfolgreich! Session-ID gespeichert.")
print("[SUCCESS] Login successful! Session ID stored.")
return True
else:
print("❌ Login fehlgeschlagen: 'code 1000' erhalten, aber kein Session-Cookie.")
print("[ERROR] Login failed: 'code 1000' received, but no session cookie.")
# 2. 2FA Required (Error Message Check)
elif result_json.get('msg') and "two-factor-auth" in result_json.get('msg').lower():
print("[NOTICE] Two-Factor Authentication (2FA) is required.")
return login_2fa(user, password) # Proceed to 2FA login flow
# 3. Other Login Error
else:
print(f"❌ Login fehlgeschlagen: {result_json.get('msg', 'Unbekannter Fehler.')}")
print(f"[ERROR] Login failed: {result_json.get('msg', 'Unknown error.')}")
except requests.exceptions.RequestException as e:
print(f"\n❌ API-Fehler beim Login-Request: {e}")
print(f"\n[ERROR] API request failed during standard login: {e}")
return False
def login_2fa(user, password):
"""Performs the login using TOTP (account.login2fa)."""
global SESSION_ID
# Prompt user securely for the TOTP code
topt_code = input("Enter your TOTP/2FA code: ").strip()
headers = {"Content-Type": "application/json"}
payload = {
"jsonrpc": "2.0",
"method": "account.login2fa",
"params": {
"user": user,
"pass": password,
"topt": topt_code
},
"id": 1
}
try:
response = requests.post(API_ENDPOINT, headers=headers, data=json.dumps(payload), timeout=10)
response.raise_for_status()
result_json = response.json()
if result_json.get('code') == 1000:
if 'domrobot' in response.cookies:
SESSION_ID = response.cookies['domrobot']
print("[SUCCESS] 2FA Login successful! Session ID stored.")
return True
else:
print("[ERROR] 2FA Login failed: No session cookie received.")
else:
print(f"[ERROR] 2FA Login failed: {result_json.get('msg', 'Incorrect code or unknown error.')}")
except requests.exceptions.RequestException as e:
print(f"\n[ERROR] API request failed during 2FA login: {e}")
return False
def logout():
"""Loggt sich aus der INWX API aus."""
"""Logs out of the INWX API and clears the global session ID."""
global SESSION_ID
if not SESSION_ID:
return
print("\n⏳ Logge mich aus...")
# Hier verwenden wir api_call, die jetzt die SESSION_ID verwendet
print("\n--- Logging out...")
api_call("account.logout", {})
SESSION_ID = None
print("✅ Logout erfolgreich.")
print("[SUCCESS] Logout successful.")

View File

@ -1,50 +1,57 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from inwx_config import api_call # Importiere die Basis-API-Funktion
from inwx_config import api_call # Import the base API calling function
def get_dns_info(domain):
"""Ruft die DNS-Einträge für eine Domain ab und gibt sie im MC-Stil aus."""
print(f"\n⏳ Rufe DNS-Einträge für {domain} ab...")
"""Fetches and displays all DNS records for a given domain in a formatted table."""
print(f"\n--- Fetching DNS records for {domain}...")
result = api_call("nameserver.info", {"domain": domain})
if result and result.get('code') == 1000 and 'record' in result.get('resData', {}):
print(f"✅ DNS-Einträge für **{domain}**:")
print(f"[SUCCESS] DNS Records for **{domain}**:")
records = result['resData']['record']
# Tabelle im MC-Stil ausgeben
# Output the table header
print("-" * 100)
print(f"| {'ID':<10} | {'Name/Host':<15} | {'Typ':<5} | {'TTL':<5} | {'Inhalt/Content':<50} |")
print(f"| {'ID':<10} | {'Name/Host':<15} | {'Type':<5} | {'TTL':<5} | {'Content':<50} |")
print("-" * 100)
for record in records:
# Begrenze Name und Content für eine saubere Ausgabe
# Truncate Name and Content for clean table output
name_short = record.get('name', '')[:14].ljust(15)
content_short = record.get('content', '')[:49].ljust(50)
# Note: record['id'] is handled as string input/output now (API change)
print(f"| {str(record['id']):<10} | {name_short} | {record['type']:<5} | {str(record['ttl']):<5} | {content_short} |")
print("-" * 100)
return True
elif result and 'msg' in result:
print(f"❌ Fehler: {result.get('msg', 'Unbekannter Fehler.')}")
print(f"[ERROR] Failed to fetch records: {result.get('msg', 'Unknown error.')}")
else:
print("❌ Fehler beim Abrufen der DNS-Einträge (API-Antwort unerwartet).")
print("[ERROR] Failed to fetch records (Unexpected API response).")
return False
def add_record():
"""Fügt einen neuen DNS-Eintrag hinzu (nameserver.createRecord)."""
print("\n--- NEUEN DNS-EINTRAG HINZUFÜGEN ---")
domain = input("Domain (z.B. bouquet24.de): ").strip()
name = input("Host/Subdomain (leer lassen für die Hauptdomain '@'): ").strip()
record_type = input("Typ (A, CNAME, TXT, MX, etc.): ").strip().upper()
content = input("Inhalt/Content (IP-Adresse, Ziel, Text): ").strip()
# Optional: TTL mit Standardwert 3600
def add_record():
"""Adds a new DNS record using the nameserver.createRecord API method."""
print("\n--- ADD NEW DNS RECORD ---")
domain = input("Domain (e.g., example.com): ").strip()
name = input("Host/Subdomain (leave blank for main domain '@'): ").strip()
record_type = input("Type (A, CNAME, TXT, MX, etc.): ").strip().upper()
content = input("Content (IP address, target, text): ").strip()
# Optional: TTL with default value 3600
try:
ttl = int(input("TTL (Sekunden, Standard: 3600): ") or 3600)
ttl = int(input("TTL (seconds, default: 3600): ") or 3600)
except ValueError:
print("Ungültige TTL. Verwende Standardwert 3600.")
print("[WARNING] Invalid TTL. Using default value 3600.")
ttl = 3600
params = {
@ -55,71 +62,78 @@ def add_record():
"ttl": ttl
}
print(f"\n⏳ Sende Anfrage zum Hinzufügen von {name}.{domain}...")
print(f"\n--- Sending request to add record {name}.{domain}...")
result = api_call("nameserver.createRecord", params)
if result and result.get('code') == 1000:
new_id = result.get('resData', {}).get('id', 'N/A')
print(f"✅ Eintrag erfolgreich hinzugefügt! ID: **{new_id}**")
print(f"[SUCCESS] Record successfully added! ID: **{new_id}**")
else:
print(f"❌ Fehler beim Hinzufügen: {result.get('msg', 'Unbekannter Fehler.')}")
print(f"[ERROR] Failed to add record: {result.get('msg', 'Unknown error.')}")
def update_record():
"""Ändert einen bestehenden DNS-Eintrag (nameserver.updateRecord)."""
print("\n--- BESTEHENDEN DNS-EINTRAG ÄNDERN ---")
"""Modifies an existing DNS record using the nameserver.updateRecord API method."""
print("\n--- MODIFY EXISTING DNS RECORD ---")
record_id = input("ID des zu ändernden Eintrags (siehe Option 1): ").strip()
record_id = input("ID of the record to modify (see Option 1): ").strip()
if not record_id.isdigit():
print("❌ Ungültige ID.")
print("[ERROR] Invalid ID.")
return
# Frage nach den Werten, die geändert werden sollen
new_content = input("Neuer Inhalt/Content (leer lassen, um nicht zu ändern): ").strip()
new_ttl = input("Neue TTL (Sekunden, leer lassen, um nicht zu ändern): ").strip()
# Prompt for values to change
new_content = input("New Content (leave blank to skip): ").strip()
new_ttl = input("New TTL (seconds, leave blank to skip): ").strip()
params = {"id": record_id}
# Pass ID as string (INWX API change compliance)
params = {"id": record_id}
if new_content:
params["content"] = new_content
if new_ttl:
try:
params["ttl"] = int(new_ttl)
except ValueError:
print("❌ Ungültige TTL. Abbruch der Änderung.")
print("[ERROR] Invalid TTL. Modification aborted.")
return
if len(params) <= 1:
print("Keine Änderungen angegeben. Vorgang abgebrochen.")
print("No changes specified. Operation aborted.")
return
print(f"\n⏳ Sende Anfrage zur Änderung der ID **{record_id}**...")
print(f"\n--- Sending request to modify ID **{record_id}**...")
result = api_call("nameserver.updateRecord", params)
if result and result.get('code') == 1000:
print(f"✅ Eintrag ID **{record_id}** erfolgreich aktualisiert.")
print(f"[SUCCESS] Record ID **{record_id}** successfully updated.")
else:
print(f"❌ Fehler beim Ändern: {result.get('msg', 'Unbekannter Fehler.')}")
print(f"[ERROR] Failed to modify record: {result.get('msg', 'Unknown error.')}")
def delete_record():
"""Löscht einen DNS-Eintrag (nameserver.deleteRecord)."""
print("\n--- DNS-EINTRAG LÖSCHEN ---")
"""Deletes a DNS record using the nameserver.deleteRecord API method."""
print("\n--- DELETE DNS RECORD ---")
record_id = input("ID des zu löschenden Eintrags (siehe Option 1): ").strip()
record_id = input("ID of the record to delete (see Option 1): ").strip()
if not record_id.isdigit():
print("❌ Ungültige ID.")
print("[ERROR] Invalid ID.")
return
confirm = input(f"Soll Eintrag ID **{record_id}** wirklich gelöscht werden? (J/N): ").strip().upper()
if confirm != 'J':
print("Löschvorgang abgebrochen.")
confirm = input(f"Are you sure you want to delete record ID **{record_id}**? (Y/N): ").strip().upper()
if confirm != 'Y':
print("Deletion aborted.")
return
params = {"id": record_id}
# Pass ID as string (INWX API change compliance)
params = {"id": record_id}
print(f"\n⏳ Sende Anfrage zum Löschen der ID **{record_id}**...")
print(f"\n--- Sending request to delete ID **{record_id}**...")
result = api_call("nameserver.deleteRecord", params)
if result and result.get('code') == 1000:
print(f"✅ Eintrag ID **{record_id}** erfolgreich gelöscht.")
print(f"[SUCCESS] Record ID **{record_id}** successfully deleted.")
else:
print(f"❌ Fehler beim Löschen: {result.get('msg', 'Unbekannter Fehler.')}")
print(f"[ERROR] Failed to delete record: {result.get('msg', 'Unknown error.')}")