umstellung auf modulare klassen und dateien

This commit is contained in:
Czechman 2025-02-16 15:14:03 +01:00
parent be49c2a6e7
commit 3897ee1a15
2 changed files with 54 additions and 272 deletions

View File

@ -1,282 +1,19 @@
#!/usr/bin/env python3 # bundle_checker.py
import requests
import json
import hashlib
import logging import logging
import difflib from bundle_parser import BundleParser
from datetime import datetime
from bs4 import BeautifulSoup
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey, Text
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
# Konfiguriere Logging ändere das Level bei Bedarf (DEBUG, INFO, WARNING, ERROR)
DEBUG_LEVEL = logging.DEBUG
logging.basicConfig(
level=DEBUG_LEVEL,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Basis-Klasse für SQLAlchemy-Modelle (2.0-konform)
Base = declarative_base()
# ---------------------------
# Datenbank-Modelle
# ---------------------------
class Bundle(Base):
__tablename__ = 'bundles'
id = Column(Integer, primary_key=True)
machine_name = Column(String, unique=True)
human_name = Column(String) # Kann leer bleiben, wenn nicht extrahiert
current_version_id = Column(Integer, ForeignKey('bundle_versions.id'))
# Beziehung zur aktuellen Version (post_update=True hilft bei zirkulären Abhängigkeiten)
current_version = relationship("BundleVersion", uselist=False,
foreign_keys=[current_version_id],
post_update=True)
# Alle Versionen (historisch)
versions = relationship("BundleVersion", back_populates="bundle",
foreign_keys=lambda: [BundleVersion.bundle_id])
# Verkaufshistorie
sales_history = relationship("BundleSalesHistory", back_populates="bundle")
# Einzelne Elemente des Bundles (z.B. enthaltene Bücher, Spiele etc.)
items = relationship("BundleItem", back_populates="bundle")
class BundleVersion(Base):
__tablename__ = 'bundle_versions'
id = Column(Integer, primary_key=True)
bundle_id = Column(Integer, ForeignKey('bundles.id'))
version_hash = Column(String)
version_data = Column(Text) # JSON-Daten als String
timestamp = Column(DateTime, default=datetime.utcnow)
# Explizite Angabe des Fremdschlüssels
bundle = relationship("Bundle", back_populates="versions", foreign_keys=[bundle_id])
class BundleSalesHistory(Base):
__tablename__ = 'bundle_sales_history'
id = Column(Integer, primary_key=True)
bundle_id = Column(Integer, ForeignKey('bundles.id'))
bundles_sold = Column(Float)
timestamp = Column(DateTime, default=datetime.utcnow)
bundle = relationship("Bundle", back_populates="sales_history")
class BundleItem(Base):
__tablename__ = 'bundle_items'
id = Column(Integer, primary_key=True)
bundle_id = Column(Integer, ForeignKey('bundles.id'))
title = Column(String) # Titel des Elements (z.B. Buch- oder Spielname)
category = Column(String) # Kategorie, z.B. "book", "game" oder "software"
details = Column(Text) # Optionale Detaildaten als JSON-String
bundle = relationship("Bundle", back_populates="items")
# ---------------------------
# Hilfsfunktionen
# ---------------------------
def calculate_hash(data: dict) -> str:
"""Berechnet einen SHA-256-Hash aus dem sortierten JSON-String der Daten."""
json_string = json.dumps(data, sort_keys=True, ensure_ascii=False)
hash_value = hashlib.sha256(json_string.encode('utf-8')).hexdigest()
logger.debug(f"Berechneter Hash: {hash_value}")
return hash_value
def log_diff(old_data: dict, new_data: dict):
"""
Vergleicht zwei JSON-Daten (als dict) und gibt einen unified diff als String aus.
Nur im DEBUG-Level wird der detaillierte Vergleich ausgegeben.
"""
old_str = json.dumps(old_data, sort_keys=True, indent=4, ensure_ascii=False).splitlines()
new_str = json.dumps(new_data, sort_keys=True, indent=4, ensure_ascii=False).splitlines()
diff = difflib.unified_diff(old_str, new_str, fromfile="alte_version", tofile="neue_version", lineterm="")
diff_text = "\n".join(diff)
logger.debug("Unterschiede zwischen den Versionen:\n" + diff_text)
def fetch_bundle_data(url: str) -> dict:
"""
Lädt die Detailseite eines Bundles und extrahiert den JSON-Inhalt aus dem <script>-Tag
mit id="webpack-bundle-page-data" (diese Seite enthält die detaillierten Bundle-Daten).
"""
logger.info(f"Rufe Bundle-Daten von {url} ab...")
response = requests.get(url)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
script_tag = soup.find("script", {"id": "webpack-bundle-page-data", "type": "application/json"})
if not script_tag:
logger.error("Kein JSON-Datenblock 'webpack-bundle-page-data' gefunden!")
raise ValueError("Kein JSON-Datenblock gefunden!")
data = json.loads(script_tag.string)
logger.debug(f"Erhaltener JSON-Block (gekürzt): {str(data)[:200]} ...")
return data
def process_bundle(session, url: str):
"""
Verarbeitet ein einzelnes Bundle:
- Lädt die Detailseite und extrahiert den JSON-Datensatz (bundleData)
- Berechnet einen Hash des relevanten Datenbereichs
- Vergleicht mit der letzten Version in der DB und speichert bei Änderung eine neue Version
- Speichert Verkaufszahlen in einer separaten Tabelle
- Extrahiert und speichert, falls vorhanden, einzelne Bundle-Elemente
"""
try:
data = fetch_bundle_data(url)
except Exception as e:
logger.error(f"Fehler beim Laden der Bundle-Daten von {url}: {e}")
return
# Es wird angenommen, dass die Bundle-Daten unter dem Schlüssel "bundleData" stehen.
bundle_data = data.get("bundleData", {})
machine_name = bundle_data.get("machine_name", "")
human_name = bundle_data.get("human_name", "")
logger.info(f"Verarbeite Bundle '{human_name}' (machine_name: {machine_name})")
# Definiere hier den relevanten Datenausschnitt (anpassen, falls nötig)
relevant_data = bundle_data
new_hash = calculate_hash(relevant_data)
# Suche, ob das Bundle bereits existiert
bundle = session.query(Bundle).filter_by(machine_name=machine_name).first()
if not bundle:
logger.info(f"Neues Bundle gefunden: '{human_name}' (machine_name: {machine_name})")
bundle = Bundle(machine_name=machine_name, human_name=human_name)
session.add(bundle)
session.commit()
else:
logger.debug(f"Bundle '{human_name}' existiert bereits (ID: {bundle.id})")
# Vergleiche mit der aktuellsten Version
latest_version = (session.query(BundleVersion)
.filter_by(bundle_id=bundle.id)
.order_by(BundleVersion.timestamp.desc())
.first())
if latest_version is None or latest_version.version_hash != new_hash:
if latest_version:
if logger.isEnabledFor(logging.DEBUG):
try:
old_data = json.loads(latest_version.version_data)
log_diff(old_data, relevant_data)
except Exception as ex:
logger.debug(f"Fehler beim Diff-Vergleich: {ex}")
else:
logger.info("Hashabweichung festgestellt es gibt Änderungen in den Bundle-Daten.")
else:
logger.info("Keine vorherige Version gefunden neues Bundle wird gespeichert.")
new_version = BundleVersion(
bundle_id=bundle.id,
version_hash=new_hash,
version_data=json.dumps(relevant_data, sort_keys=True, ensure_ascii=False)
)
session.add(new_version)
session.commit()
bundle.current_version_id = new_version.id
session.commit()
else:
logger.info(f"Bundle '{human_name}' hat sich nicht geändert.")
# Verkaufszahlen extrahieren hier wird angenommen, dass sie unter einem entsprechenden Schlüssel stehen
bundles_sold = bundle_data.get("bundles_sold|decimal")
if bundles_sold is None:
bundles_sold = bundle_data.get("basic_data", {}).get("bundles_sold|decimal")
if bundles_sold is not None:
try:
sales_value = float(bundles_sold)
sales_entry = BundleSalesHistory(
bundle_id=bundle.id,
bundles_sold=sales_value
)
session.add(sales_entry)
session.commit()
logger.info(f"Verkaufszahlen für Bundle '{human_name}' aktualisiert: {sales_value}")
except Exception as e:
logger.error(f"Fehler beim Speichern der Verkaufszahlen für '{human_name}': {e}")
else:
logger.debug(f"Keine Verkaufszahlen für Bundle '{human_name}' gefunden.")
# ---------------------------
# Extraktion der Bundle-Elemente
# ---------------------------
# Hier wird angenommen, dass in den Bundle-Daten unter "items" eine Liste der enthaltenen Produkte steht.
items = bundle_data.get("items", [])
if items:
logger.info(f"Es wurden {len(items)} Elemente im Bundle '{human_name}' gefunden.")
for item in items:
title = item.get("title", "Unbekannt")
category = item.get("category", "Unbekannt")
details = json.dumps(item, sort_keys=True, ensure_ascii=False)
# Prüfen, ob das Element bereits gespeichert wurde (z.B. anhand des Titels)
existing_item = session.query(BundleItem).filter_by(bundle_id=bundle.id, title=title).first()
if existing_item:
logger.debug(f"Element '{title}' im Bundle '{human_name}' existiert bereits Überspringe.")
else:
new_item = BundleItem(
bundle_id=bundle.id,
title=title,
category=category,
details=details
)
session.add(new_item)
logger.debug(f"Neues Element '{title}' im Bundle '{human_name}' wird gespeichert.")
session.commit()
else:
logger.debug(f"Keine Bundle-Elemente (items) im Bundle '{human_name}' gefunden.")
def get_bundle_urls(overview_url: str) -> list:
"""
Ruft die Übersichtsseite ab und extrahiert alle Bundle-URLs aus dem JSON-Datenblock im <script>-Tag 'landingPage-json-data'.
"""
logger.info(f"Rufe Übersichtsseite {overview_url} ab...")
response = requests.get(overview_url)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
bundle_urls = []
landing_script = soup.find("script", {"id": "landingPage-json-data", "type": "application/json"})
if landing_script:
landing_data = json.loads(landing_script.string)
logger.debug(f"Landing Page JSON (gekürzt): {str(landing_data)[:200]} ...")
# Durchlaufe die Kategorien "books", "games" und "software"
for category in ["books", "games", "software"]:
cat_data = landing_data.get("data", {}).get(category, {})
for section in cat_data.get("mosaic", []):
for product in section.get("products", []):
url = product.get("product_url", "")
if url:
full_url = requests.compat.urljoin(overview_url, url)
bundle_urls.append(full_url)
else:
logger.warning("Kein JSON-Datenblock 'landingPage-json-data' auf der Übersichtsseite gefunden.")
logger.info(f"Gefundene Bundle-URLs: {len(bundle_urls)}")
return bundle_urls
def main(): def main():
engine = create_engine('sqlite:///bundles.db') url = "https://www.humblebundle.com/some-bundle-url"
Base.metadata.create_all(engine) parser = BundleParser(url, category="books")
Session = sessionmaker(bind=engine)
session = Session()
overview_url = "https://www.humblebundle.com/bundles"
try: try:
bundle_urls = get_bundle_urls(overview_url) bundle_data = parser.get_relevant_bundle_data()
if not bundle_urls: logger.info(f"Erhaltene Bundle-Daten: {bundle_data}")
logger.error("Keine Bundle-URLs gefunden! Überprüfe den JSON-Datenblock oder den Selektor in get_bundle_urls().") items = parser.parse_items()
return logger.info(f"Gefundene Items: {items}")
except Exception as e: except Exception as e:
logger.error(f"Fehler beim Abrufen der Übersichtsseite: {e}") logger.error(f"Fehler: {e}")
return
# Für jede Bundle-URL verarbeiten
for url in bundle_urls:
logger.info(f"Verarbeite Bundle: {url}")
try:
process_bundle(session, url)
except Exception as e:
logger.error(f"Fehler beim Verarbeiten von {url}: {e}")
if __name__ == "__main__": if __name__ == "__main__":
main() main()

45
bundle_parser.py Normal file
View File

@ -0,0 +1,45 @@
# bundle_parser.py
import requests
import json
from bs4 import BeautifulSoup
import logging
logger = logging.getLogger(__name__)
class BundleParser:
def __init__(self, url, category=None):
self.url = url
self.category = category
def fetch_data(self):
logger.info(f"Rufe Bundle-Daten von {self.url} ab...")
response = requests.get(self.url)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
script_tag = soup.find("script", {"id": "webpack-bundle-page-data", "type": "application/json"})
if not script_tag:
logger.error("Kein JSON-Datenblock 'webpack-bundle-page-data' gefunden!")
raise ValueError("Kein JSON-Datenblock gefunden!")
data = json.loads(script_tag.string)
logger.debug(f"Erhaltener JSON-Block (gekürzt): {str(data)[:200]} ...")
return data
def get_relevant_bundle_data(self):
data = self.fetch_data()
return data.get("bundleData", {})
def parse_items(self):
bundle_data = self.get_relevant_bundle_data()
items = bundle_data.get("items", [])
parsed_items = []
for item in items:
title = item.get("title", "Unbekannt")
category = item.get("category", self.category if self.category else "Unbekannt")
details = json.dumps(item, sort_keys=True, ensure_ascii=False)
parsed_items.append({
"title": title,
"category": category,
"details": details
})
return parsed_items