Files
justvitamin/app.py
Omair Saleh 21f67d39c7 feat: add simple password auth gate
- Flask session-based login with styled dark-theme login page
- All routes gated behind password (configurable via SITE_PASSWORD env)
- /login and /api/health are public
- Wrong password shows red error, correct redirects to original page
- 30-day session persistence
- /logout to clear session
- Password: jv2026 (set in docker-compose.yml)
2026-03-02 23:04:22 +08:00

338 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""JustVitamin × QuikCue — AI Content Engine
Live proposal site with real Gemini-powered demos.
PostgreSQL-backed data dashboard."""
import json
import logging
import os
import posixpath
import secrets
import time
import traceback
from datetime import timedelta
from functools import wraps
from pathlib import Path

from flask import (
    Flask,
    jsonify,
    make_response,
    redirect,
    render_template,
    request,
    send_from_directory,
    session,
    url_for,
)

from ai_engine import (
    competitor_xray,
    generate_asset_pack,
    generate_product_images,
    optimise_pdp_copy,
    pdp_surgeon,
)
from db import get_dashboard_data, get_meta, init_db
from scraper import scrape_competitor, scrape_product
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")

app = Flask(__name__, static_folder="static", template_folder="templates")

# Session cookies are signed with this key. Without a configured SECRET_KEY
# each process generates its own random key, so existing sessions stop
# validating after a restart and are not shared between worker processes.
# An empty SECRET_KEY is treated the same as a missing one.
_secret_key = os.environ.get("SECRET_KEY")
if not _secret_key:
    _secret_key = secrets.token_hex(32)
    logging.warning(
        "SECRET_KEY is not set; using a random per-process key "
        "(sessions will not survive restarts or span multiple workers)"
    )
app.secret_key = _secret_key

# Shared site password checked by the login view.
SITE_PASSWORD = os.environ.get("SITE_PASSWORD", "jv2026")
if "SITE_PASSWORD" not in os.environ:
    logging.warning("SITE_PASSWORD is not set; falling back to the built-in default")

# Directory served by the /generated/<path> route; created on first start.
GEN_DIR = Path(__file__).parent / "generated"
GEN_DIR.mkdir(exist_ok=True)

# ── Init DB on startup ───────────────────────────────────────
with app.app_context():
    try:
        init_db()
    except Exception:
        # Best-effort: a failed DB init is logged (with traceback) and
        # startup continues.
        logging.exception("DB init failed")
# ── Auth ─────────────────────────────────────────────────────
PUBLIC_PATHS = {"/login", "/api/health"}

# Sub-trees of the static folder that this app also serves through
# password-gated routes (/dashboard/..., /proposal, /offer). They must not be
# reachable through the unauthenticated /static/ exemption.
PROTECTED_STATIC_PREFIXES = (
    "/static/dashboard/",
    "/static/proposal/",
    "/static/offer/",
)


def _is_public_path(path):
    """Return True when *path* may be served without an authenticated session."""
    if path in PUBLIC_PATHS:
        return True
    # Collapse "." / ".." segments and duplicate slashes first so that a path
    # such as /static/x/../proposal/index.html cannot dodge the prefix check.
    normalised = posixpath.normpath(path)
    if not normalised.startswith("/static/"):
        return False
    # The trailing slash makes the bare directory match its own prefix.
    return not (normalised + "/").startswith(PROTECTED_STATIC_PREFIXES)


@app.before_request
def require_auth():
    """Gate every request behind a simple password session.

    Returns:
        ``None`` to let the request proceed (public path or authenticated
        session); a JSON 401 response for unauthenticated JSON requests; a
        redirect to the login page (carrying the requested path as ``next``)
        otherwise.
    """
    if _is_public_path(request.path) or session.get("authed"):
        return None
    if request.is_json:
        return jsonify({"error": "Authentication required"}), 401
    return redirect(url_for("login", next=request.path))
# Self-contained markup for the login page (inline CSS, no external assets).
# The literal token {{ERR}} marks where an error snippet is substituted in
# before the page is sent; it is plain text replacement, not a template tag.
LOGIN_HTML = """<!DOCTYPE html>
<html lang="en"><head>
<meta charset="utf-8"><meta name="viewport" content="width=device-width,initial-scale=1">
<title>Login — JustVitamins × QuikCue</title>
<style>
*{margin:0;padding:0;box-sizing:border-box}
body{min-height:100vh;display:flex;align-items:center;justify-content:center;
background:#0a0e1a;font-family:-apple-system,BlinkMacSystemFont,'Segoe UI',sans-serif;color:#e0e6f0}
.card{background:rgba(255,255,255,.04);border:1px solid rgba(255,255,255,.08);
border-radius:16px;padding:48px 40px;max-width:400px;width:100%;text-align:center}
.card h1{font-size:1.5rem;margin-bottom:8px;background:linear-gradient(135deg,#22d3ee,#a78bfa);
-webkit-background-clip:text;-webkit-text-fill-color:transparent}
.card p{font-size:.85rem;color:#8892a8;margin-bottom:28px}
.field{position:relative;margin-bottom:20px}
.field input{width:100%;padding:14px 16px;background:rgba(255,255,255,.06);
border:1px solid rgba(255,255,255,.12);border-radius:10px;color:#e0e6f0;
font-size:1rem;outline:none;transition:border .2s}
.field input:focus{border-color:#22d3ee}
.field input::placeholder{color:#5a6478}
button{width:100%;padding:14px;border:none;border-radius:10px;cursor:pointer;
font-size:1rem;font-weight:600;color:#fff;
background:linear-gradient(135deg,#7c3aed,#6d28d9);transition:opacity .2s}
button:hover{opacity:.9}
.err{color:#f87171;font-size:.85rem;margin-bottom:16px}
.footer{margin-top:24px;font-size:.75rem;color:#4a5568}
</style>
</head><body>
<div class="card">
<h1>JustVitamins × QuikCue</h1>
<p>Confidential proposal — enter password to continue</p>
{{ERR}}
<form method="post">
<div class="field">
<input type="password" name="password" placeholder="Password" autofocus autocomplete="current-password">
</div>
<button type="submit">Enter</button>
</form>
<div class="footer">Prepared for Just Vitamins · March 2026</div>
</div>
</body></html>"""
# Lifetime of permanent sessions is application-wide configuration, so it is
# set once at import time instead of being re-assigned on every login.
app.permanent_session_lifetime = timedelta(days=30)


def _safe_redirect_target(target):
    """Return *target* if it is a same-site absolute path, otherwise "/".

    ``next`` comes from the query string and is attacker-controllable, so
    anything that a browser could resolve to another host is rejected:
    values not starting with a single "/", scheme-relative "//host" URLs,
    backslashes (treated as "/" by some browsers), and control characters
    (stripped by browsers, which can turn "/\\t/host" into "//host").
    """
    if not target or not target.startswith("/"):
        return "/"
    if target.startswith("//") or "\\" in target:
        return "/"
    if any(ord(ch) < 0x20 for ch in target):
        return "/"
    return target


@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form and process password submissions.

    GET renders the form. POST compares the submitted ``password`` form field
    with ``SITE_PASSWORD``; on a match the session is marked authenticated
    and permanent and the visitor is redirected to the validated ``next``
    path (default "/"). On a mismatch the form is re-rendered with an error.
    """
    err = ""
    if request.method == "POST":
        pw = request.form.get("password", "")
        # Constant-time comparison; operands are encoded to bytes because
        # compare_digest rejects non-ASCII str arguments.
        if secrets.compare_digest(pw.encode("utf-8"), SITE_PASSWORD.encode("utf-8")):
            session["authed"] = True
            session.permanent = True
            return redirect(_safe_redirect_target(request.args.get("next")))
        err = '<div class="err">Wrong password</div>'
    return make_response(LOGIN_HTML.replace("{{ERR}}", err))
@app.route("/logout")
def logout():
    """Drop all session data and send the visitor back to the login page."""
    session.clear()
    login_page = "/login"
    return redirect(login_page)
# ── Page routes ──────────────────────────────────────────────
@app.route("/")
def index():
    """Render the landing page from the ``index.html`` template."""
    landing_page = render_template("index.html")
    return landing_page
@app.route("/dashboard")
@app.route("/dashboard/")
def dashboard():
    """Redirect both bare dashboard URLs to the dashboard's index page."""
    target = "/dashboard/index.html"
    return redirect(target)
@app.route("/dashboard/<path:filename>")
def dashboard_files(filename):
    """Serve *filename* from the ``static/dashboard`` directory."""
    dashboard_root = "static/dashboard"
    return send_from_directory(dashboard_root, filename)
# ── Dashboard API (Postgres-backed) ─────────────────────────
@app.route("/api/dashboard/data")
def api_dashboard_data():
    """Return filtered dashboard data from PostgreSQL.

    Query params: ?start=YYYY-MM&end=YYYY-MM
    Replaces the 4MB static JSON with fast server-side SQL filtering.

    Empty or missing ``start`` / ``end`` values are passed on as ``None``.
    The payload is annotated with ``_query_time`` and ``_source``. Any
    failure is logged and reported as a JSON 500 response.
    """
    start = request.args.get("start") or None
    end = request.args.get("end") or None
    try:
        t0 = time.time()
        data = get_dashboard_data(start, end)
        data["_query_time"] = f"{time.time()-t0:.3f}s"
        data["_source"] = "postgresql"
        resp = jsonify(data)
        # This endpoint sits behind the password gate, so the response must
        # not be stored by shared caches: "private" keeps the 60 s browser
        # cache while forbidding proxies/CDNs from replaying it to others.
        resp.headers["Cache-Control"] = "private, max-age=60"
        return resp
    except Exception as e:
        traceback.print_exc()
        return jsonify({"error": str(e)}), 500
@app.route("/api/dashboard/meta")
def api_dashboard_meta():
    """Return just the meta info (fast, tiny).

    Responds with the JSON-encoded result of ``get_meta()``; any failure is
    logged and reported as a JSON 500 response.
    """
    try:
        return jsonify(get_meta())
    except Exception as e:
        # Log the traceback like the other API handlers do, so failures here
        # are diagnosable from the server logs.
        traceback.print_exc()
        return jsonify({"error": str(e)}), 500
@app.route("/proposal")
def proposal():
    """Serve ``index.html`` from the ``static/proposal`` directory."""
    proposal_root = "static/proposal"
    return send_from_directory(proposal_root, "index.html")
@app.route("/offer")
def offer():
    """Serve ``index.html`` from the ``static/offer`` directory."""
    offer_root = "static/offer"
    return send_from_directory(offer_root, "index.html")
@app.route("/generated/<path:filename>")
def serve_generated(filename):
    """Serve *filename* from the ``GEN_DIR`` directory."""
    generated_root = GEN_DIR
    return send_from_directory(generated_root, filename)
# ── API: Scrape ──────────────────────────────────────────────
@app.route("/api/scrape", methods=["POST"])
def api_scrape():
    """Scrape a JustVitamins product page.

    Expects a JSON object body with a string ``url``. Responds with the
    scraped data as JSON, a JSON 400 when the URL is missing or no product
    title was found, or a JSON 500 when scraping raises.
    """
    # silent=True yields None for a non-JSON or malformed body, so the
    # request is answered with this API's JSON error shape instead of
    # failing while the body is parsed.
    payload = request.get_json(silent=True)
    raw_url = payload.get("url") if isinstance(payload, dict) else None
    url = raw_url.strip() if isinstance(raw_url, str) else ""
    if not url:
        return jsonify({"error": "No URL provided"}), 400
    try:
        data = scrape_product(url)
        if not data.get("title"):
            return jsonify({"error": "Could not find product. Check URL."}), 400
        return jsonify(data)
    except Exception as e:
        traceback.print_exc()
        return jsonify({"error": str(e)}), 500
@app.route("/api/scrape-competitor", methods=["POST"])
def api_scrape_competitor():
    """Scrape any competitor product page.

    Expects a JSON object body with a string ``url``. Responds with the
    scraped data as JSON, a JSON 400 when the URL is missing or no title
    could be extracted, or a JSON 500 when scraping raises.
    """
    # silent=True yields None for a non-JSON or malformed body, so the
    # request is answered with this API's JSON error shape instead of
    # failing while the body is parsed.
    payload = request.get_json(silent=True)
    raw_url = payload.get("url") if isinstance(payload, dict) else None
    url = raw_url.strip() if isinstance(raw_url, str) else ""
    if not url:
        return jsonify({"error": "No URL provided"}), 400
    try:
        data = scrape_competitor(url)
        if not data.get("title"):
            return jsonify({"error": "Could not extract product data."}), 400
        return jsonify(data)
    except Exception as e:
        traceback.print_exc()
        return jsonify({"error": str(e)}), 500
# ── API: Demo A — 12-Asset Pack ──────────────────────────────
@app.route("/api/generate-pack", methods=["POST"])
def api_generate_pack():
    """Generate a full 12-asset marketing pack from product data.

    Expects a non-empty JSON object describing the product. The generated
    pack is annotated with ``_generation_time`` and ``_product_title``.
    Responds with a JSON 400 for a missing/invalid body and a JSON 500 when
    generation raises.
    """
    # silent=True yields None for a non-JSON or malformed body; the dict
    # check guards the ``product.get`` call below against list/scalar JSON.
    product = request.get_json(silent=True)
    if not product or not isinstance(product, dict):
        return jsonify({"error": "No product data"}), 400
    try:
        t0 = time.time()
        pack = generate_asset_pack(product)
        pack["_generation_time"] = f"{time.time()-t0:.1f}s"
        pack["_product_title"] = product.get("title", "")
        return jsonify(pack)
    except Exception as e:
        traceback.print_exc()
        return jsonify({"error": str(e)}), 500
# ── API: Demo B — Competitor X-Ray ───────────────────────────
@app.route("/api/competitor-xray", methods=["POST"])
def api_competitor_xray():
    """Scrape competitor + run AI analysis.

    Expects a JSON object body with a string ``url``. The analysis is
    annotated with ``_scrape_data`` (title, price, brand, url) and
    ``_generation_time`` (covering scrape plus analysis). Responds with a
    JSON 400 when the URL is missing or the page yields no title, and a
    JSON 500 when scraping or analysis raises.
    """
    # silent=True yields None for a non-JSON or malformed body, so the
    # request is answered with this API's JSON error shape instead of
    # failing while the body is parsed.
    payload = request.get_json(silent=True)
    raw_url = payload.get("url") if isinstance(payload, dict) else None
    url = raw_url.strip() if isinstance(raw_url, str) else ""
    if not url:
        return jsonify({"error": "No URL provided"}), 400
    try:
        t0 = time.time()
        comp_data = scrape_competitor(url)
        if not comp_data.get("title"):
            return jsonify({"error": "Could not scrape competitor page."}), 400
        analysis = competitor_xray(comp_data)
        analysis["_scrape_data"] = {
            "title": comp_data.get("title"),
            "price": comp_data.get("price"),
            "brand": comp_data.get("brand"),
            "url": url,
        }
        analysis["_generation_time"] = f"{time.time()-t0:.1f}s"
        return jsonify(analysis)
    except Exception as e:
        traceback.print_exc()
        return jsonify({"error": str(e)}), 500
# ── API: Demo C — PDP Surgeon ────────────────────────────────
@app.route("/api/pdp-surgeon", methods=["POST"])
def api_pdp_surgeon():
    """Rewrite PDP copy in a given style.

    Expects a JSON object body with ``product`` and an optional ``style``
    (default ``"balanced"``). The result is annotated with
    ``_generation_time``. Responds with a JSON 400 when ``product`` is
    missing and a JSON 500 when the rewrite raises.
    """
    # silent=True yields None for a non-JSON or malformed body; anything
    # that is not a JSON object is treated as an empty request.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    product = data.get("product")
    style = data.get("style", "balanced")
    if not product:
        return jsonify({"error": "No product data"}), 400
    try:
        t0 = time.time()
        result = pdp_surgeon(product, style)
        result["_generation_time"] = f"{time.time()-t0:.1f}s"
        return jsonify(result)
    except Exception as e:
        traceback.print_exc()
        return jsonify({"error": str(e)}), 500
# ── API: Full PDP Optimise ───────────────────────────────────
@app.route("/api/optimise", methods=["POST"])
def api_optimise():
    """Full PDP rewrite — scrape + AI copy.

    Passes the JSON request body to ``optimise_pdp_copy`` and returns its
    result annotated with ``_generation_time``. Responds with a JSON 400 for
    a missing/empty/invalid body and a JSON 500 when optimisation raises.
    """
    # silent=True yields None for a non-JSON or malformed body, so the
    # request is answered with this API's JSON error shape instead of
    # failing while the body is parsed.
    product = request.get_json(silent=True)
    if not product:
        return jsonify({"error": "No product data"}), 400
    try:
        t0 = time.time()
        # Named to avoid shadowing the stdlib ``copy`` module.
        optimised = optimise_pdp_copy(product)
        optimised["_generation_time"] = f"{time.time()-t0:.1f}s"
        return jsonify(optimised)
    except Exception as e:
        traceback.print_exc()
        return jsonify({"error": str(e)}), 500
# ── API: Image Generation ────────────────────────────────────
@app.route("/api/generate-images", methods=["POST"])
def api_generate_images():
    """Generate AI product images via Nano Banana / Pro.

    Passes the JSON request body to ``generate_product_images`` and returns
    its result annotated with ``_generation_time``. Responds with a JSON 400
    for a missing/empty/invalid body and a JSON 500 when generation raises.
    """
    # silent=True yields None for a non-JSON or malformed body, so the
    # request is answered with this API's JSON error shape instead of
    # failing while the body is parsed.
    product = request.get_json(silent=True)
    if not product:
        return jsonify({"error": "No product data"}), 400
    try:
        t0 = time.time()
        images = generate_product_images(product)
        images["_generation_time"] = f"{time.time()-t0:.1f}s"
        return jsonify(images)
    except Exception as e:
        traceback.print_exc()
        return jsonify({"error": str(e)}), 500
# ── Health check ─────────────────────────────────────────────
@app.route("/api/health")
def health():
    """Report service health as JSON.

    Probes the database by counting rows in ``monthly``. ``db_connected`` is
    true only when the probe succeeds AND the table is non-empty;
    ``db_rows`` carries the count in that case and 0 otherwise.
    ``gemini_key_set`` reports whether ``GEMINI_API_KEY`` is present in the
    environment. ``status`` is always ``"ok"``.
    """
    db_ok = False
    count = 0
    try:
        from db import get_conn

        conn = get_conn()
        try:
            cur = conn.cursor()
            try:
                cur.execute("SELECT COUNT(*) FROM monthly")
                count = cur.fetchone()[0]
            finally:
                cur.close()
        finally:
            # Always release the connection, even when the query fails.
            conn.close()
        db_ok = count > 0
    except Exception as exc:
        # Still best-effort (the endpoint must keep answering), but record
        # why the probe failed instead of discarding the error.
        logging.warning("Health check DB probe failed: %s", exc)
    return jsonify({
        "status": "ok",
        "gemini_key_set": bool(os.environ.get("GEMINI_API_KEY")),
        "db_connected": db_ok,
        "db_rows": count if db_ok else 0,
    })
if __name__ == "__main__":
    # The Werkzeug debugger allows arbitrary code execution from the browser,
    # so it must never be on by default for a server bound to all interfaces.
    # Opt in explicitly for local development with FLASK_DEBUG=1.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {
        "1", "true", "yes", "on",
    }
    app.run(host="0.0.0.0", port=5050, debug=debug_enabled)