{ "packages": ["sqlite3"] }

PyScript EÜR

Login + SQLite + Belege + EÜR + CSV + Jahresfilter + DB Export/Import + ZIP Export (CSV+Belege).

Login / Registrierung

import asyncio
import base64
import csv
import hashlib
import hmac
import json
import math
import os
from datetime import date, datetime
from io import StringIO

from pyodide.ffi import create_proxy
from pyscript import window
from pyscript import storage, py_import

DOM = window.document


def el(id_: str):
    """Return the DOM element with the given id."""
    return DOM.getElementById(id_)


STORE_NAME = "pyscript-eur-demo"
DB_KEY = "eur_sqlite_db_b64"
SESSION_KEY = "eur_session_user"

# Proxies for listeners that live as long as the page (buttons, selects).
PROXIES = []
# Proxies for per-row table buttons; destroyed whenever the table is rebuilt.
ROW_PROXIES = []

# Module state, filled in by main() / the login handlers.
CONN = None
DB_PATH = None
CURRENT_USER = None


# ----------------------------
# UI helpers
# ----------------------------
def set_text(id_: str, txt: str, danger=False, ok=False):
    """Set an element's text and its CSS class ("muted" plus optional state)."""
    e = el(id_)
    e.innerText = txt
    cls = "muted"
    if danger:
        cls += " danger"
    if ok:
        cls += " ok"
    e.className = cls


def show(id_: str, visible: bool):
    """Show or hide an element by toggling the "hidden" CSS class."""
    e = el(id_)
    if visible:
        e.classList.remove("hidden")
    else:
        e.classList.add("hidden")


def get_value(id_: str) -> str:
    """Return the current value of a form element."""
    return el(id_).value


def reset_msgs():
    """Clear all user-facing status messages."""
    for msg_id in ("login_msg", "tx_msg", "table_msg", "backup_msg"):
        set_text(msg_id, "")


def eur(x: float) -> str:
    """Format a number German-style (1.234,56) followed by the euro sign."""
    s = f"{x:,.2f}".replace(",", "X").replace(".", ",").replace("X", ".")
    return f"{s} €"


def now_iso() -> str:
    """Return the local time as 'YYYY-MM-DD HH:MM:SS'."""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def _read_js_bytes(buf) -> bytes:
    """Copy a JS ArrayBuffer into Python bytes.

    Tries the typed array's bulk ``to_bytes()`` first; if that attribute is
    missing or fails, falls back to copying element by element, which is slow
    for large receipts but always available.
    """
    u8 = window.Uint8Array.new(buf)
    try:
        fast = getattr(u8, "to_bytes", None)
        if callable(fast):
            data = fast()
            if isinstance(data, (bytes, bytearray)):
                return bytes(data)
    except Exception:
        # Best effort only: any problem here just means we use the slow path.
        pass
    return bytes(int(u8[i]) for i in range(u8.length))


# ----------------------------
# PBKDF2-HMAC-SHA256
# ----------------------------
def pbkdf2_hmac_sha256(password: bytes, salt: bytes, iterations: int, dklen: int = 32) -> bytes:
    """Derive a key with PBKDF2-HMAC-SHA256.

    Uses ``hashlib.pbkdf2_hmac`` when the runtime provides it and a
    pure-Python implementation otherwise; both produce the same bytes.

    Raises:
        ValueError: if ``iterations`` or ``dklen`` is smaller than 1.
    """
    if iterations < 1:
        raise ValueError("iterations must be >= 1")
    if dklen < 1:
        raise ValueError("dklen must be >= 1")

    native = getattr(hashlib, "pbkdf2_hmac", None)
    if native is not None:
        return native("sha256", password, salt, iterations, dklen)

    hlen = 32
    block_count = (dklen + hlen - 1) // hlen
    # Key the HMAC once and copy it per round instead of re-deriving the
    # padded key for every one of the iterations.
    keyed = hmac.new(password, None, hashlib.sha256)
    blocks = []
    for i in range(1, block_count + 1):
        mac = keyed.copy()
        mac.update(salt + i.to_bytes(4, "big"))
        u = mac.digest()
        acc = int.from_bytes(u, "big")
        for _ in range(iterations - 1):
            mac = keyed.copy()
            mac.update(u)
            u = mac.digest()
            acc ^= int.from_bytes(u, "big")
        blocks.append(acc.to_bytes(hlen, "big"))
    return b"".join(blocks)[:dklen]


