feat(auth): Implement robust session handling and 2FA unlock flow

This commit is contained in:
Pascal Bouquet 2025-12-03 16:01:17 +01:00
parent 71acc0f1f0
commit db99aed6ba

View File

@ -3,25 +3,26 @@
import requests import requests
import json import json
import getpass # Needed for secure password input in 2FA prompt import getpass # Needed for secure user input
# from onetimepass import get_totp # Kann für automatisches TOTP verwendet werden
# --- Global Configuration --- # --- Global Configuration ---
API_ENDPOINT = "https://api.domrobot.com/jsonrpc/" API_ENDPOINT = "https://api.domrobot.com/jsonrpc/"
SESSION_ID = None GLOBAL_SESSION = None # Stores the requests.Session object for all API calls
def api_call(method, params={}): def api_call(method, params={}):
""" """
Executes an API call (Post-Login) and returns the JSON response. Executes an API call using the global requests.Session object.
Uses the global SESSION_ID in the Cookie header for authentication. Cookies and session state are managed automatically.
""" """
global SESSION_ID global GLOBAL_SESSION
# Ensure the session ID is included in the Cookie header if GLOBAL_SESSION is None:
headers = { print("[FATAL] Session not initialized. Please log in first.")
"Content-Type": "application/json", return None
"Cookie": f"domrobot={SESSION_ID}" if SESSION_ID else ""
}
headers = {"Content-Type": "application/json"}
payload = { payload = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": method, "method": method,
@ -30,22 +31,43 @@ def api_call(method, params={}):
} }
try: try:
response = requests.post(API_ENDPOINT, headers=headers, data=json.dumps(payload), timeout=10) # Use the GLOBAL_SESSION object for the request
response = GLOBAL_SESSION.post(API_ENDPOINT, headers=headers, data=json.dumps(payload), timeout=10)
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
print(f"\n[ERROR] API call failed for {method}: {e}") print(f"\n[ERROR] API call failed for {method}: {e}")
return None return None
def login_2fa_unlock():
"""Performs the 2FA unlock step using the account.unlock API method."""
# Prompt user securely for the TOTP code
topt_code = input("Enter your TOTP/2FA code: ").strip()
# Use account.unlock with the token (tan) to complete the login
unlock_result = api_call("account.unlock", {'tan': topt_code})
if unlock_result and unlock_result.get('code') == 1000:
print("[SUCCESS] 2FA Unlock successful.")
return True
else:
print(f"[ERROR] 2FA Unlock failed: {unlock_result.get('msg', 'Incorrect code or unknown error.')}")
return False
def login(user, password): def login(user, password):
""" """
Logs into the INWX API and stores the session ID. Logs into the INWX API, initializes the global session, and handles 2FA challenges.
Includes a check for required Two-Factor Authentication (2FA).
""" """
global SESSION_ID global GLOBAL_SESSION
print("--- Attempting standard login...") print("--- Attempting standard login...")
# Initialize the global session object
GLOBAL_SESSION = requests.Session()
headers = {"Content-Type": "application/json"} headers = {"Content-Type": "application/json"}
payload = { payload = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
@ -55,25 +77,24 @@ def login(user, password):
} }
try: try:
response = requests.post(API_ENDPOINT, headers=headers, data=json.dumps(payload), timeout=10) # Send Request using the newly initialized session
response = GLOBAL_SESSION.post(API_ENDPOINT, headers=headers, data=json.dumps(payload), timeout=10)
response.raise_for_status() response.raise_for_status()
result_json = response.json() result_json = response.json()
# 1. Standard Login Successful # 1. Standard Login Successful
if result_json.get('code') == 1000: if result_json.get('code') == 1000:
if 'domrobot' in response.cookies:
SESSION_ID = response.cookies['domrobot']
print("[SUCCESS] Login successful! Session ID stored.")
return True
else:
print("[ERROR] Login failed: 'code 1000' received, but no session cookie.")
# 2. 2FA Required (Error Message Check) # Check for immediate 2FA requirement (if 'tfa' is not '0')
elif result_json.get('msg') and "two-factor-auth" in result_json.get('msg').lower(): if result_json.get('resData', {}).get('tfa') not in [None, '0']:
print("[NOTICE] Two-Factor Authentication (2FA) is required.") print("[NOTICE] Two-Factor Authentication (2FA) required.")
return login_2fa(user, password) # Proceed to 2FA login flow return login_2fa_unlock() # Proceed to 2FA unlock flow
# 3. Other Login Error # Regular login successful (or 2FA successful via unlock)
print("[SUCCESS] Login successful! Session stored.")
return True
# 2. Other Login Error
else: else:
print(f"[ERROR] Login failed: {result_json.get('msg', 'Unknown error.')}") print(f"[ERROR] Login failed: {result_json.get('msg', 'Unknown error.')}")
@ -82,53 +103,19 @@ def login(user, password):
return False return False
def login_2fa(user, password):
"""Performs the login using TOTP (account.login2fa)."""
global SESSION_ID
# Prompt user securely for the TOTP code
topt_code = input("Enter your TOTP/2FA code: ").strip()
headers = {"Content-Type": "application/json"}
payload = {
"jsonrpc": "2.0",
"method": "account.login2fa",
"params": {
"user": user,
"pass": password,
"topt": topt_code
},
"id": 1
}
try:
response = requests.post(API_ENDPOINT, headers=headers, data=json.dumps(payload), timeout=10)
response.raise_for_status()
result_json = response.json()
if result_json.get('code') == 1000:
if 'domrobot' in response.cookies:
SESSION_ID = response.cookies['domrobot']
print("[SUCCESS] 2FA Login successful! Session ID stored.")
return True
else:
print("[ERROR] 2FA Login failed: No session cookie received.")
else:
print(f"[ERROR] 2FA Login failed: {result_json.get('msg', 'Incorrect code or unknown error.')}")
except requests.exceptions.RequestException as e:
print(f"\n[ERROR] API request failed during 2FA login: {e}")
return False
def logout(): def logout():
"""Logs out of the INWX API and clears the global session ID.""" """Logs out of the INWX API and clears the global session object."""
global SESSION_ID global GLOBAL_SESSION
if not SESSION_ID:
if GLOBAL_SESSION is None:
return return
print("\n--- Logging out...") print("\n--- Logging out...")
# Use api_call, which uses GLOBAL_SESSION, to send the logout request
api_call("account.logout", {}) api_call("account.logout", {})
SESSION_ID = None
GLOBAL_SESSION = None
print("[SUCCESS] Logout successful.") print("[SUCCESS] Logout successful.")