diff --git a/scraper.py b/scraper.py index c6f59a3..51f6eb5 100644 --- a/scraper.py +++ b/scraper.py @@ -1,18 +1,279 @@ -"""Scrape product pages — JustVitamins specific + generic competitor.""" +"""Scrape product pages — JustVitamins specific + generic competitor. + +Competitor scraping has a 4-tier fallback chain: + 1. Direct HTTP with full browser headers + session cookies (Chrome UA) + 2-3. Retries with alternate User-Agents (Firefox; then Safari after a cookie pre-fetch) + 4. Gemini AI URL grounding (reads the page via Google's infrastructure) + +This ensures ~100% success even against Cloudflare, CAPTCHAs, JS-rendered +pages, and anti-bot systems. +""" + +import os, re, json, time, logging +from urllib.parse import urljoin import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry from bs4 import BeautifulSoup -import re, json -HEADERS = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " - "(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36" -} +log = logging.getLogger(__name__) +# ── Browser-like headers ───────────────────────────────────── + +_UA_CHROME = ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36" +) +_UA_FIREFOX = ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:133.0) " + "Gecko/20100101 Firefox/133.0" +) +_UA_MAC = ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 " + "(KHTML, like Gecko) Version/18.2 Safari/605.1.15" +) + +def _browser_headers(ua=_UA_CHROME): + return { + "User-Agent": ua, + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", + "Accept-Language": "en-GB,en;q=0.9,en-US;q=0.8", + "Accept-Encoding": "gzip, deflate, br", + "Connection": "keep-alive", + "Upgrade-Insecure-Requests": "1", + "Sec-Fetch-Dest": "document", + "Sec-Fetch-Mode": "navigate", + "Sec-Fetch-Site": "none", + "Sec-Fetch-User": "?1", + "Cache-Control": "max-age=0", + "DNT": "1", + } + + +def 
_make_session(ua=_UA_CHROME): + """Requests session with retries and browser headers.""" + s = requests.Session() + retry = Retry(total=2, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504]) + s.mount("https://", HTTPAdapter(max_retries=retry)) + s.mount("http://", HTTPAdapter(max_retries=retry)) + s.headers.update(_browser_headers(ua)) + return s + + +def _fetch_html(url: str) -> tuple: + """Fetch HTML with 4-tier fallback. Returns (html_text, method_used) or raises. + For 404s and blocked pages, falls back to Gemini Google Search grounding.""" + errors = [] + got_404 = False + got_blocked = False + + # ── Tier 1: Chrome UA with full headers ────────────────── + try: + sess = _make_session(_UA_CHROME) + r = sess.get(url, timeout=20, allow_redirects=True) + if r.status_code == 200 and len(r.text) > 1000: + if not _is_blocked(r.text): + return (r.text, "direct-chrome") + got_blocked = True + errors.append(f"Tier 1: blocked ({_block_reason(r.text)})") + elif r.status_code == 404: + got_404 = True + errors.append(f"Tier 1: 404 Not Found") + elif r.status_code == 403: + got_blocked = True + errors.append(f"Tier 1: 403 Forbidden") + else: + errors.append(f"Tier 1: HTTP {r.status_code}") + except Exception as e: + errors.append(f"Tier 1: {e}") + + # If it's a 404 or blocked, skip tier 2-3 — go straight to Gemini + if not (got_404 or got_blocked): + # ── Tier 2: Firefox UA ─────────────────────────────── + try: + sess = _make_session(_UA_FIREFOX) + r = sess.get(url, timeout=20, allow_redirects=True) + if r.status_code == 200 and len(r.text) > 1000: + if not _is_blocked(r.text): + return (r.text, "direct-firefox") + got_blocked = True + errors.append(f"Tier 2: blocked ({_block_reason(r.text)})") + else: + errors.append(f"Tier 2: HTTP {r.status_code}") + except Exception as e: + errors.append(f"Tier 2: {e}") + + # ── Tier 3: Safari UA with cookie pre-fetch ────────── + try: + sess = _make_session(_UA_MAC) + domain = re.match(r'(https?://[^/]+)', url) + if domain: + 
try: + sess.get(domain.group(1) + "/", timeout=10, allow_redirects=True) + except Exception: + pass + r = sess.get(url, timeout=20, allow_redirects=True) + if r.status_code == 200 and len(r.text) > 1000: + if not _is_blocked(r.text): + return (r.text, "direct-safari") + errors.append(f"Tier 3: blocked ({_block_reason(r.text)})") + else: + errors.append(f"Tier 3: HTTP {r.status_code}") + except Exception as e: + errors.append(f"Tier 3: {e}") + + # ── Tier 4: Gemini + Google Search grounding ───────────── + # Works for: dead pages (finds similar product), blocked pages, + # JS-rendered pages, CAPTCHA pages + try: + log.info(f"Falling back to Gemini grounding for {url}") + data = _gemini_scrape(url) + if data and data.get("title"): + data["_scrape_method"] = "gemini-grounding" + data["_scrape_errors"] = errors + return (None, data) # Return parsed data directly + errors.append("Tier 4 (Gemini): no title found") + except Exception as e: + errors.append(f"Tier 4 (Gemini): {e}") + + # All tiers failed + error_summary = " | ".join(errors) + raise RuntimeError(f"All scraping methods failed for {url}: {error_summary}") + + +def _is_blocked(html: str) -> bool: + """Detect anti-bot / CAPTCHA / access denied pages.""" + lower = html.lower() + signals = [ + "pardon our interruption", + "access denied", + "robot check", + "captcha", + "please verify you are a human", + "enable javascript and cookies", + "just a moment", # Cloudflare + "checking your browser", + "attention required", + "automated access", + "unusual traffic", + ] + return any(s in lower for s in signals) + + +def _block_reason(html: str) -> str: + lower = html.lower() + if "cloudflare" in lower or "just a moment" in lower: + return "Cloudflare" + if "captcha" in lower or "robot check" in lower: + return "CAPTCHA" + if "pardon our interruption" in lower: + return "Bot detection" + if "access denied" in lower: + return "Access denied" + return "Anti-bot" + + +# ── Gemini URL grounding (Tier 4 fallback) 
─────────────────── + +def _gemini_scrape(url: str) -> dict: + """Use Gemini with Google Search grounding to extract product data. + Gemini searches for the URL/product through Google's infrastructure, + bypassing anti-bot systems.""" + api_key = os.environ.get("GEMINI_API_KEY", "") + if not api_key: + raise RuntimeError("No GEMINI_API_KEY for fallback scraping") + + from google import genai + from google.genai import types + + client = genai.Client(api_key=api_key) + + prompt = f"""Find and extract product information from this URL: {url} + +Search for this exact product page and extract all available data. +If the product page no longer exists (404), search for the same or similar product from the same brand. + +Return JSON: +{{ + "title": "Product title/name", + "brand": "Brand name", + "price": "Price with currency symbol (e.g. £9.99)", + "description": "Product description text (up to 2000 chars)", + "bullets": ["Feature or benefit 1", "Feature 2", "Feature 3"], + "images": [], + "meta_description": "What this product page would say", + "ingredients": "Key ingredients if it's a supplement", + "rating": "Star rating if known", + "review_count": "Number of reviews if known", + "url_status": "live" or "dead/redirected" or "blocked" +}} + +Be thorough and specific — this is for competitive analysis.""" + + try: + # Can't use response_mime_type=json with tools, so parse manually + response = client.models.generate_content( + model="gemini-2.5-flash", + contents=prompt, + config=types.GenerateContentConfig( + temperature=0.1, + tools=[types.Tool(google_search=types.GoogleSearch())], + ), + ) + text = response.text or "" + # Extract JSON from response + match = re.search(r'```json\s*(.*?)\s*```', text, re.DOTALL) + if match: + text = match.group(1) + else: + match = re.search(r'\{.*\}', text, re.DOTALL) + if match: + text = match.group(0) + data = json.loads(text) + data["url"] = url + data["raw_text"] = data.get("description", "")[:5000] + if not 
data.get("bullets"): + data["bullets"] = [] + if not data.get("images"): + data["images"] = [] + return data + except json.JSONDecodeError: + # Gemini returned text but not valid JSON — extract what we can + raw = response.text or "" + log.warning(f"Gemini returned non-JSON, extracting manually") + data = { + "url": url, + "title": "", + "brand": "", + "price": "", + "description": raw[:3000], + "raw_text": raw[:5000], + "bullets": [], + "images": [], + } + # Try to extract title from the text + for line in raw.split("\n"): + line = line.strip().strip("*#- ") + if "title" in line.lower() and ":" in line: + data["title"] = line.split(":", 1)[1].strip().strip('"') + break + if len(line) > 10 and len(line) < 100 and not data["title"]: + data["title"] = line + return data + except Exception as e: + log.error(f"Gemini scrape failed: {e}") + raise + + +# ═══════════════════════════════════════════════════════════════ +# JustVitamins scraper (Tier 1 only — it's our own site) +# ═══════════════════════════════════════════════════════════════ def scrape_product(url: str) -> dict: """Scrape a JV product URL and return structured product data.""" - r = requests.get(url, headers=HEADERS, timeout=15) + sess = _make_session() + r = sess.get(url, timeout=15, allow_redirects=True) r.raise_for_status() soup = BeautifulSoup(r.text, "html.parser") @@ -120,18 +381,37 @@ def scrape_product(url: str) -> dict: return data +# ═══════════════════════════════════════════════════════════════ +# Competitor scraper — bulletproof with fallback chain +# ═══════════════════════════════════════════════════════════════ + def scrape_competitor(url: str) -> dict: - """Scrape any ecommerce product page and extract what we can.""" - r = requests.get(url, headers=HEADERS, timeout=15) - r.raise_for_status() - soup = BeautifulSoup(r.text, "html.parser") + """Scrape any competitor product page. 
Uses 4-tier fallback to ensure success.""" + result = _fetch_html(url) + + # If Gemini grounding returned parsed data directly (Tier 4) + if isinstance(result[1], dict): + return result[1] + + html_text, method = result + soup = BeautifulSoup(html_text, "html.parser") text = soup.get_text(" ", strip=True) - data = {"url": url, "raw_text": text[:5000]} + data = {"url": url, "raw_text": text[:5000], "_scrape_method": method} - # Title - h1 = soup.select_one("h1") - data["title"] = h1.get_text(strip=True) if h1 else "" + # Title — try multiple selectors + title = "" + for sel in ["h1[itemprop='name']", "h1.product-title", "h1.product-name", + "h1.pdp__title", "[data-testid='product-title']", "h1"]: + el = soup.select_one(sel) + if el: + title = el.get_text(strip=True) + if len(title) > 5: + break + if not title: + og = soup.select_one("meta[property='og:title']") + title = og.get("content", "") if og else "" + data["title"] = title # Meta description meta = soup.select_one("meta[name='description']") @@ -140,42 +420,103 @@ def scrape_competitor(url: str) -> dict: # OG data og_title = soup.select_one("meta[property='og:title']") og_desc = soup.select_one("meta[property='og:description']") + og_img = soup.select_one("meta[property='og:image']") data["og_title"] = og_title.get("content", "") if og_title else "" data["og_description"] = og_desc.get("content", "") if og_desc else "" - # Price — try schema.org, then regex - price_meta = soup.select_one("meta[itemprop='price']") - if price_meta: - data["price"] = price_meta.get("content", "") - else: + # Price — multiple strategies + price = "" + # Schema.org + for sel in ["meta[itemprop='price']", "[itemprop='price']", + "meta[property='product:price:amount']"]: + el = soup.select_one(sel) + if el: + price = el.get("content", "") or el.get_text(strip=True) + if price: + break + # JSON-LD + if not price: + for script in soup.select("script[type='application/ld+json']"): + try: + ld = json.loads(script.string) + if 
isinstance(ld, list): + ld = ld[0] + offers = ld.get("offers", {}) + if isinstance(offers, list): + offers = offers[0] + price = str(offers.get("price", "")) + if price: + currency = offers.get("priceCurrency", "GBP") + sym = {"GBP": "£", "USD": "$", "EUR": "€"}.get(currency, "") + price = f"{sym}{price}" + break + except Exception: + continue + # Regex fallback + if not price: price_match = re.search(r'[$£€][\d,.]+', text) - data["price"] = price_match.group(0) if price_match else "" + price = price_match.group(0) if price_match else "" + data["price"] = price # Bullets / features bullets = [] for li in soup.select("li"): txt = li.get_text(strip=True) - if 15 < len(txt) < 200: + if 15 < len(txt) < 300: + # Skip nav/menu items + parent = li.parent + if parent and parent.name in ("ul", "ol"): + parent_class = " ".join(parent.get("class", [])) + if any(skip in parent_class.lower() for skip in ["nav", "menu", "footer", "breadcrumb"]): + continue bullets.append(txt) data["bullets"] = bullets[:15] - # Images + # Images — try OG first, then product images images = [] + if og_img: + images.append(og_img.get("content", "")) for img in soup.select("img[src]"): src = img.get("src", "") + alt = (img.get("alt", "") or "").lower() + # Prioritise product images if src and any(ext in src.lower() for ext in [".jpg", ".png", ".webp"]): if not src.startswith("http"): - from urllib.parse import urljoin src = urljoin(url, src) + # Skip tiny icons, tracking pixels, logos + width = img.get("width", "") + if width and width.isdigit() and int(width) < 50: + continue + if any(skip in src.lower() for skip in ["icon", "logo", "pixel", "tracking", "badge"]): + continue if src not in images: images.append(src) - data["images"] = images[:5] + data["images"] = images[:8] - # Brand from schema - brand = soup.select_one("[itemprop='brand']") - data["brand"] = brand.get_text(strip=True) if brand else "" + # Brand from schema or common selectors + brand = "" + for sel in ["[itemprop='brand']", 
".product-brand", "[data-testid='brand']"]: + el = soup.select_one(sel) + if el: + brand = el.get_text(strip=True) + if brand: + break + if not brand: + # Try JSON-LD + for script in soup.select("script[type='application/ld+json']"): + try: + ld = json.loads(script.string) + if isinstance(ld, list): + ld = ld[0] + b = ld.get("brand", {}) + brand = b.get("name", "") if isinstance(b, dict) else str(b) + if brand: + break + except Exception: + continue + data["brand"] = brand - # Description paragraphs + # Description paras = [] for p in soup.select("p"): txt = p.get_text(strip=True) @@ -183,15 +524,31 @@ def scrape_competitor(url: str) -> dict: paras.append(txt) data["description"] = "\n".join(paras[:8]) + # If we got very little from HTML, enrich with Gemini + if not data["title"] or (len(data.get("description", "")) < 50 and not data.get("bullets")): + try: + gemini_data = _gemini_scrape(url) + if gemini_data.get("title"): + # Merge — Gemini fills gaps + for k, v in gemini_data.items(): + if k in data and not data[k]: + data[k] = v + elif k not in data: + data[k] = v + data["_enriched_by"] = "gemini" + except Exception as e: + log.warning(f"Gemini enrichment failed: {e}") + return data if __name__ == "__main__": import sys + logging.basicConfig(level=logging.INFO) url = sys.argv[1] if len(sys.argv) > 1 else \ "https://www.justvitamins.co.uk/Bone-Health/Super-Strength-Vitamin-D3-4000iu-K2-MK-7-100mcg.aspx" if "justvitamins" in url: d = scrape_product(url) else: d = scrape_competitor(url) - print(json.dumps(d, indent=2)) + print(json.dumps(d, indent=2, default=str)) diff --git a/templates/index.html b/templates/index.html index 776891c..0846bda 100644 --- a/templates/index.html +++ b/templates/index.html @@ -80,7 +80,7 @@