def pbkdf2_hash(password: str, salt: bytes, iterations: int = 120_000) -> bytes:
    """Hash a password (UTF-8 encoded) to a 32-byte PBKDF2 digest."""
    return pbkdf2_hmac_sha256(password.encode("utf-8"), salt, iterations, dklen=32)


def consteq(a: bytes, b: bytes) -> bool:
    """Compare two byte strings with ``hmac.compare_digest``."""
    return hmac.compare_digest(a, b)


# ----------------------------
# Storage
# ----------------------------
async def storage_get(key: str):
    """Return the stored value for ``key`` (None if absent)."""
    store = await storage(STORE_NAME)
    return store.get(key)


async def storage_set(key: str, value):
    """Store ``value`` under ``key``."""
    store = await storage(STORE_NAME)
    store[key] = value


async def storage_del(key: str):
    """Remove ``key`` from the store; a missing key is not an error."""
    store = await storage(STORE_NAME)
    try:
        del store[key]
    except Exception:
        # Deliberately best effort: deleting something absent is fine.
        pass


# ----------------------------
# Load sqlite3
# ----------------------------
sqlite3 = None
try:
    import sqlite3 as _sqlite3
    sqlite3 = _sqlite3
except Exception:
    sqlite3 = None


async def ensure_sqlite3():
    """Make sure the sqlite3 module is available; return True on success.

    Falls back to ``py_import`` when the plain import at module load failed
    and reports a failure in the "tech_msg" element.
    """
    global sqlite3
    if sqlite3 is not None:
        return True
    try:
        sqlite3_mod, = await py_import("sqlite3")
        sqlite3 = sqlite3_mod
        return True
    except Exception as e:
        set_text("tech_msg", f"sqlite3 nicht ladbar: {type(e).__name__}: {e}", danger=True)
        return False


# ----------------------------
# SQLite + persistence
# ----------------------------
def _remove_file(path: str):
    """Delete a file if it exists."""
    try:
        os.remove(path)
    except OSError:
        pass


def _close_conn():
    """Close the global connection (if any) and forget it."""
    global CONN
    try:
        if CONN is not None:
            CONN.close()
    except Exception:
        pass
    CONN = None


def create_conn(db_path: str):
    """Open a connection and apply the journal/sync/foreign-key PRAGMAs.

    The connection is closed again if a PRAGMA fails (e.g. corrupt file).
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("PRAGMA journal_mode=DELETE;")
        conn.execute("PRAGMA synchronous=FULL;")
        conn.execute("PRAGMA foreign_keys=ON;")
    except Exception:
        conn.close()
        raise
    return conn


def init_schema(conn):
    """Create the users, receipts and transactions tables if missing."""
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            salt_b64 TEXT NOT NULL,
            pw_hash_b64 TEXT NOT NULL,
            created_at TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS receipts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            filename TEXT,
            mime TEXT,
            content_b64 TEXT NOT NULL,
            created_at TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS transactions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            t_date TEXT NOT NULL,
            t_type TEXT NOT NULL,
            category TEXT NOT NULL,
            amount REAL NOT NULL,
            description TEXT,
            receipt_id INTEGER,
            created_at TEXT NOT NULL,
            FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE,
            FOREIGN KEY(receipt_id) REFERENCES receipts(id) ON DELETE SET NULL
        );
    """)
    conn.commit()


async def load_db_bytes():
    """Return the persisted database as bytes, or None if absent/undecodable."""
    b64 = await storage_get(DB_KEY)
    if not b64:
        return None
    try:
        return base64.b64decode(b64)
    except Exception:
        return None


async def save_db_bytes(db_path: str, conn=None):
    """Persist the database file (base64) into storage.

    Commits ``conn`` first when given; a failing commit is ignored so the
    file is still saved in its current on-disk state.
    """
    try:
        if conn is not None:
            conn.commit()
    except Exception:
        pass
    with open(db_path, "rb") as f:
        raw = f.read()
    await storage_set(DB_KEY, base64.b64encode(raw).decode("ascii"))


