humbleParser/bundle_checker.py

211 lines
8.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import requests
import json
import hashlib
import logging
from datetime import datetime
from bs4 import BeautifulSoup
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey, Text
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
# Konfiguriere Logging passe das Level bei Bedarf an (DEBUG, INFO, WARNING, ERROR)
DEBUG_LEVEL = logging.DEBUG
logging.basicConfig(
level=DEBUG_LEVEL,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger(__name__)
# Basis-Klasse für SQLAlchemy-Modelle (SQLAlchemy 2.0-konform)
Base = declarative_base()
# Tabelle für das Bundle (statische Identifikation)
class Bundle(Base):
__tablename__ = 'bundles'
id = Column(Integer, primary_key=True)
machine_name = Column(String, unique=True)
human_name = Column(String)
# current_version_id verweist auf die aktuelle Version in bundle_versions
current_version_id = Column(Integer, ForeignKey('bundle_versions.id'))
# Beziehung zur aktuellen Version; post_update=True hilft bei zirkulären Abhängigkeiten
current_version = relationship("BundleVersion", uselist=False,
foreign_keys=[current_version_id],
post_update=True)
# Alle Versionen (historisch)
versions = relationship("BundleVersion", back_populates="bundle",
foreign_keys=lambda: [BundleVersion.bundle_id])
# Verkaufshistorie
sales_history = relationship("BundleSalesHistory", back_populates="bundle")
# Tabelle für Versionen eines Bundles
class BundleVersion(Base):
__tablename__ = 'bundle_versions'
id = Column(Integer, primary_key=True)
bundle_id = Column(Integer, ForeignKey('bundles.id'))
version_hash = Column(String)
version_data = Column(Text) # Relevante Bundle-Daten als JSON-String
timestamp = Column(DateTime, default=datetime.utcnow)
# Eindeutige Beziehung: wir verwenden hier explizit bundle_id
bundle = relationship("Bundle", back_populates="versions", foreign_keys=[bundle_id])
# Tabelle für Verkaufshistorie
class BundleSalesHistory(Base):
__tablename__ = 'bundle_sales_history'
id = Column(Integer, primary_key=True)
bundle_id = Column(Integer, ForeignKey('bundles.id'))
bundles_sold = Column(Float)
timestamp = Column(DateTime, default=datetime.utcnow)
bundle = relationship("Bundle", back_populates="sales_history")
def calculate_hash(data: dict) -> str:
"""Berechnet einen SHA-256 Hash aus dem sortierten JSON-String der relevanten Daten."""
json_string = json.dumps(data, sort_keys=True, ensure_ascii=False)
hash_value = hashlib.sha256(json_string.encode('utf-8')).hexdigest()
logger.debug(f"Berechneter Hash: {hash_value}")
return hash_value
def fetch_bundle_data(url: str) -> dict:
"""Lädt die Detailseite eines Bundles und extrahiert den JSON-Inhalt aus dem <script>-Tag 'webpack-bundle-page-data'."""
logger.info(f"Rufe Bundle-Daten von {url} ab...")
response = requests.get(url)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
script_tag = soup.find("script", {"id": "webpack-bundle-page-data", "type": "application/json"})
if not script_tag:
logger.error("Kein JSON-Datenblock 'webpack-bundle-page-data' gefunden!")
raise ValueError("Kein JSON-Datenblock gefunden!")
data = json.loads(script_tag.string)
logger.debug(f"Erhaltener JSON-Block: {str(data)[:200]} ...")
return data
def process_bundle(session, url: str):
"""
Verarbeitet ein einzelnes Bundle:
- Lädt die Detailseite und extrahiert den JSON-Datensatz (bundleData)
- Berechnet einen Hash des relevanten Datenbereichs
- Vergleicht mit der letzten Version in der DB und speichert bei Änderung eine neue Version
- Speichert Verkaufszahlen in einer separaten Tabelle
"""
try:
data = fetch_bundle_data(url)
except Exception as e:
logger.error(f"Fehler beim Laden der Bundle-Daten von {url}: {e}")
return
bundle_data = data.get("bundleData", {})
machine_name = bundle_data.get("machine_name", "")
human_name = bundle_data.get("human_name", "")
logger.info(f"Verarbeite Bundle '{human_name}' (machine_name: {machine_name})")
# Wir nehmen das gesamte bundleData als relevanten Datenausschnitt
relevant_data = bundle_data
new_hash = calculate_hash(relevant_data)
# Suche, ob das Bundle bereits existiert
bundle = session.query(Bundle).filter_by(machine_name=machine_name).first()
if not bundle:
logger.info(f"Neues Bundle gefunden: {human_name}")
bundle = Bundle(machine_name=machine_name, human_name=human_name)
session.add(bundle)
session.commit()
else:
logger.debug(f"Bundle '{human_name}' existiert bereits (ID: {bundle.id})")
# Hole die aktuellste Version dieses Bundles
latest_version = (session.query(BundleVersion)
.filter_by(bundle_id=bundle.id)
.order_by(BundleVersion.timestamp.desc())
.first())
if latest_version is None or latest_version.version_hash != new_hash:
logger.info(f"Neue Version für Bundle '{human_name}' wird gespeichert (alter Hash: {latest_version.version_hash if latest_version else 'None'}, neuer Hash: {new_hash}).")
new_version = BundleVersion(
bundle_id=bundle.id,
version_hash=new_hash,
version_data=json.dumps(relevant_data, sort_keys=True, ensure_ascii=False)
)
session.add(new_version)
session.commit()
bundle.current_version_id = new_version.id
session.commit()
else:
logger.info(f"Bundle '{human_name}' hat sich nicht geändert.")
# Verkaufszahlen extrahieren angenommen, sie stehen entweder direkt in bundleData oder unter basic_data
bundles_sold = bundle_data.get("bundles_sold|decimal")
if bundles_sold is None:
bundles_sold = bundle_data.get("basic_data", {}).get("bundles_sold|decimal")
if bundles_sold is not None:
try:
sales_value = float(bundles_sold)
sales_entry = BundleSalesHistory(
bundle_id=bundle.id,
bundles_sold=sales_value
)
session.add(sales_entry)
session.commit()
logger.info(f"Verkaufszahlen für Bundle '{human_name}' aktualisiert: {sales_value}")
except Exception as e:
logger.error(f"Fehler beim Speichern der Verkaufszahlen für {human_name}: {e}")
else:
logger.debug(f"Keine Verkaufszahlen für Bundle '{human_name}' gefunden.")
def get_bundle_urls(overview_url: str) -> list:
"""
Ruft die Übersichtsseite ab und extrahiert alle Bundle-URLs aus dem JSON-Datenblock im <script>-Tag 'landingPage-json-data'.
"""
logger.info(f"Rufe Übersichtsseite {overview_url} ab...")
response = requests.get(overview_url)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
bundle_urls = []
landing_script = soup.find("script", {"id": "landingPage-json-data", "type": "application/json"})
if landing_script:
landing_data = json.loads(landing_script.string)
logger.debug(f"Landing Page JSON (gekürzt): {str(landing_data)[:200]} ...")
# Durchlaufe die Kategorien "books", "games" und "software"
for category in ["books", "games", "software"]:
cat_data = landing_data.get("data", {}).get(category, {})
for section in cat_data.get("mosaic", []):
for product in section.get("products", []):
url = product.get("product_url", "")
if url:
full_url = requests.compat.urljoin(overview_url, url)
bundle_urls.append(full_url)
else:
logger.warning("Kein JSON-Datenblock 'landingPage-json-data' auf der Übersichtsseite gefunden.")
logger.info(f"Gefundene Bundle-URLs: {len(bundle_urls)}")
return bundle_urls
def main():
engine = create_engine('sqlite:///bundles.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
overview_url = "https://www.humblebundle.com/bundles"
try:
bundle_urls = get_bundle_urls(overview_url)
if not bundle_urls:
logger.error("Keine Bundle-URLs gefunden! Überprüfe den JSON-Datenblock oder den Selektor in get_bundle_urls().")
return
except Exception as e:
logger.error(f"Fehler beim Abrufen der Übersichtsseite: {e}")
return
# Für jede Bundle-URL verarbeiten
for url in bundle_urls:
logger.info(f"Verarbeite Bundle: {url}")
try:
process_bundle(session, url)
except Exception as e:
logger.error(f"Fehler beim Verarbeiten von {url}: {e}")
if __name__ == "__main__":
main()