humbleParser/bundle_checker.py

184 lines
7.3 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import requests
import json
import hashlib
from datetime import datetime
from bs4 import BeautifulSoup
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
# Basis-Klasse für SQLAlchemy-Modelle
Base = declarative_base()
# Tabelle für das Bundle (statische Identifikation)
class Bundle(Base):
__tablename__ = 'bundles'
id = Column(Integer, primary_key=True)
machine_name = Column(String, unique=True) # z.B. "linuxfrombeginnertoprofessionaloreilly_bookbundle"
human_name = Column(String)
current_version_id = Column(Integer, ForeignKey('bundle_versions.id'))
# Beziehung zur aktuellen Version
current_version = relationship("BundleVersion", uselist=False, foreign_keys=[current_version_id])
# Alle Versionen (historisch)
versions = relationship("BundleVersion", back_populates="bundle", foreign_keys='BundleVersion.bundle_id')
# Verkaufshistorie
sales_history = relationship("BundleSalesHistory", back_populates="bundle")
# Tabelle für Versionen eines Bundles
class BundleVersion(Base):
__tablename__ = 'bundle_versions'
id = Column(Integer, primary_key=True)
bundle_id = Column(Integer, ForeignKey('bundles.id'))
version_hash = Column(String) # SHA-256 Hash der relevanten Daten
version_data = Column(Text) # Alle relevanten Bundle-Daten als JSON-String
timestamp = Column(DateTime, default=datetime.utcnow)
bundle = relationship("Bundle", back_populates="versions")
# Tabelle für Verkaufshistorie (zur zeitlichen Analyse der Verkaufszahlen)
class BundleSalesHistory(Base):
__tablename__ = 'bundle_sales_history'
id = Column(Integer, primary_key=True)
bundle_id = Column(Integer, ForeignKey('bundles.id'))
bundles_sold = Column(Float)
timestamp = Column(DateTime, default=datetime.utcnow)
bundle = relationship("Bundle", back_populates="sales_history")
def calculate_hash(data: dict) -> str:
"""
Berechnet einen SHA-256 Hash aus dem JSON-String der relevanten Daten.
"""
json_string = json.dumps(data, sort_keys=True, ensure_ascii=False)
return hashlib.sha256(json_string.encode('utf-8')).hexdigest()
def fetch_bundle_data(url: str) -> dict:
"""
Ruft die Bundle-Detailseite ab und extrahiert den JSON-Inhalt aus dem <script>-Tag mit der ID "webpack-bundle-page-data".
"""
response = requests.get(url)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
script_tag = soup.find("script", {"id": "webpack-bundle-page-data", "type": "application/json"})
if not script_tag:
raise ValueError("Kein JSON-Datenblock gefunden auf der Detailseite!")
return json.loads(script_tag.string)
def process_bundle(session, url: str):
"""
Verarbeitet ein einzelnes Bundle:
- Lädt die Detailseite und extrahiert den JSON-Datensatz
- Berechnet einen Hash des relevanten Datenbereichs (hier: bundleData)
- Vergleicht mit der letzten Version in der DB und speichert bei Änderung eine neue Version
- Speichert Verkaufszahlen in einer separaten Tabelle
"""
try:
data = fetch_bundle_data(url)
except Exception as e:
print(f"Fehler beim Laden der Bundle-Daten von {url}: {e}")
return
bundle_data = data.get("bundleData", {})
machine_name = bundle_data.get("machine_name", "")
human_name = bundle_data.get("human_name", "")
# Definiere den relevanten Datenausschnitt; hier nehmen wir das gesamte bundleData
relevant_data = bundle_data
new_hash = calculate_hash(relevant_data)
# Suche, ob das Bundle bereits existiert (über machine_name)
bundle = session.query(Bundle).filter_by(machine_name=machine_name).first()
if not bundle:
bundle = Bundle(machine_name=machine_name, human_name=human_name)
session.add(bundle)
session.commit()
# Hole die aktuellste Version (nach timestamp sortiert)
latest_version = (session.query(BundleVersion)
.filter_by(bundle_id=bundle.id)
.order_by(BundleVersion.timestamp.desc())
.first())
# Wenn es noch keine Version gibt oder der Hash sich geändert hat, speichere eine neue Version
if latest_version is None or latest_version.version_hash != new_hash:
new_version = BundleVersion(
bundle_id=bundle.id,
version_hash=new_hash,
version_data=json.dumps(relevant_data, sort_keys=True, ensure_ascii=False)
)
session.add(new_version)
session.commit()
bundle.current_version_id = new_version.id
session.commit()
print(f"Neue Version für Bundle '{human_name}' gespeichert.")
else:
print(f"Bundle '{human_name}' hat sich nicht geändert.")
# Verkaufszahlen extrahieren Annahme: Verkaufszahlen stehen unter "bundles_sold|decimal" entweder direkt in bundleData oder in basic_data
bundles_sold = bundle_data.get("bundles_sold|decimal", None)
if bundles_sold is None:
bundles_sold = bundle_data.get("basic_data", {}).get("bundles_sold|decimal", None)
if bundles_sold is not None:
try:
sales_value = float(bundles_sold)
sales_entry = BundleSalesHistory(
bundle_id=bundle.id,
bundles_sold=sales_value
)
session.add(sales_entry)
session.commit()
print(f"Verkaufszahlen für Bundle '{human_name}' aktualisiert: {sales_value}")
except Exception as e:
print(f"Fehler beim Speichern der Verkaufszahlen für {human_name}: {e}")
def get_bundle_urls(overview_url: str) -> list:
"""
Ruft die Übersichtsseite ab und extrahiert alle Bundle-URLs.
Hier wird angenommen, dass Bundle-Links in <a>-Tags mit der CSS-Klasse "bundle-link" stehen.
Passe den Selektor ggf. an die tatsächliche Seitenstruktur an.
"""
response = requests.get(overview_url)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
bundle_urls = []
# Beispiel: Alle <a>-Tags mit class="bundle-link"
for a in soup.find_all("a", class_="bundle-link"):
href = a.get("href")
if href:
full_url = requests.compat.urljoin(overview_url, href)
bundle_urls.append(full_url)
return bundle_urls
def main():
# Datenbank-Verbindung (hier SQLite)
engine = create_engine('sqlite:///bundles.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
# URL der Übersichtsseite, von der alle Bundles erfasst werden sollen
overview_url = "https://www.humblebundle.com/bundles"
try:
bundle_urls = get_bundle_urls(overview_url)
if not bundle_urls:
print("Keine Bundle-URLs gefunden! Überprüfe den CSS-Selektor in get_bundle_urls().")
return
except Exception as e:
print(f"Fehler beim Abrufen der Übersichtseite: {e}")
return
# Iteriere über alle gefundenen Bundle-URLs und verarbeite jedes Bundle
for url in bundle_urls:
print(f"Verarbeite Bundle: {url}")
try:
process_bundle(session, url)
except Exception as e:
print(f"Fehler beim Verarbeiten von {url}: {e}")
if __name__ == "__main__":
main()