async def open_db():
    """Open the database from storage, or create a new empty one.

    Returns:
        (connection, db_path)

    Raises:
        Whatever sqlite3 raises when the stored file cannot be opened; the
        half-opened connection is closed before the error propagates.
    """
    db_path = "eur.sqlite"
    raw = await load_db_bytes()
    if raw is None:
        # Nothing persisted: make sure no stale local file is reopened,
        # otherwise a "reset" would silently bring the old data back.
        _remove_file(db_path)
    else:
        with open(db_path, "wb") as f:
            f.write(raw)

    conn = create_conn(db_path)
    try:
        init_schema(conn)
    except Exception:
        conn.close()
        raise

    if raw is None:
        await save_db_bytes(db_path, conn)
    return conn, db_path


async def reopen_db_from_storage():
    """Close the current connection and reopen from the persisted bytes."""
    global CONN, DB_PATH
    _close_conn()
    CONN, DB_PATH = await open_db()


# ----------------------------
# Auth
# ----------------------------
def get_user_by_username(conn, username: str):
    """Return the user row as a dict, or None if the name is unknown."""
    cur = conn.execute(
        "SELECT id, username, salt_b64, pw_hash_b64 FROM users WHERE username = ?",
        (username,)
    )
    row = cur.fetchone()
    if not row:
        return None
    return {"id": row[0], "username": row[1], "salt_b64": row[2], "pw_hash_b64": row[3]}


def create_user(conn, username: str, password: str):
    """Insert a new user with a random 16-byte salt and PBKDF2 hash."""
    salt = os.urandom(16)
    pw_hash = pbkdf2_hash(password, salt)
    conn.execute(
        "INSERT INTO users(username, salt_b64, pw_hash_b64, created_at) VALUES(?,?,?,?)",
        (
            username,
            base64.b64encode(salt).decode("ascii"),
            base64.b64encode(pw_hash).decode("ascii"),
            now_iso(),
        )
    )
    conn.commit()


async def set_session(username: str):
    """Remember the logged-in user name in storage."""
    await storage_set(SESSION_KEY, username)


async def clear_session():
    """Forget the remembered user name."""
    await storage_del(SESSION_KEY)


async def get_session_user():
    """Return the remembered user name, if any."""
    return await storage_get(SESSION_KEY)


def render_whoami():
    """Show the current user's name (empty when logged out)."""
    el("whoami").innerText = CURRENT_USER["username"] if CURRENT_USER else ""


def set_year_range(year: int):
    """Set the from/to filter fields to the full given year."""
    el("f_from").value = f"{year}-01-01"
    el("f_to").value = f"{year}-12-31"


def set_default_dates():
    """Preset the booking date to today and the filter to the current year."""
    today = date.today()
    el("t_date").value = today.isoformat()
    set_year_range(today.year)


# ----------------------------
# Event binding
# ----------------------------
def _bind(id_: str, event: str, async_func):
    """Attach ``async_func`` to ``event`` on the element, run as a task."""
    def _py_handler(evt):
        asyncio.create_task(_safe_call(async_func, evt))

    proxy = create_proxy(_py_handler)
    PROXIES.append(proxy)  # keep the proxy alive as long as the listener
    el(id_).addEventListener(event, proxy)


def bind_click(id_: str, async_func):
    """Run ``async_func(evt)`` when the element is clicked."""
    _bind(id_, "click", async_func)


def bind_change(id_: str, async_func):
    """Run ``async_func(evt)`` when the element's value changes."""
    _bind(id_, "change", async_func)


async def _safe_call(async_func, evt):
    """Await a handler; show any exception in "tech_msg", then re-raise."""
    try:
        await async_func(evt)
    except Exception as e:
        set_text("tech_msg", f"Fehler: {type(e).__name__}: {e}", danger=True)
        raise


# ----------------------------
# Year filter
# ----------------------------
def fetch_years_for_user(user_id: int):
    """Return the distinct booking years of a user, newest first.

    Rows whose date does not start with four digits (possible in an imported
    database) are skipped instead of raising.
    """
    cur = CONN.execute(
        "SELECT DISTINCT substr(t_date, 1, 4) AS y FROM transactions WHERE user_id = ? ORDER BY y DESC",
        (user_id,)
    )
    return [int(r[0]) for r in cur.fetchall() if r and r[0] and str(r[0]).isdigit()]


