Initial commit: VMail Manager Tool mit Argon2id Support

This commit is contained in:
root
2026-01-10 10:16:36 +01:00
commit f9c8c4e6c7
3 changed files with 246 additions and 0 deletions

222
vmailtool.py Executable file
View File

@@ -0,0 +1,222 @@
#!/root/sql/vmail-env/bin/python3
import mysql.connector
import configparser
import getpass
import sys
import socket
from argon2 import PasswordHasher, Type
# Konfiguration laden
config = configparser.ConfigParser()
config.read('config.ini')
hostname = socket.getfqdn()
DEFAULT_DOM = config.get('database', 'default_domain', fallback="")
ph = PasswordHasher(
time_cost=3,
memory_cost=131072,
parallelism=4,
hash_len=32,
salt_len=16,
type=Type.ID
)
def get_db_connection():
try:
return mysql.connector.connect(
host=config['database']['host'],
user=config['database']['user'],
password=config['database']['password'],
database=config['database']['database']
)
except Exception as e:
print(f"Fehler: {e}")
sys.exit(1)
def hash_pw(password):
return f"{{ARGON2ID}}{ph.hash(password)}"
def get_domain_input(prompt="Domain"):
if DEFAULT_DOM:
val = input(f"{prompt} [{DEFAULT_DOM}]: ").strip()
return val if val else DEFAULT_DOM
return input(f"{prompt}: ").strip()
# --- FUNKTIONEN ---
def show_users():
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT id, username, domain, enabled, quota FROM accounts ORDER BY id ASC")
rows = cursor.fetchall()
print(f"\n{'ID':<4} | {'Email Adresse':<35} | {'Status':<10} | {'Quota'}")
print("-" * 65)
for r in rows:
status = "AKTIV" if r[3] else "GESPERRT"
print(f"{r[0]:<4} | {r[1]+'@'+r[2]:<35} | {status:<10} | {r[4]} MB")
conn.close()
def add_user():
user = input("Username (vor dem @): ")
dom = get_domain_input()
pw = getpass.getpass("Passwort: ")
hashed = hash_pw(pw)
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("INSERT INTO accounts (username, domain, password, quota, enabled) VALUES (%s, %s, %s, 1024, 1)",
(user, dom, hashed))
conn.commit()
print(f"✔ User {user}@{dom} angelegt.")
except Exception as e: print(f"❌ Fehler: {e}")
conn.close()
def del_user():
email = input("E-Mail des zu LÖSCHENDEN Users: ")
if '@' not in email: return
u, d = email.split('@')
confirm = input(f"Account {email} wirklich löschen? (y/N): ")
if confirm.lower() != 'y': return
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM accounts WHERE username = %s AND domain = %s", (u, d))
conn.commit()
print("✔ User gelöscht.")
conn.close()
def show_aliases():
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT id, source_username, source_domain, destination_username, destination_domain FROM aliases ORDER BY id ASC")
rows = cursor.fetchall()
print(f"\n{'ID':<4} | {'Alias (Source)':<30} | {'Ziel (Destination)':<30}")
print("-" * 70)
for r in rows:
print(f"{r[0]:<4} | {r[1]+'@'+r[2]:<30} | {r[3]+'@'+r[4]:<30}")
conn.close()
def add_alias():
s_user = input("Alias Name (Source): ")
s_dom = get_domain_input("Alias Domain")
d_user = input("Ziel Username (Dest): ")
d_dom = get_domain_input("Ziel Domain")
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("INSERT INTO aliases (source_username, source_domain, destination_username, destination_domain, enabled) VALUES (%s, %s, %s, %s, 1)",
(s_user, s_dom, d_user, d_dom))
conn.commit()
print("✔ Alias angelegt.")
except Exception as e: print(f"❌ Fehler: {e}")
conn.close()
def del_alias():
alias_id = input("ID des zu löschenden Alias: ")
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM aliases WHERE id = %s", (alias_id,))
conn.commit()
print(f"✔ Alias ID {alias_id} gelöscht.")
conn.close()
def show_domains():
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT id, domain FROM domains ORDER BY id ASC")
rows = cursor.fetchall()
print(f"\n{'ID':<4} | {'Domain'}")
print("-" * 30)
for r in rows:
print(f"{r[0]:<4} | {r[1]}")
conn.close()
def add_domain():
dom = input("Neue Domain: ").strip()
if not dom: return
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("INSERT INTO domains (domain) VALUES (%s)", (dom,))
conn.commit()
print(f"✔ Domain {dom} registriert.")
except Exception as e: print(f"❌ Fehler: {e}")
conn.close()
def del_domain():
dom = input("Zu löschende Domain (Name eingeben): ").strip()
if not dom: return
confirm = input(f"Soll die Domain '{dom}' wirklich gelöscht werden? (y/N): ")
if confirm.lower() != 'y': return
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("DELETE FROM domains WHERE domain = %s", (dom,))
conn.commit()
if cursor.rowcount > 0:
print(f"✔ Domain '{dom}' wurde gelöscht.")
else:
print("❌ Domain nicht gefunden.")
except mysql.connector.Error as e:
if e.errno == 1451:
print("❌ Fehler: Domain kann nicht gelöscht werden, da noch User oder Aliase damit verknüpft sind!")
else:
print(f"❌ Fehler: {e}")
conn.close()
def change_pw():
email = input("E-Mail für PW-Wechsel: ")
if '@' not in email: return
u, d = email.split('@')
pw = getpass.getpass("Neues Passwort: ")
hashed = hash_pw(pw)
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("UPDATE accounts SET password = %s WHERE username = %s AND domain = %s", (hashed, u, d))
conn.commit()
print("✔ Passwort aktualisiert.")
conn.close()
def toggle_status():
email = input("E-Mail zum (De)aktivieren: ")
if '@' not in email: return
u, d = email.split('@')
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("UPDATE accounts SET enabled = NOT enabled WHERE username = %s AND domain = %s", (u, d))
conn.commit()
print("✔ Status geändert.")
conn.close()
# --- MAIN ---
def menu():
while True:
print("\n" + "="*60)
print(f" VMAIL MANAGER ({hostname})")
print("="*60)
print("1) Show Users 6) Del Alias")
print("2) Add User (1024MB) 7) Domains (Show/Add/Del)")
print("3) Del User 8) Change Password")
print("4) Show Aliases 9) Toggle Status (An/Aus)")
print("5) Add Alias 0) Beenden")
cmd = input("\nAuswahl > ")
if cmd == '1': show_users()
elif cmd == '2': add_user()
elif cmd == '3': del_user()
elif cmd == '4': show_aliases()
elif cmd == '5': add_alias()
elif cmd == '6': del_alias()
elif cmd == '7':
show_domains()
sub_cmd = input("\n[A]dd Domain, [D]el Domain oder [Enter] zurück: ").lower()
if sub_cmd == 'a': add_domain()
elif sub_cmd == 'd': del_domain()
elif cmd == '8': change_pw()
elif cmd == '9': toggle_status()
elif cmd == '0': break
if __name__ == "__main__":
menu()