humbleParser/bundle_checker.py

189 lines
7.9 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import requests
import json
import hashlib
from datetime import datetime
from bs4 import BeautifulSoup
# Importiere declarative_base aus sqlalchemy.orm (SQLAlchemy 2.0-konform)
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey, Text
# Basis-Klasse für SQLAlchemy-Modelle
Base = declarative_base()
# Tabelle für das Bundle (statische Identifikation)
class Bundle(Base):
__tablename__ = 'bundles'
id = Column(Integer, primary_key=True)
machine_name = Column(String, unique=True) # z.B. "linuxfrombeginnertoprofessionaloreilly_bookbundle"
human_name = Column(String)
current_version_id = Column(Integer, ForeignKey('bundle_versions.id'))
# Beziehung zur aktuellen Version
current_version = relationship("BundleVersion", uselist=False, foreign_keys=[current_version_id])
# Alle Versionen (historisch)
versions = relationship("BundleVersion", back_populates="bundle", foreign_keys='BundleVersion.bundle_id')
# Verkaufshistorie
sales_history = relationship("BundleSalesHistory", back_populates="bundle")
# Tabelle für Versionen eines Bundles
class BundleVersion(Base):
__tablename__ = 'bundle_versions'
id = Column(Integer, primary_key=True)
bundle_id = Column(Integer, ForeignKey('bundles.id'))
version_hash = Column(String) # SHA-256 Hash der relevanten Daten
version_data = Column(Text) # Alle relevanten Bundle-Daten als JSON-String
timestamp = Column(DateTime, default=datetime.utcnow)
bundle = relationship("Bundle", back_populates="versions")
# Tabelle für Verkaufshistorie (zur zeitlichen Analyse der Verkaufszahlen)
class BundleSalesHistory(Base):
__tablename__ = 'bundle_sales_history'
id = Column(Integer, primary_key=True)
bundle_id = Column(Integer, ForeignKey('bundles.id'))
bundles_sold = Column(Float)
timestamp = Column(DateTime, default=datetime.utcnow)
bundle = relationship("Bundle", back_populates="sales_history")
def calculate_hash(data: dict) -> str:
"""Berechnet einen SHA-256 Hash aus dem sortierten JSON-String der relevanten Daten."""
json_string = json.dumps(data, sort_keys=True, ensure_ascii=False)
return hashlib.sha256(json_string.encode('utf-8')).hexdigest()
def fetch_bundle_data(url: str) -> dict:
"""Lädt die Detailseite eines Bundles und extrahiert den JSON-Inhalt aus dem <script>-Tag 'webpack-bundle-page-data'."""
response = requests.get(url)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
script_tag = soup.find("script", {"id": "webpack-bundle-page-data", "type": "application/json"})
if not script_tag:
raise ValueError("Kein JSON-Datenblock 'webpack-bundle-page-data' gefunden auf der Detailseite!")
return json.loads(script_tag.string)
def process_bundle(session, url: str):
"""
Verarbeitet ein einzelnes Bundle:
- Lädt die Detailseite und extrahiert den JSON-Datensatz (bundleData)
- Berechnet einen Hash des relevanten Datenbereichs
- Vergleicht mit der letzten Version in der DB und speichert bei Änderung eine neue Version
- Speichert Verkaufszahlen in einer separaten Tabelle
"""
try:
data = fetch_bundle_data(url)
except Exception as e:
print(f"Fehler beim Laden der Bundle-Daten von {url}: {e}")
return
bundle_data = data.get("bundleData", {})
machine_name = bundle_data.get("machine_name", "")
human_name = bundle_data.get("human_name", "")
# Relevanter Datenausschnitt hier nehmen wir das gesamte bundleData
relevant_data = bundle_data
new_hash = calculate_hash(relevant_data)
# Suche, ob das Bundle bereits existiert (über machine_name)
bundle = session.query(Bundle).filter_by(machine_name=machine_name).first()
if not bundle:
bundle = Bundle(machine_name=machine_name, human_name=human_name)
session.add(bundle)
session.commit()
# Hole die aktuellste Version dieses Bundles (nach timestamp sortiert)
latest_version = (session.query(BundleVersion)
.filter_by(bundle_id=bundle.id)
.order_by(BundleVersion.timestamp.desc())
.first())
if latest_version is None or latest_version.version_hash != new_hash:
new_version = BundleVersion(
bundle_id=bundle.id,
version_hash=new_hash,
version_data=json.dumps(relevant_data, sort_keys=True, ensure_ascii=False)
)
session.add(new_version)
session.commit()
bundle.current_version_id = new_version.id
session.commit()
print(f"Neue Version für Bundle '{human_name}' gespeichert.")
else:
print(f"Bundle '{human_name}' hat sich nicht geändert.")
# Verkaufszahlen extrahieren hier wird angenommen, dass sie entweder direkt in bundleData
# oder unter basic_data stehen, im Feld "bundles_sold|decimal"
bundles_sold = bundle_data.get("bundles_sold|decimal")
if bundles_sold is None:
bundles_sold = bundle_data.get("basic_data", {}).get("bundles_sold|decimal")
if bundles_sold is not None:
try:
sales_value = float(bundles_sold)
sales_entry = BundleSalesHistory(
bundle_id=bundle.id,
bundles_sold=sales_value
)
session.add(sales_entry)
session.commit()
print(f"Verkaufszahlen für Bundle '{human_name}' aktualisiert: {sales_value}")
except Exception as e:
print(f"Fehler beim Speichern der Verkaufszahlen für {human_name}: {e}")
def get_bundle_urls(overview_url: str) -> list:
"""
Ruft die Übersichtsseite ab und extrahiert alle Bundle-URLs aus dem JSON-Datenblock
im <script>-Tag mit der ID "landingPage-json-data". Dies ist zuverlässig, da die
Bundles dort dynamisch eingebunden werden.
"""
response = requests.get(overview_url)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
bundle_urls = []
landing_script = soup.find("script", {"id": "landingPage-json-data", "type": "application/json"})
if landing_script:
landing_data = json.loads(landing_script.string)
# Wir erwarten, dass in landing_data["data"] die Kategorien "books", "games", "software" enthalten sind.
for category in ["books", "games", "software"]:
cat_data = landing_data.get("data", {}).get(category, {})
for section in cat_data.get("mosaic", []):
for product in section.get("products", []):
# Annahme: Der Link steht im Feld "product_url"
url = product.get("product_url", "")
if url:
full_url = requests.compat.urljoin(overview_url, url)
bundle_urls.append(full_url)
else:
print("Kein JSON-Datenblock 'landingPage-json-data' auf der Übersichtsseite gefunden.")
return bundle_urls
def main():
# Datenbank-Verbindung (hier SQLite)
engine = create_engine('sqlite:///bundles.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
# URL der Bundles-Übersichtsseite
overview_url = "https://www.humblebundle.com/bundles"
try:
bundle_urls = get_bundle_urls(overview_url)
if not bundle_urls:
print("Keine Bundle-URLs gefunden! Überprüfe den JSON-Datenblock oder den Selektor in get_bundle_urls().")
return
except Exception as e:
print(f"Fehler beim Abrufen der Übersichtsseite: {e}")
return
# Verarbeite alle gefundenen Bundle-URLs
for url in bundle_urls:
print(f"Verarbeite Bundle: {url}")
try:
process_bundle(session, url)
except Exception as e:
print(f"Fehler beim Verarbeiten von {url}: {e}")
if __name__ == "__main__":
main()