async def rebuild_year_select(select_year: int | None = None):
    """Rebuild the year dropdown and select ``select_year``.

    The options are the user's booking years plus the current year plus the
    year to select, sorted newest first, so the selected value always has a
    matching option.
    """
    sel = el("year_select")
    while sel.firstChild:
        sel.removeChild(sel.firstChild)

    db_years = fetch_years_for_user(CURRENT_USER["id"])
    cy = date.today().year
    if select_year is None:
        # Newest booking year when the current year has bookings,
        # otherwise the current year.
        select_year = db_years[0] if cy in db_years else cy

    for y in sorted({*db_years, cy, select_year}, reverse=True):
        opt = DOM.createElement("option")
        opt.value = str(y)
        opt.innerText = str(y)
        sel.appendChild(opt)

    sel.value = str(select_year)


async def on_year_change(evt=None):
    """Apply the chosen year to the date filter and refresh."""
    y = int(get_value("year_select"))
    set_year_range(y)
    await refresh_dashboard()


# ----------------------------
# Transactions / EÜR
# ----------------------------
def parse_filters():
    """Return (from, to) filter dates with wide defaults for empty fields."""
    f_from = get_value("f_from") or "1900-01-01"
    f_to = get_value("f_to") or "2999-12-31"
    return f_from, f_to


def fetch_transactions():
    """Return the current user's transactions in the filter range as dicts."""
    f_from, f_to = parse_filters()
    cur = CONN.execute(
        """SELECT t.id, t.t_date, t.t_type, t.category, t.amount, t.description,
                  r.id as receipt_id, r.filename, r.mime
           FROM transactions t
           LEFT JOIN receipts r ON r.id = t.receipt_id
           WHERE t.user_id = ? AND t.t_date BETWEEN ? AND ?
           ORDER BY t.t_date DESC, t.id DESC""",
        (CURRENT_USER["id"], f_from, f_to)
    )
    return [
        {
            "id": r[0],
            "t_date": r[1],
            "t_type": r[2],
            "category": r[3],
            "amount": float(r[4]),
            "description": r[5] or "",
            "receipt_id": r[6],
            "receipt_filename": r[7] or "",
            "receipt_mime": r[8] or "",
        }
        for r in cur.fetchall()
    ]


def fetch_receipt_by_id(rid: int):
    """Return a receipt as dict (with fallback name/mime), or None."""
    cur = CONN.execute("SELECT filename, mime, content_b64 FROM receipts WHERE id = ?", (rid,))
    row = cur.fetchone()
    if not row:
        return None
    return {
        "filename": row[0] or f"beleg_{rid}",
        "mime": row[1] or "application/octet-stream",
        "content_b64": row[2],
    }


_BAD_FILENAME_CHARS = str.maketrans({ch: "_" for ch in '\\/:*?"<>|'})


def safe_filename(name: str) -> str:
    """Replace characters that are invalid in file names with "_"."""
    return name.translate(_BAD_FILENAME_CHARS).strip() or "beleg"


def download_receipt(rid: int):
    """Trigger a browser download of the receipt with the given id."""
    rec = fetch_receipt_by_id(rid)
    if not rec:
        set_text("table_msg", "Beleg nicht gefunden.", danger=True)
        return
    window.downloadBase64File(rec["filename"], rec["mime"], rec["content_b64"])


async def delete_tx(tid: int):
    """Delete one of the current user's transactions and persist the DB.

    The attached receipt is removed as well once no transaction references
    it any more; otherwise its base64 content would stay in the persisted
    database forever.
    """
    uid = CURRENT_USER["id"]
    row = CONN.execute(
        "SELECT receipt_id FROM transactions WHERE id = ? AND user_id = ?",
        (tid, uid)
    ).fetchone()
    CONN.execute("DELETE FROM transactions WHERE id = ? AND user_id = ?", (tid, uid))
    if row and row[0] is not None:
        CONN.execute(
            "DELETE FROM receipts WHERE id = ? "
            "AND NOT EXISTS (SELECT 1 FROM transactions WHERE receipt_id = ?)",
            (row[0], row[0])
        )
    CONN.commit()
    await save_db_bytes(DB_PATH, CONN)
    set_text("table_msg", "Buchung gelöscht & DB gespeichert ✅", ok=True)
    await rebuild_year_select(select_year=int(get_value("year_select") or date.today().year))
    await refresh_dashboard()


