PyScript EÜR
Login + SQLite + Belege + EÜR + CSV + Jahresfilter + DB Export/Import + ZIP Export (CSV+Belege).
Login / Registrierung
Benutzername
Passwort
Login
Registrieren
Dashboard
Angemeldet als:
Logout
Backup / Restore (DB)
DB exportieren (.sqlite)
Exportiert die komplette lokale Datenbank.
DB importieren (.sqlite)
Import starten
Überschreibt die lokale DB im Browser.
DB zurücksetzen
Löscht lokale DB + Session.
Neue Buchung
Datum
Typ
Einnahme
Ausgabe
Kategorie
Betrag (EUR)
Beschreibung
Beleg (optional)
Beleg wird Base64 in SQLite gespeichert (Demo).
Speichern
Filter / Auswertung
Jahr
Wechselt automatisch Von/Bis und aktualisiert.
Von
Bis
Aktualisieren
Einnahmen
0,00 €
Ausgaben
0,00 €
Überschuss
0,00 €
CSV: Buchungen
CSV: EÜR-Report (Monat+Kategorie)
ZIP: Buchungen + Belege
ZIP enthält: csv/buchungen.csv + csv/eur_report.csv + receipts/...
Buchungen
Datum
Typ
Kategorie
Betrag
Beschreibung
Beleg
import asyncio import base64, hashlib, hmac, os, csv, json from io import StringIO from datetime import date, datetime from pyscript import window from pyscript import storage, py_import from pyodide.ffi import create_proxy DOM = window.document def el(id_: str): return DOM.getElementById(id_) DB_KEY = "eur_sqlite_db_b64" SESSION_KEY = "eur_session_user" PROXIES = [] # ---------------------------- # UI helpers # ---------------------------- def set_text(id_: str, txt: str, danger=False, ok=False): e = el(id_) e.innerText = txt cls = "muted" if danger: cls += " danger" if ok: cls += " ok" e.className = cls def show(id_: str, visible: bool): e = el(id_) if visible: e.classList.remove("hidden") else: e.classList.add("hidden") def get_value(id_: str) -> str: return el(id_).value def reset_msgs(): set_text("login_msg", "") set_text("tx_msg", "") set_text("table_msg", "") set_text("backup_msg", "") def eur(x: float) -> str: s = f"{x:,.2f}".replace(",", "X").replace(".", ",").replace("X", ".") return f"{s} €" def now_iso() -> str: return datetime.now().strftime("%Y-%m-%d %H:%M:%S") # ---------------------------- # Pure-Python PBKDF2-HMAC-SHA256 # ---------------------------- def pbkdf2_hmac_sha256(password: bytes, salt: bytes, iterations: int, dklen: int = 32) -> bytes: hlen = 32 l = (dklen + hlen - 1) // hlen dk = b"" for i in range(1, l + 1): u = hmac.new(password, salt + i.to_bytes(4, "big"), hashlib.sha256).digest() t = u for _ in range(iterations - 1): u = hmac.new(password, u, hashlib.sha256).digest() t = bytes(a ^ b for a, b in zip(t, u)) dk += t return dk[:dklen] def pbkdf2_hash(password: str, salt: bytes, iterations: int = 120_000) -> bytes: return pbkdf2_hmac_sha256(password.encode("utf-8"), salt, iterations, dklen=32) def consteq(a: bytes, b: bytes) -> bool: return hmac.compare_digest(a, b) # ---------------------------- # Storage # ---------------------------- async def storage_get(key: str): store = await storage("pyscript-eur-demo") return store.get(key) async def storage_set(key: str, value): store = await storage("pyscript-eur-demo") store[key] = value async def storage_del(key: str): store = await storage("pyscript-eur-demo") try: del store[key] except Exception: pass # ---------------------------- # sqlite3 laden # ---------------------------- sqlite3 = None try: import sqlite3 as _sqlite3 sqlite3 = _sqlite3 except Exception: sqlite3 = None async def ensure_sqlite3(): global sqlite3 if sqlite3 is not None: return True try: sqlite3_mod, = await py_import("sqlite3") sqlite3 = sqlite3_mod return True except Exception as e: set_text("tech_msg", f"sqlite3 nicht ladbar: {type(e).__name__}: {e}", danger=True) return False # ---------------------------- # SQLite + Persistenz # ---------------------------- def create_conn(db_path: str): conn = sqlite3.connect(db_path) conn.execute("PRAGMA journal_mode=DELETE;") conn.execute("PRAGMA synchronous=FULL;") conn.execute("PRAGMA foreign_keys=ON;") return conn def init_schema(conn): conn.executescript(""" CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, salt_b64 TEXT NOT NULL, pw_hash_b64 TEXT NOT NULL, created_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS receipts ( id INTEGER PRIMARY KEY AUTOINCREMENT, filename TEXT, mime TEXT, content_b64 TEXT NOT NULL, created_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS transactions ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, t_date TEXT NOT NULL, t_type TEXT NOT NULL, category TEXT NOT NULL, amount REAL NOT NULL, description TEXT, receipt_id INTEGER, created_at TEXT NOT NULL, FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE, FOREIGN KEY(receipt_id) REFERENCES receipts(id) ON DELETE SET NULL ); """) conn.commit() async def load_db_bytes(): b64 = await storage_get(DB_KEY) if not b64: return None try: return base64.b64decode(b64) except Exception: return None async def save_db_bytes(db_path: str, conn=None): try: if conn is not None: conn.commit() except Exception: pass with open(db_path, "rb") as f: raw = f.read() await storage_set(DB_KEY, base64.b64encode(raw).decode("ascii")) async def open_db(): db_path = "eur.sqlite" raw = await load_db_bytes() if raw is None: conn = create_conn(db_path) init_schema(conn) await save_db_bytes(db_path, conn) return conn, db_path with open(db_path, "wb") as f: f.write(raw) conn = create_conn(db_path) init_schema(conn) return conn, db_path async def reopen_db_from_storage(): global CONN, DB_PATH try: if CONN is not None: CONN.close() except Exception: pass CONN, DB_PATH = await open_db() CONN = None DB_PATH = None CURRENT_USER = None # ---------------------------- # Auth # ---------------------------- def get_user_by_username(conn, username: str): cur = conn.execute("SELECT id, username, salt_b64, pw_hash_b64 FROM users WHERE username = ?", (username,)) row = cur.fetchone() if not row: return None return {"id": row[0], "username": row[1], "salt_b64": row[2], "pw_hash_b64": row[3]} def create_user(conn, username: str, password: str): salt = os.urandom(16) pw_hash = pbkdf2_hash(password, salt) conn.execute( "INSERT INTO users(username, salt_b64, pw_hash_b64, created_at) VALUES(?,?,?,?)", (username, base64.b64encode(salt).decode("ascii"), base64.b64encode(pw_hash).decode("ascii"), now_iso()) ) conn.commit() async def set_session(username: str): await storage_set(SESSION_KEY, username) async def clear_session(): await storage_del(SESSION_KEY) async def get_session_user(): return await storage_get(SESSION_KEY) def render_whoami(): el("whoami").innerText = CURRENT_USER["username"] if CURRENT_USER else "" def set_year_range(year: int): el("f_from").value = f"{year}-01-01" el("f_to").value = f"{year}-12-31" def set_default_dates(): today = date.today().isoformat() el("t_date").value = today set_year_range(date.today().year) # ---------------------------- # Event binding # ---------------------------- def bind_click(id_: str, async_func): def _py_handler(evt): asyncio.create_task(_safe_call(async_func, evt)) proxy = create_proxy(_py_handler) PROXIES.append(proxy) el(id_).addEventListener("click", proxy) def bind_change(id_: str, async_func): def _py_handler(evt): asyncio.create_task(_safe_call(async_func, evt)) proxy = create_proxy(_py_handler) PROXIES.append(proxy) el(id_).addEventListener("change", proxy) async def _safe_call(async_func, evt): try: await async_func(evt) except Exception as e: set_text("tech_msg", f"Fehler: {type(e).__name__}: {e}", danger=True) raise # ---------------------------- # Year filter # ---------------------------- def fetch_years_for_user(user_id: int): cur = CONN.execute( "SELECT DISTINCT substr(t_date, 1, 4) AS y FROM transactions WHERE user_id = ? ORDER BY y DESC", (user_id,) ) years = [int(r[0]) for r in cur.fetchall() if r and r[0]] return years async def rebuild_year_select(select_year: int | None = None): sel = el("year_select") while sel.firstChild: sel.removeChild(sel.firstChild) years = fetch_years_for_user(CURRENT_USER["id"]) cy = date.today().year if cy not in years: years = [cy] + years if not years: years = [cy] for y in years: opt = DOM.createElement("option") opt.value = str(y) opt.innerText = str(y) sel.appendChild(opt) if select_year is None: select_year = years[0] sel.value = str(select_year) async def on_year_change(evt=None): y = int(get_value("year_select")) set_year_range(y) await refresh_dashboard() # ---------------------------- # Transactions / EÜR # ---------------------------- def parse_filters(): f_from = get_value("f_from") or "1900-01-01" f_to = get_value("f_to") or "2999-12-31" return f_from, f_to def fetch_transactions(): f_from, f_to = parse_filters() cur = CONN.execute( """SELECT t.id, t.t_date, t.t_type, t.category, t.amount, t.description, r.id as receipt_id, r.filename, r.mime FROM transactions t LEFT JOIN receipts r ON r.id = t.receipt_id WHERE t.user_id = ? AND t.t_date BETWEEN ? AND ? ORDER BY t.t_date DESC, t.id DESC""", (CURRENT_USER["id"], f_from, f_to) ) rows = cur.fetchall() out = [] for r in rows: out.append({ "id": r[0], "t_date": r[1], "t_type": r[2], "category": r[3], "amount": float(r[4]), "description": r[5] or "", "receipt_id": r[6], "receipt_filename": r[7] or "", "receipt_mime": r[8] or "" }) return out def fetch_receipt_by_id(rid: int): cur = CONN.execute("SELECT filename, mime, content_b64 FROM receipts WHERE id = ?", (rid,)) row = cur.fetchone() if not row: return None return {"filename": row[0] or f"beleg_{rid}", "mime": row[1] or "application/octet-stream", "content_b64": row[2]} def safe_filename(name: str) -> str: bad = ['\\', '/', ':', '*', '?', '"', '<', '>', '|'] for ch in bad: name = name.replace(ch, "_") return name.strip() or "beleg" def download_receipt(rid: int): rec = fetch_receipt_by_id(rid) if not rec: set_text("table_msg", "Beleg nicht gefunden.", danger=True) return window.downloadBase64File(rec["filename"], rec["mime"], rec["content_b64"]) async def delete_tx(tid: int): CONN.execute("DELETE FROM transactions WHERE id = ? AND user_id = ?", (tid, CURRENT_USER["id"])) CONN.commit() await save_db_bytes(DB_PATH, CONN) set_text("table_msg", "Buchung gelöscht & DB gespeichert ✅", ok=True) await rebuild_year_select(select_year=int(get_value("year_select") or date.today().year)) await refresh_dashboard() def render_table(txs): tbody = el("tx_table") while tbody.firstChild: tbody.removeChild(tbody.firstChild) set_text("table_msg", f"{len(txs)} Buchung(en) gefunden.", ok=True) for x in txs: tr = DOM.createElement("tr") def td_text(text, right=False): td = DOM.createElement("td") td.innerText = text if right: td.classList.add("right") return td tr.appendChild(td_text(x["t_date"])) ttype = "Einnahme" if x["t_type"] == "income" else "Ausgabe" tr.appendChild(td_text(ttype)) tr.appendChild(td_text(x["category"])) tr.appendChild(td_text(eur(x["amount"]), right=True)) tr.appendChild(td_text(x["description"] or "")) td_receipt = DOM.createElement("td") if x["receipt_id"]: btn = DOM.createElement("button") btn.classList.add("smallbtn") btn.innerText = "Download" rid = int(x["receipt_id"]) proxy = create_proxy(lambda evt, rid=rid: download_receipt(rid)) PROXIES.append(proxy) btn.addEventListener("click", proxy) td_receipt.appendChild(btn) fn = DOM.createElement("div") fn.classList.add("muted") fn.innerText = x["receipt_filename"] td_receipt.appendChild(fn) else: td_receipt.innerText = "—" td_receipt.classList.add("muted") tr.appendChild(td_receipt) td_del = DOM.createElement("td") btn_del = DOM.createElement("button") btn_del.classList.add("smallbtn") btn_del.innerText = "Löschen" tid = int(x["id"]) proxy_del = create_proxy(lambda evt, tid=tid: asyncio.create_task(delete_tx(tid))) PROXIES.append(proxy_del) btn_del.addEventListener("click", proxy_del) td_del.appendChild(btn_del) tr.appendChild(td_del) tbody.appendChild(tr) async def add_transaction(evt=None): reset_msgs() if not CURRENT_USER: set_text("tx_msg", "Nicht eingeloggt.", danger=True) return t_date = get_value("t_date") or date.today().isoformat() t_type = get_value("t_type") category = (get_value("t_category").strip() or "Ohne Kategorie") desc = get_value("t_desc").strip() amount_str = get_value("t_amount").strip() try: amount = float(amount_str) except Exception: set_text("tx_msg", "Betrag ist ungültig.", danger=True) return if amount <= 0: set_text("tx_msg", "Betrag muss > 0 sein.", danger=True) return receipt_id = None files = el("t_receipt").files if files and files.length > 0: f = files.item(0) mime = f.type or "application/octet-stream" filename = f.name or "beleg" buf = await f.arrayBuffer() u8 = window.Uint8Array.new(buf) raw = bytes([int(u8[i]) for i in range(u8.length)]) b64 = base64.b64encode(raw).decode("ascii") cur = CONN.execute( "INSERT INTO receipts(filename, mime, content_b64, created_at) VALUES(?,?,?,?)", (filename, mime, b64, now_iso()) ) receipt_id = cur.lastrowid CONN.execute( """INSERT INTO transactions(user_id, t_date, t_type, category, amount, description, receipt_id, created_at) VALUES(?,?,?,?,?,?,?,?)""", (CURRENT_USER["id"], t_date, t_type, category, amount, (desc if desc else None), receipt_id, now_iso()) ) CONN.commit() await save_db_bytes(DB_PATH, CONN) el("t_amount").value = "" el("t_desc").value = "" el("t_receipt").value = "" set_text("tx_msg", "Gespeichert & DB gespeichert ✅", ok=True) y = int(t_date[:4]) await rebuild_year_select(select_year=y) set_year_range(y) await refresh_dashboard() # ---------------------------- # CSV / ZIP Export # ---------------------------- def build_csv_transactions(txs): out = StringIO() w = csv.writer(out, delimiter=";") w.writerow(["id","datum","typ","kategorie","betrag","beschreibung","receipt_id","beleg_dateiname"]) for x in txs: w.writerow([ x["id"], x["t_date"], x["t_type"], x["category"], f"{x['amount']:.2f}".replace(".", ","), x["description"], (x["receipt_id"] if x.get("receipt_id") is not None else ""), x["receipt_filename"] ]) return out.getvalue() def build_csv_eur_report(): f_from, f_to = parse_filters() cur = CONN.execute( """SELECT substr(t_date, 1, 7) as month, category, t_type, SUM(amount) as total FROM transactions WHERE user_id = ? AND t_date BETWEEN ? AND ? GROUP BY substr(t_date, 1, 7), category, t_type ORDER BY month, category, t_type""", (CURRENT_USER["id"], f_from, f_to) ) out = StringIO() w = csv.writer(out, delimiter=";") w.writerow(["monat","kategorie","typ","summe"]) for m, cat, ttype, total in cur.fetchall(): w.writerow([m, cat, ttype, f"{float(total):.2f}".replace(".", ",")]) return out.getvalue() async def download_csv_tx(evt=None): txs = fetch_transactions() window.downloadTextFile("buchungen.csv", build_csv_transactions(txs)) async def download_csv_eur(evt=None): window.downloadTextFile("eur_report.csv", build_csv_eur_report()) async def export_zip_with_receipts(evt=None): reset_msgs() if not CURRENT_USER: set_text("table_msg", "Nicht eingeloggt.", danger=True) return txs = fetch_transactions() csv_text = build_csv_transactions(txs) eur_text = build_csv_eur_report() files = [ {"path": "csv/buchungen.csv", "content": csv_text, "isBase64": False}, {"path": "csv/eur_report.csv", "content": eur_text, "isBase64": False}, ] added = 0 missing = 0 for x in txs: rid = x.get("receipt_id") if not rid: continue rec = fetch_receipt_by_id(int(rid)) if not rec: missing += 1 continue fn = safe_filename(rec["filename"]) files.append({ "path": f"receipts/{rid}_{fn}", "content": rec["content_b64"], # base64 "isBase64": True }) added += 1 y = "" try: y = get_value("year_select") except Exception: y = "" zip_name = f"eur_export_{y}.zip" if y else "eur_export.zip" # ✅ Übergabe als JSON-String (robust) payload = json.dumps(files, ensure_ascii=False) set_text( "table_msg", f"ZIP wird erstellt… Buchungen: {len(txs)}, CSV-Bytes: {len(csv_text.encode('utf-8'))}, Belege: {added}, fehlend: {missing}", ok=True ) await window.downloadZipFromJson(zip_name, payload) set_text("table_msg", f"ZIP Export OK ✅ (Belege: {added}, fehlend: {missing})", ok=True) async def refresh_dashboard(evt=None): if not CURRENT_USER: return txs = fetch_transactions() inc = sum(x["amount"] for x in txs if x["t_type"] == "income") exp = sum(x["amount"] for x in txs if x["t_type"] == "expense") net = inc - exp el("sum_income").innerText = eur(inc) el("sum_expense").innerText = eur(exp) el("sum_net").innerText = eur(net) el("sum_net").className = ("ok" if net >= 0 else "danger") render_table(txs) # ---------------------------- # Backup / Restore (DB Datei) # ---------------------------- async def export_db(evt=None): reset_msgs() await save_db_bytes(DB_PATH, CONN) raw = await load_db_bytes() if not raw: set_text("backup_msg", "Keine DB zum Export gefunden.", danger=True) return u8 = window.Uint8Array.new(list(raw)) window.downloadBinaryFile("eur.sqlite", u8) set_text("backup_msg", f"Export OK ({len(raw)} Bytes).", ok=True) async def import_db(evt=None): reset_msgs() files = el("import_db_file").files if not files or files.length == 0: set_text("backup_msg", "Bitte eine .sqlite Datei auswählen.", danger=True) return f = files.item(0) buf = await f.arrayBuffer() u8 = window.Uint8Array.new(buf) raw = bytes([int(u8[i]) for i in range(u8.length)]) if not raw.startswith(b"SQLite format 3"): set_text("backup_msg", "Import abgebrochen: Datei ist keine SQLite DB (Header fehlt).", danger=True) return await storage_set(DB_KEY, base64.b64encode(raw).decode("ascii")) await reopen_db_from_storage() await clear_session() global CURRENT_USER CURRENT_USER = None show("app_view", False) show("login_view", True) set_text("login_msg", "DB importiert ✅ Bitte neu einloggen.", ok=True) set_text("backup_msg", f"Import OK ({len(raw)} Bytes).", ok=True) async def reset_db(evt=None): reset_msgs() await storage_del(DB_KEY) await clear_session() global CURRENT_USER CURRENT_USER = None try: if CONN is not None: CONN.close() except Exception: pass set_text("backup_msg", "DB gelöscht. Seite neu laden oder neu registrieren.", ok=True) show("app_view", False) show("login_view", True) # ---------------------------- # Login/Register/Logout # ---------------------------- async def do_register(evt=None): reset_msgs() username = get_value("username").strip() password = get_value("password") if len(username) < 3 or len(password) < 6: set_text("login_msg", "Benutzername min. 3 Zeichen, Passwort min. 6 Zeichen.", danger=True) return if get_user_by_username(CONN, username): set_text("login_msg", "Benutzername existiert bereits.", danger=True) return create_user(CONN, username, password) await save_db_bytes(DB_PATH, CONN) set_text("login_msg", f"Registriert: {username}. Jetzt Login.", ok=True) async def do_login(evt=None): reset_msgs() username = get_value("username").strip() password = get_value("password") if not username or not password: set_text("login_msg", "Bitte Benutzername und Passwort eingeben.", danger=True) return user = get_user_by_username(CONN, username) if not user: set_text("login_msg", "Unbekannter Benutzer. Bitte registrieren.", danger=True) return salt = base64.b64decode(user["salt_b64"]) expected = base64.b64decode(user["pw_hash_b64"]) got = pbkdf2_hash(password, salt) if not consteq(expected, got): set_text("login_msg", "Passwort falsch.", danger=True) return global CURRENT_USER CURRENT_USER = {"id": user["id"], "username": user["username"]} await set_session(CURRENT_USER["username"]) show("login_view", False) show("app_view", True) render_whoami() set_default_dates() await rebuild_year_select(select_year=date.today().year) await refresh_dashboard() async def do_logout(evt=None): global CURRENT_USER CURRENT_USER = None await clear_session() show("app_view", False) show("login_view", True) set_text("login_msg", "Abgemeldet.", ok=True) # ---------------------------- # Startup # ---------------------------- async def main(): reset_msgs() ok_sqlite = await ensure_sqlite3() if not ok_sqlite: return global CONN, DB_PATH, CURRENT_USER CONN, DB_PATH = await open_db() bind_click("btn_login", do_login) bind_click("btn_register", do_register) bind_click("btn_logout", do_logout) bind_click("btn_add_tx", add_transaction) bind_click("btn_refresh", refresh_dashboard) bind_click("btn_csv_tx", download_csv_tx) bind_click("btn_csv_eur", download_csv_eur) bind_click("btn_zip_export", export_zip_with_receipts) bind_change("year_select", on_year_change) bind_click("btn_export_db", export_db) bind_click("btn_import_db", import_db) bind_click("btn_reset_db", reset_db) set_text("tech_msg", "Init OK ✅ (Jahresfilter + DB Export/Import + ZIP Export via JSON aktiv).", ok=True) username = await get_session_user() if username: user = get_user_by_username(CONN, username) if user: CURRENT_USER = {"id": user["id"], "username": user["username"]} show("login_view", False) show("app_view", True) render_whoami() set_default_dates() await rebuild_year_select(select_year=date.today().year) await refresh_dashboard() else: await clear_session() asyncio.create_task(main())