humbleParser/bundle_checker.py

283 lines
12 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import requests
import json
import hashlib
import logging
import difflib
from datetime import datetime
from bs4 import BeautifulSoup
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey, Text
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
# Konfiguriere Logging ändere das Level bei Bedarf (DEBUG, INFO, WARNING, ERROR)
DEBUG_LEVEL = logging.DEBUG
logging.basicConfig(
level=DEBUG_LEVEL,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger(__name__)
# Basis-Klasse für SQLAlchemy-Modelle (2.0-konform)
Base = declarative_base()
# ---------------------------
# Datenbank-Modelle
# ---------------------------
class Bundle(Base):
__tablename__ = 'bundles'
id = Column(Integer, primary_key=True)
machine_name = Column(String, unique=True)
human_name = Column(String) # Kann leer bleiben, wenn nicht extrahiert
current_version_id = Column(Integer, ForeignKey('bundle_versions.id'))
# Beziehung zur aktuellen Version (post_update=True hilft bei zirkulären Abhängigkeiten)
current_version = relationship("BundleVersion", uselist=False,
foreign_keys=[current_version_id],
post_update=True)
# Alle Versionen (historisch)
versions = relationship("BundleVersion", back_populates="bundle",
foreign_keys=lambda: [BundleVersion.bundle_id])
# Verkaufshistorie
sales_history = relationship("BundleSalesHistory", back_populates="bundle")
# Einzelne Elemente des Bundles (z.B. enthaltene Bücher, Spiele etc.)
items = relationship("BundleItem", back_populates="bundle")
class BundleVersion(Base):
__tablename__ = 'bundle_versions'
id = Column(Integer, primary_key=True)
bundle_id = Column(Integer, ForeignKey('bundles.id'))
version_hash = Column(String)
version_data = Column(Text) # JSON-Daten als String
timestamp = Column(DateTime, default=datetime.utcnow)
# Explizite Angabe des Fremdschlüssels
bundle = relationship("Bundle", back_populates="versions", foreign_keys=[bundle_id])
class BundleSalesHistory(Base):
__tablename__ = 'bundle_sales_history'
id = Column(Integer, primary_key=True)
bundle_id = Column(Integer, ForeignKey('bundles.id'))
bundles_sold = Column(Float)
timestamp = Column(DateTime, default=datetime.utcnow)
bundle = relationship("Bundle", back_populates="sales_history")
class BundleItem(Base):
__tablename__ = 'bundle_items'
id = Column(Integer, primary_key=True)
bundle_id = Column(Integer, ForeignKey('bundles.id'))
title = Column(String) # Titel des Elements (z.B. Buch- oder Spielname)
category = Column(String) # Kategorie, z.B. "book", "game" oder "software"
details = Column(Text) # Optionale Detaildaten als JSON-String
bundle = relationship("Bundle", back_populates="items")
# ---------------------------
# Hilfsfunktionen
# ---------------------------
def calculate_hash(data: dict) -> str:
"""Berechnet einen SHA-256-Hash aus dem sortierten JSON-String der Daten."""
json_string = json.dumps(data, sort_keys=True, ensure_ascii=False)
hash_value = hashlib.sha256(json_string.encode('utf-8')).hexdigest()
logger.debug(f"Berechneter Hash: {hash_value}")
return hash_value
def log_diff(old_data: dict, new_data: dict):
"""
Vergleicht zwei JSON-Daten (als dict) und gibt einen unified diff als String aus.
Nur im DEBUG-Level wird der detaillierte Vergleich ausgegeben.
"""
old_str = json.dumps(old_data, sort_keys=True, indent=4, ensure_ascii=False).splitlines()
new_str = json.dumps(new_data, sort_keys=True, indent=4, ensure_ascii=False).splitlines()
diff = difflib.unified_diff(old_str, new_str, fromfile="alte_version", tofile="neue_version", lineterm="")
diff_text = "\n".join(diff)
logger.debug("Unterschiede zwischen den Versionen:\n" + diff_text)
def fetch_bundle_data(url: str) -> dict:
"""
Lädt die Detailseite eines Bundles und extrahiert den JSON-Inhalt aus dem <script>-Tag
mit id="webpack-bundle-page-data" (diese Seite enthält die detaillierten Bundle-Daten).
"""
logger.info(f"Rufe Bundle-Daten von {url} ab...")
response = requests.get(url)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
script_tag = soup.find("script", {"id": "webpack-bundle-page-data", "type": "application/json"})
if not script_tag:
logger.error("Kein JSON-Datenblock 'webpack-bundle-page-data' gefunden!")
raise ValueError("Kein JSON-Datenblock gefunden!")
data = json.loads(script_tag.string)
logger.debug(f"Erhaltener JSON-Block (gekürzt): {str(data)[:200]} ...")
return data
def process_bundle(session, url: str):
"""
Verarbeitet ein einzelnes Bundle:
- Lädt die Detailseite und extrahiert den JSON-Datensatz (bundleData)
- Berechnet einen Hash des relevanten Datenbereichs
- Vergleicht mit der letzten Version in der DB und speichert bei Änderung eine neue Version
- Speichert Verkaufszahlen in einer separaten Tabelle
- Extrahiert und speichert, falls vorhanden, einzelne Bundle-Elemente
"""
try:
data = fetch_bundle_data(url)
except Exception as e:
logger.error(f"Fehler beim Laden der Bundle-Daten von {url}: {e}")
return
# Es wird angenommen, dass die Bundle-Daten unter dem Schlüssel "bundleData" stehen.
bundle_data = data.get("bundleData", {})
machine_name = bundle_data.get("machine_name", "")
human_name = bundle_data.get("human_name", "")
logger.info(f"Verarbeite Bundle '{human_name}' (machine_name: {machine_name})")
# Definiere hier den relevanten Datenausschnitt (anpassen, falls nötig)
relevant_data = bundle_data
new_hash = calculate_hash(relevant_data)
# Suche, ob das Bundle bereits existiert
bundle = session.query(Bundle).filter_by(machine_name=machine_name).first()
if not bundle:
logger.info(f"Neues Bundle gefunden: '{human_name}' (machine_name: {machine_name})")
bundle = Bundle(machine_name=machine_name, human_name=human_name)
session.add(bundle)
session.commit()
else:
logger.debug(f"Bundle '{human_name}' existiert bereits (ID: {bundle.id})")
# Vergleiche mit der aktuellsten Version
latest_version = (session.query(BundleVersion)
.filter_by(bundle_id=bundle.id)
.order_by(BundleVersion.timestamp.desc())
.first())
if latest_version is None or latest_version.version_hash != new_hash:
if latest_version:
if logger.isEnabledFor(logging.DEBUG):
try:
old_data = json.loads(latest_version.version_data)
log_diff(old_data, relevant_data)
except Exception as ex:
logger.debug(f"Fehler beim Diff-Vergleich: {ex}")
else:
logger.info("Hashabweichung festgestellt es gibt Änderungen in den Bundle-Daten.")
else:
logger.info("Keine vorherige Version gefunden neues Bundle wird gespeichert.")
new_version = BundleVersion(
bundle_id=bundle.id,
version_hash=new_hash,
version_data=json.dumps(relevant_data, sort_keys=True, ensure_ascii=False)
)
session.add(new_version)
session.commit()
bundle.current_version_id = new_version.id
session.commit()
else:
logger.info(f"Bundle '{human_name}' hat sich nicht geändert.")
# Verkaufszahlen extrahieren hier wird angenommen, dass sie unter einem entsprechenden Schlüssel stehen
bundles_sold = bundle_data.get("bundles_sold|decimal")
if bundles_sold is None:
bundles_sold = bundle_data.get("basic_data", {}).get("bundles_sold|decimal")
if bundles_sold is not None:
try:
sales_value = float(bundles_sold)
sales_entry = BundleSalesHistory(
bundle_id=bundle.id,
bundles_sold=sales_value
)
session.add(sales_entry)
session.commit()
logger.info(f"Verkaufszahlen für Bundle '{human_name}' aktualisiert: {sales_value}")
except Exception as e:
logger.error(f"Fehler beim Speichern der Verkaufszahlen für '{human_name}': {e}")
else:
logger.debug(f"Keine Verkaufszahlen für Bundle '{human_name}' gefunden.")
# ---------------------------
# Extraktion der Bundle-Elemente
# ---------------------------
# Hier wird angenommen, dass in den Bundle-Daten unter "items" eine Liste der enthaltenen Produkte steht.
items = bundle_data.get("items", [])
if items:
logger.info(f"Es wurden {len(items)} Elemente im Bundle '{human_name}' gefunden.")
for item in items:
title = item.get("title", "Unbekannt")
category = item.get("category", "Unbekannt")
details = json.dumps(item, sort_keys=True, ensure_ascii=False)
# Prüfen, ob das Element bereits gespeichert wurde (z.B. anhand des Titels)
existing_item = session.query(BundleItem).filter_by(bundle_id=bundle.id, title=title).first()
if existing_item:
logger.debug(f"Element '{title}' im Bundle '{human_name}' existiert bereits Überspringe.")
else:
new_item = BundleItem(
bundle_id=bundle.id,
title=title,
category=category,
details=details
)
session.add(new_item)
logger.debug(f"Neues Element '{title}' im Bundle '{human_name}' wird gespeichert.")
session.commit()
else:
logger.debug(f"Keine Bundle-Elemente (items) im Bundle '{human_name}' gefunden.")
def get_bundle_urls(overview_url: str) -> list:
"""
Ruft die Übersichtsseite ab und extrahiert alle Bundle-URLs aus dem JSON-Datenblock im <script>-Tag 'landingPage-json-data'.
"""
logger.info(f"Rufe Übersichtsseite {overview_url} ab...")
response = requests.get(overview_url)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
bundle_urls = []
landing_script = soup.find("script", {"id": "landingPage-json-data", "type": "application/json"})
if landing_script:
landing_data = json.loads(landing_script.string)
logger.debug(f"Landing Page JSON (gekürzt): {str(landing_data)[:200]} ...")
# Durchlaufe die Kategorien "books", "games" und "software"
for category in ["books", "games", "software"]:
cat_data = landing_data.get("data", {}).get(category, {})
for section in cat_data.get("mosaic", []):
for product in section.get("products", []):
url = product.get("product_url", "")
if url:
full_url = requests.compat.urljoin(overview_url, url)
bundle_urls.append(full_url)
else:
logger.warning("Kein JSON-Datenblock 'landingPage-json-data' auf der Übersichtsseite gefunden.")
logger.info(f"Gefundene Bundle-URLs: {len(bundle_urls)}")
return bundle_urls
def main():
engine = create_engine('sqlite:///bundles.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
overview_url = "https://www.humblebundle.com/bundles"
try:
bundle_urls = get_bundle_urls(overview_url)
if not bundle_urls:
logger.error("Keine Bundle-URLs gefunden! Überprüfe den JSON-Datenblock oder den Selektor in get_bundle_urls().")
return
except Exception as e:
logger.error(f"Fehler beim Abrufen der Übersichtsseite: {e}")
return
# Für jede Bundle-URL verarbeiten
for url in bundle_urls:
logger.info(f"Verarbeite Bundle: {url}")
try:
process_bundle(session, url)
except Exception as e:
logger.error(f"Fehler beim Verarbeiten von {url}: {e}")
if __name__ == "__main__":
main()