def _release_row_proxies():
    """Destroy the proxies of table buttons that were just removed."""
    while ROW_PROXIES:
        proxy = ROW_PROXIES.pop()
        try:
            proxy.destroy()
        except Exception:
            pass


def _td_text(text, right=False):
    """Create a <td> with the given text, optionally right-aligned."""
    td = DOM.createElement("td")
    td.innerText = text
    if right:
        td.classList.add("right")
    return td


def _row_button(label: str, handler):
    """Create a small button whose click calls ``handler(evt)``."""
    btn = DOM.createElement("button")
    btn.classList.add("smallbtn")
    btn.innerText = label
    proxy = create_proxy(handler)
    ROW_PROXIES.append(proxy)
    btn.addEventListener("click", proxy)
    return btn


def _make_delete_handler(tid: int):
    """Return a click handler that deletes transaction ``tid`` as a task."""
    async def _run(_evt):
        await delete_tx(tid)

    def _handler(evt):
        # Via _safe_call so that failures show up in "tech_msg".
        asyncio.create_task(_safe_call(_run, evt))

    return _handler


def render_table(txs):
    """Render the transaction list into the "tx_table" body."""
    tbody = el("tx_table")
    while tbody.firstChild:
        tbody.removeChild(tbody.firstChild)
    # The old rows are gone, so their button proxies can be freed.
    _release_row_proxies()

    set_text("table_msg", f"{len(txs)} Buchung(en) gefunden.", ok=True)

    for x in txs:
        tr = DOM.createElement("tr")
        tr.appendChild(_td_text(x["t_date"]))
        ttype = "Einnahme" if x["t_type"] == "income" else "Ausgabe"
        tr.appendChild(_td_text(ttype))
        tr.appendChild(_td_text(x["category"]))
        tr.appendChild(_td_text(eur(x["amount"]), right=True))
        tr.appendChild(_td_text(x["description"] or ""))

        td_receipt = DOM.createElement("td")
        if x["receipt_id"]:
            rid = int(x["receipt_id"])
            td_receipt.appendChild(
                _row_button("Download", lambda evt, rid=rid: download_receipt(rid))
            )
            fn = DOM.createElement("div")
            fn.classList.add("muted")
            fn.innerText = x["receipt_filename"]
            td_receipt.appendChild(fn)
        else:
            td_receipt.innerText = "—"
            td_receipt.classList.add("muted")
        tr.appendChild(td_receipt)

        td_del = DOM.createElement("td")
        td_del.appendChild(_row_button("Löschen", _make_delete_handler(int(x["id"]))))
        tr.appendChild(td_del)

        tbody.appendChild(tr)


