humbleParser/bundle_checker.py

259 lines
11 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import requests
import json
import hashlib
import logging
from datetime import datetime
from bs4 import BeautifulSoup
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey, Text
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
# Konfiguriere Logging passe das Level bei Bedarf an (DEBUG, INFO, WARNING, ERROR)
DEBUG_LEVEL = logging.DEBUG
logging.basicConfig(
level=DEBUG_LEVEL,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger(__name__)
# Basis-Klasse für SQLAlchemy-Modelle (SQLAlchemy 2.0-konform)
Base = declarative_base()
# ---------------------------
# Datenbank-Modelle
# ---------------------------
class Bundle(Base):
__tablename__ = 'bundles'
id = Column(Integer, primary_key=True)
machine_name = Column(String, unique=True)
human_name = Column(String) # Falls vorhanden; kann leer bleiben, wenn nicht extrahiert
current_version_id = Column(Integer, ForeignKey('bundle_versions.id'))
# Beziehung zur aktuellen Version; post_update=True hilft bei zirkulären Abhängigkeiten
current_version = relationship("BundleVersion", uselist=False,
foreign_keys=[current_version_id],
post_update=True)
# Alle Versionen (historisch)
versions = relationship("BundleVersion", back_populates="bundle",
foreign_keys=lambda: [BundleVersion.bundle_id])
# Verkaufshistorie
sales_history = relationship("BundleSalesHistory", back_populates="bundle")
# Einzelne Elemente des Bundles (z.B. die enthaltenen Bücher, Spiele etc.)
items = relationship("BundleItem", back_populates="bundle")
class BundleVersion(Base):
__tablename__ = 'bundle_versions'
id = Column(Integer, primary_key=True)
bundle_id = Column(Integer, ForeignKey('bundles.id'))
version_hash = Column(String)
version_data = Column(Text) # Relevante Bundle-Daten als JSON-String
timestamp = Column(DateTime, default=datetime.utcnow)
# Explizite Angabe des Fremdschlüssels
bundle = relationship("Bundle", back_populates="versions", foreign_keys=[bundle_id])
class BundleSalesHistory(Base):
__tablename__ = 'bundle_sales_history'
id = Column(Integer, primary_key=True)
bundle_id = Column(Integer, ForeignKey('bundles.id'))
bundles_sold = Column(Float)
timestamp = Column(DateTime, default=datetime.utcnow)
bundle = relationship("Bundle", back_populates="sales_history")
class BundleItem(Base):
__tablename__ = 'bundle_items'
id = Column(Integer, primary_key=True)
bundle_id = Column(Integer, ForeignKey('bundles.id'))
title = Column(String) # Titel des Elements (z.B. Buch- oder Spielname)
category = Column(String) # Kategorie, z.B. "book", "game" oder "software"
details = Column(Text) # Optional: ganze Detaildaten als JSON-String
bundle = relationship("Bundle", back_populates="items")
# ---------------------------
# Hilfsfunktionen
# ---------------------------
def calculate_hash(data: dict) -> str:
"""Berechnet einen SHA-256-Hash aus dem sortierten JSON-String der relevanten Daten."""
json_string = json.dumps(data, sort_keys=True, ensure_ascii=False)
hash_value = hashlib.sha256(json_string.encode('utf-8')).hexdigest()
logger.debug(f"Berechneter Hash: {hash_value}")
return hash_value
def fetch_bundle_data(url: str) -> dict:
"""
Lädt die Detailseite eines Bundles und extrahiert den JSON-Inhalt aus dem <script>-Tag
mit id="webpack-bundle-page-data" (dieser enthält die detaillierten Bundle-Daten).
"""
logger.info(f"Rufe Bundle-Daten von {url} ab...")
response = requests.get(url)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
script_tag = soup.find("script", {"id": "webpack-bundle-page-data", "type": "application/json"})
if not script_tag:
logger.error("Kein JSON-Datenblock 'webpack-bundle-page-data' gefunden!")
raise ValueError("Kein JSON-Datenblock gefunden!")
data = json.loads(script_tag.string)
logger.debug(f"Erhaltener JSON-Block (gekürzt): {str(data)[:200]} ...")
return data
def process_bundle(session, url: str):
"""
Verarbeitet ein einzelnes Bundle:
- Lädt die Detailseite und extrahiert den JSON-Datensatz (bundleData)
- Berechnet einen Hash des relevanten Datenbereichs
- Vergleicht mit der letzten Version in der DB und speichert bei Änderung eine neue Version
- Speichert Verkaufszahlen in einer separaten Tabelle
- Extrahiert und speichert, falls vorhanden, einzelne Bundle-Elemente
"""
try:
data = fetch_bundle_data(url)
except Exception as e:
logger.error(f"Fehler beim Laden der Bundle-Daten von {url}: {e}")
return
bundle_data = data.get("bundleData", {})
machine_name = bundle_data.get("machine_name", "")
human_name = bundle_data.get("human_name", "")
logger.info(f"Verarbeite Bundle '{human_name}' (machine_name: {machine_name})")
# Hier definieren wir den relevanten Datenausschnitt (anpassen, falls nötig)
relevant_data = bundle_data
new_hash = calculate_hash(relevant_data)
# Suche, ob das Bundle bereits existiert
bundle = session.query(Bundle).filter_by(machine_name=machine_name).first()
if not bundle:
logger.info(f"Neues Bundle gefunden: '{human_name}' (machine_name: {machine_name})")
bundle = Bundle(machine_name=machine_name, human_name=human_name)
session.add(bundle)
session.commit()
else:
logger.debug(f"Bundle '{human_name}' existiert bereits (ID: {bundle.id})")
# Vergleiche mit der aktuellsten Version
latest_version = (session.query(BundleVersion)
.filter_by(bundle_id=bundle.id)
.order_by(BundleVersion.timestamp.desc())
.first())
if latest_version is None or latest_version.version_hash != new_hash:
logger.info(f"Neue Version für Bundle '{human_name}' wird gespeichert (alter Hash: {latest_version.version_hash if latest_version else 'None'}, neuer Hash: {new_hash}).")
new_version = BundleVersion(
bundle_id=bundle.id,
version_hash=new_hash,
version_data=json.dumps(relevant_data, sort_keys=True, ensure_ascii=False)
)
session.add(new_version)
session.commit()
bundle.current_version_id = new_version.id
session.commit()
else:
logger.info(f"Bundle '{human_name}' hat sich nicht geändert.")
# Verkaufszahlen extrahieren hier wird angenommen, dass sie unter einem entsprechenden Schlüssel stehen
bundles_sold = bundle_data.get("bundles_sold|decimal")
if bundles_sold is None:
bundles_sold = bundle_data.get("basic_data", {}).get("bundles_sold|decimal")
if bundles_sold is not None:
try:
sales_value = float(bundles_sold)
sales_entry = BundleSalesHistory(
bundle_id=bundle.id,
bundles_sold=sales_value
)
session.add(sales_entry)
session.commit()
logger.info(f"Verkaufszahlen für Bundle '{human_name}' aktualisiert: {sales_value}")
except Exception as e:
logger.error(f"Fehler beim Speichern der Verkaufszahlen für '{human_name}': {e}")
else:
logger.debug(f"Keine Verkaufszahlen für Bundle '{human_name}' gefunden.")
# ---------------------------
# Extraktion der Bundle-Elemente
# ---------------------------
# Wir gehen hier davon aus, dass in den Bundle-Daten unter dem Schlüssel "items" eine Liste der enthaltenen Produkte steht.
items = bundle_data.get("items", [])
if items:
logger.info(f"Es wurden {len(items)} Elemente im Bundle '{human_name}' gefunden.")
for item in items:
title = item.get("title", "Unbekannt")
category = item.get("category", "Unbekannt")
details = json.dumps(item, sort_keys=True, ensure_ascii=False)
# Prüfen, ob das Element bereits gespeichert wurde (zum Beispiel anhand des Titels)
existing_item = session.query(BundleItem).filter_by(bundle_id=bundle.id, title=title).first()
if existing_item:
logger.debug(f"Element '{title}' im Bundle '{human_name}' existiert bereits Überspringe.")
else:
new_item = BundleItem(
bundle_id=bundle.id,
title=title,
category=category,
details=details
)
session.add(new_item)
logger.debug(f"Neues Element '{title}' im Bundle '{human_name}' wird gespeichert.")
session.commit()
else:
logger.debug(f"Keine Bundle-Elemente (items) im Bundle '{human_name}' gefunden.")
def get_bundle_urls(overview_url: str) -> list:
"""
Ruft die Übersichtsseite ab und extrahiert alle Bundle-URLs aus dem JSON-Datenblock im <script>-Tag 'landingPage-json-data'.
"""
logger.info(f"Rufe Übersichtsseite {overview_url} ab...")
response = requests.get(overview_url)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
bundle_urls = []
landing_script = soup.find("script", {"id": "landingPage-json-data", "type": "application/json"})
if landing_script:
landing_data = json.loads(landing_script.string)
logger.debug(f"Landing Page JSON (gekürzt): {str(landing_data)[:200]} ...")
# Durchlaufe die Kategorien "books", "games" und "software"
for category in ["books", "games", "software"]:
cat_data = landing_data.get("data", {}).get(category, {})
for section in cat_data.get("mosaic", []):
for product in section.get("products", []):
url = product.get("product_url", "")
if url:
full_url = requests.compat.urljoin(overview_url, url)
bundle_urls.append(full_url)
else:
logger.warning("Kein JSON-Datenblock 'landingPage-json-data' auf der Übersichtsseite gefunden.")
logger.info(f"Gefundene Bundle-URLs: {len(bundle_urls)}")
return bundle_urls
def main():
engine = create_engine('sqlite:///bundles.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
overview_url = "https://www.humblebundle.com/bundles"
try:
bundle_urls = get_bundle_urls(overview_url)
if not bundle_urls:
logger.error("Keine Bundle-URLs gefunden! Überprüfe den JSON-Datenblock oder den Selektor in get_bundle_urls().")
return
except Exception as e:
logger.error(f"Fehler beim Abrufen der Übersichtsseite: {e}")
return
# Für jede Bundle-URL verarbeiten
for url in bundle_urls:
logger.info(f"Verarbeite Bundle: {url}")
try:
process_bundle(session, url)
except Exception as e:
logger.error(f"Fehler beim Verarbeiten von {url}: {e}")
if __name__ == "__main__":
main()