async def add_transaction(evt=None):
    """Validate the form, store the booking (and receipt) and refresh.

    The amount may use a decimal comma or point. It must be a finite number
    greater than zero. Receipt and transaction are inserted together: if
    either insert fails, both are rolled back.
    """
    reset_msgs()
    if not CURRENT_USER:
        set_text("tx_msg", "Nicht eingeloggt.", danger=True)
        return

    t_date = get_value("t_date") or date.today().isoformat()
    t_type = get_value("t_type")
    category = (get_value("t_category").strip() or "Ohne Kategorie")
    desc = get_value("t_desc").strip()
    amount_str = get_value("t_amount").strip()

    try:
        amount = float(amount_str.replace(",", "."))
    except ValueError:
        set_text("tx_msg", "Betrag ist ungültig.", danger=True)
        return
    # float() also parses "nan"/"inf", and nan <= 0 is False.
    if not math.isfinite(amount):
        set_text("tx_msg", "Betrag ist ungültig.", danger=True)
        return
    if amount <= 0:
        set_text("tx_msg", "Betrag muss > 0 sein.", danger=True)
        return

    receipt = None
    files = el("t_receipt").files
    if files and files.length > 0:
        f = files.item(0)
        mime = f.type or "application/octet-stream"
        filename = f.name or "beleg"
        raw = _read_js_bytes(await f.arrayBuffer())
        receipt = (filename, mime, base64.b64encode(raw).decode("ascii"))

    try:
        receipt_id = None
        if receipt is not None:
            cur = CONN.execute(
                "INSERT INTO receipts(filename, mime, content_b64, created_at) VALUES(?,?,?,?)",
                (*receipt, now_iso())
            )
            receipt_id = cur.lastrowid
        CONN.execute(
            """INSERT INTO transactions(user_id, t_date, t_type, category, amount, description, receipt_id, created_at)
               VALUES(?,?,?,?,?,?,?,?)""",
            (CURRENT_USER["id"], t_date, t_type, category, amount,
             (desc if desc else None), receipt_id, now_iso())
        )
        CONN.commit()
    except Exception:
        # Do not leave a receipt without its transaction pending.
        CONN.rollback()
        raise

    await save_db_bytes(DB_PATH, CONN)

    el("t_amount").value = ""
    el("t_desc").value = ""
    el("t_receipt").value = ""
    set_text("tx_msg", "Gespeichert & DB gespeichert ✅", ok=True)

    y = int(t_date[:4])
    await rebuild_year_select(select_year=y)
    set_year_range(y)
    await refresh_dashboard()


# ----------------------------
# CSV / ZIP export
# ----------------------------
def build_csv_transactions(txs):
    """Return the transactions as semicolon-separated CSV text."""
    out = StringIO()
    w = csv.writer(out, delimiter=";")
    w.writerow(["id", "datum", "typ", "kategorie", "betrag", "beschreibung", "receipt_id", "beleg_dateiname"])
    for x in txs:
        w.writerow([
            x["id"],
            x["t_date"],
            x["t_type"],
            x["category"],
            f"{x['amount']:.2f}".replace(".", ","),
            x["description"],
            (x["receipt_id"] if x.get("receipt_id") is not None else ""),
            x["receipt_filename"],
        ])
    return out.getvalue()


def build_csv_eur_report():
    """Return per-month/category/type sums in the filter range as CSV text."""
    f_from, f_to = parse_filters()
    cur = CONN.execute(
        """SELECT substr(t_date, 1, 7) as month, category, t_type, SUM(amount) as total
           FROM transactions
           WHERE user_id = ? AND t_date BETWEEN ? AND ?
           GROUP BY substr(t_date, 1, 7), category, t_type
           ORDER BY month, category, t_type""",
        (CURRENT_USER["id"], f_from, f_to)
    )
    out = StringIO()
    w = csv.writer(out, delimiter=";")
    w.writerow(["monat", "kategorie", "typ", "summe"])
    for m, cat, ttype, total in cur.fetchall():
        w.writerow([m, cat, ttype, f"{float(total):.2f}".replace(".", ",")])
    return out.getvalue()


async def download_csv_tx(evt=None):
    """Download the filtered transactions as "buchungen.csv"."""
    if not CURRENT_USER:
        set_text("table_msg", "Nicht eingeloggt.", danger=True)
        return
    txs = fetch_transactions()
    window.downloadTextFile("buchungen.csv", build_csv_transactions(txs))


async def download_csv_eur(evt=None):
    """Download the aggregated report as "eur_report.csv"."""
    if not CURRENT_USER:
        set_text("table_msg", "Nicht eingeloggt.", danger=True)
        return
    window.downloadTextFile("eur_report.csv", build_csv_eur_report())


async def export_zip_with_receipts(evt=None):
    """Export both CSV files plus all referenced receipts as one ZIP."""
    reset_msgs()
    if not CURRENT_USER:
        set_text("table_msg", "Nicht eingeloggt.", danger=True)
        return

    txs = fetch_transactions()
    csv_text = build_csv_transactions(txs)
    eur_text = build_csv_eur_report()

    files = [
        {"path": "csv/buchungen.csv", "content": csv_text, "isBase64": False},
        {"path": "csv/eur_report.csv", "content": eur_text, "isBase64": False},
    ]

    added = 0
    missing = 0
    for x in txs:
        rid = x.get("receipt_id")
        if not rid:
            continue
        rec = fetch_receipt_by_id(int(rid))
        if not rec:
            missing += 1
            continue
        fn = safe_filename(rec["filename"])
        files.append({
            "path": f"receipts/{rid}_{fn}",
            "content": rec["content_b64"],  # base64
            "isBase64": True,
        })
        added += 1

    try:
        y = get_value("year_select")
    except Exception:
        y = ""
    zip_name = f"eur_export_{y}.zip" if y else "eur_export.zip"

    # Hand the file list over as a JSON string (robust across the FFI).
    payload = json.dumps(files, ensure_ascii=False)

    set_text(
        "table_msg",
        f"ZIP wird erstellt… Buchungen: {len(txs)}, CSV-Bytes: {len(csv_text.encode('utf-8'))}, Belege: {added}, fehlend: {missing}",
        ok=True
    )

    await window.downloadZipFromJson(zip_name, payload)
    set_text("table_msg", f"ZIP Export OK ✅ (Belege: {added}, fehlend: {missing})", ok=True)


async def refresh_dashboard(evt=None):
    """Recompute income/expense/net sums and re-render the table."""
    if not CURRENT_USER:
        return
    txs = fetch_transactions()
    inc = sum(x["amount"] for x in txs if x["t_type"] == "income")
    exp = sum(x["amount"] for x in txs if x["t_type"] == "expense")
    net = inc - exp

    el("sum_income").innerText = eur(inc)
    el("sum_expense").innerText = eur(exp)
    el("sum_net").innerText = eur(net)
    el("sum_net").className = ("ok" if net >= 0 else "danger")
    render_table(txs)


# ----------------------------
# Backup / restore (DB file)
# ----------------------------
async def export_db(evt=None):
    """Persist the DB and offer it as "eur.sqlite" download."""
    reset_msgs()
    await save_db_bytes(DB_PATH, CONN)
    raw = await load_db_bytes()
    if not raw:
        set_text("backup_msg", "Keine DB zum Export gefunden.", danger=True)
        return
    u8 = window.Uint8Array.new(list(raw))
    window.downloadBinaryFile("eur.sqlite", u8)
    set_text("backup_msg", f"Export OK ({len(raw)} Bytes).", ok=True)


async def import_db(evt=None):
    """Replace the database with an uploaded SQLite file.

    The file must start with the SQLite header. If the imported file cannot
    be opened, the previously stored database is restored so a damaged
    upload does not destroy existing data.
    """
    global CURRENT_USER
    reset_msgs()
    files = el("import_db_file").files
    if not files or files.length == 0:
        set_text("backup_msg", "Bitte eine .sqlite Datei auswählen.", danger=True)
        return

    f = files.item(0)
    raw = _read_js_bytes(await f.arrayBuffer())

    if not raw.startswith(b"SQLite format 3"):
        set_text("backup_msg", "Import abgebrochen: Datei ist keine SQLite DB (Header fehlt).", danger=True)
        return

    previous_b64 = await storage_get(DB_KEY)
    await storage_set(DB_KEY, base64.b64encode(raw).decode("ascii"))
    try:
        await reopen_db_from_storage()
    except Exception as e:
        # Header looked fine but the file is unusable: roll back.
        if previous_b64:
            await storage_set(DB_KEY, previous_b64)
        else:
            await storage_del(DB_KEY)
        await reopen_db_from_storage()
        set_text(
            "backup_msg",
            f"Import abgebrochen: DB nicht lesbar ({type(e).__name__}: {e}).",
            danger=True
        )
        return

    await clear_session()
    CURRENT_USER = None

    show("app_view", False)
    show("login_view", True)
    set_text("login_msg", "DB importiert ✅ Bitte neu einloggen.", ok=True)
    set_text("backup_msg", f"Import OK ({len(raw)} Bytes).", ok=True)


async def reset_db(evt=None):
    """Delete the stored database and start over with an empty one.

    A fresh database is opened right away so that registering works without
    reloading the page, as the status message promises.
    """
    global CONN, DB_PATH, CURRENT_USER
    reset_msgs()
    await storage_del(DB_KEY)
    await clear_session()
    CURRENT_USER = None

    _close_conn()
    # open_db() removes the stale local file because storage is now empty.
    CONN, DB_PATH = await open_db()

    set_text("backup_msg", "DB gelöscht. Seite neu laden oder neu registrieren.", ok=True)
    show("app_view", False)
    show("login_view", True)


# ----------------------------
# Login / register / logout
# ----------------------------
async def _enter_app():
    """Switch to the app view and load the dashboard for CURRENT_USER."""
    show("login_view", False)
    show("app_view", True)
    render_whoami()
    set_default_dates()
    await rebuild_year_select(select_year=date.today().year)
    await refresh_dashboard()


async def do_register(evt=None):
    """Create a new user from the login form and persist the DB."""
    reset_msgs()
    username = get_value("username").strip()
    password = get_value("password")

    if len(username) < 3 or len(password) < 6:
        set_text("login_msg", "Benutzername min. 3 Zeichen, Passwort min. 6 Zeichen.", danger=True)
        return

    if get_user_by_username(CONN, username):
        set_text("login_msg", "Benutzername existiert bereits.", danger=True)
        return

    create_user(CONN, username, password)
    await save_db_bytes(DB_PATH, CONN)
    set_text("login_msg", f"Registriert: {username}. Jetzt Login.", ok=True)


async def do_login(evt=None):
    """Check the credentials and, on success, open the app view."""
    global CURRENT_USER
    reset_msgs()
    username = get_value("username").strip()
    password = get_value("password")

    if not username or not password:
        set_text("login_msg", "Bitte Benutzername und Passwort eingeben.", danger=True)
        return

    user = get_user_by_username(CONN, username)
    if not user:
        set_text("login_msg", "Unbekannter Benutzer. Bitte registrieren.", danger=True)
        return

    salt = base64.b64decode(user["salt_b64"])
    expected = base64.b64decode(user["pw_hash_b64"])
    got = pbkdf2_hash(password, salt)

    if not consteq(expected, got):
        set_text("login_msg", "Passwort falsch.", danger=True)
        return

    CURRENT_USER = {"id": user["id"], "username": user["username"]}
    await set_session(CURRENT_USER["username"])
    await _enter_app()


async def do_logout(evt=None):
    """Log out and return to the login view."""
    global CURRENT_USER
    CURRENT_USER = None
    await clear_session()
    show("app_view", False)
    show("login_view", True)
    set_text("login_msg", "Abgemeldet.", ok=True)


# ----------------------------
# Startup
# ----------------------------
async def main():
    """Load sqlite3, bind the UI, open the DB and resume a stored session."""
    global CONN, DB_PATH, CURRENT_USER
    reset_msgs()

    ok_sqlite = await ensure_sqlite3()
    if not ok_sqlite:
        return

    # Bind first so that e.g. the reset button still works when the stored
    # database turns out to be unreadable.
    bind_click("btn_login", do_login)
    bind_click("btn_register", do_register)
    bind_click("btn_logout", do_logout)

    bind_click("btn_add_tx", add_transaction)
    bind_click("btn_refresh", refresh_dashboard)
    bind_click("btn_csv_tx", download_csv_tx)
    bind_click("btn_csv_eur", download_csv_eur)
    bind_click("btn_zip_export", export_zip_with_receipts)

    bind_change("year_select", on_year_change)

    bind_click("btn_export_db", export_db)
    bind_click("btn_import_db", import_db)
    bind_click("btn_reset_db", reset_db)

    try:
        CONN, DB_PATH = await open_db()
    except Exception as e:
        set_text("tech_msg", f"DB konnte nicht geöffnet werden: {type(e).__name__}: {e}", danger=True)
        return

    set_text("tech_msg", "Init OK ✅ (Jahresfilter + DB Export/Import + ZIP Export via JSON aktiv).", ok=True)

    username = await get_session_user()
    if username:
        user = get_user_by_username(CONN, username)
        if user:
            CURRENT_USER = {"id": user["id"], "username": user["username"]}
            await _enter_app()
        else:
            await clear_session()


asyncio.create_task(main())