176 lines
5.6 KiB
Python
176 lines
5.6 KiB
Python
#!/usr/bin/env python3
|
|
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import socket
|
|
import smtplib
|
|
from datetime import datetime, timezone
|
|
from email.message import EmailMessage
|
|
|
|
from dotenv import load_dotenv
|
|
load_dotenv()
|
|
|
|
import requests
|
|
|
|
EPIC_ENDPOINTS = [
|
|
# manchmal zuverlässiger als die ipv4-subdomain
|
|
"https://store-site-backend-static.ak.epicgames.com/freeGamesPromotions",
|
|
"https://store-site-backend-static-ipv4.ak.epicgames.com/freeGamesPromotions",
|
|
]
|
|
|
|
def env(name: str, default: str | None = None, required: bool = False) -> str:
|
|
val = os.getenv(name, default)
|
|
if required and (val is None or val == ""):
|
|
raise SystemExit(f"Missing required env var: {name}")
|
|
return val
|
|
|
|
def fetch_json_with_retry(locale="de-DE", country="DE", retries=3, timeout=20):
|
|
params = {"locale": locale, "country": country, "allowCountries": country}
|
|
headers = {"Accept": "application/json"}
|
|
|
|
last_err = None
|
|
for base in EPIC_ENDPOINTS:
|
|
for attempt in range(1, retries + 1):
|
|
try:
|
|
r = requests.get(base, params=params, headers=headers, timeout=timeout)
|
|
r.raise_for_status()
|
|
return r.json()
|
|
except Exception as e:
|
|
last_err = e
|
|
# kleines Backoff
|
|
time.sleep(1.0 * attempt)
|
|
|
|
raise RuntimeError(f"Epic fetch failed after retries. Last error: {last_err}")
|
|
|
|
def is_active_window(window_obj: dict, now: datetime) -> bool:
|
|
try:
|
|
start = datetime.fromisoformat(window_obj["startDate"].replace("Z", "+00:00"))
|
|
end = datetime.fromisoformat(window_obj["endDate"].replace("Z", "+00:00"))
|
|
return start <= now <= end
|
|
except Exception:
|
|
return False
|
|
|
|
def extract_free_games(epic_json: dict, locale="de-DE"):
|
|
# Pfad: data.Catalog.searchStore.elements
|
|
elements = (
|
|
epic_json.get("data", {})
|
|
.get("Catalog", {})
|
|
.get("searchStore", {})
|
|
.get("elements", [])
|
|
)
|
|
|
|
now = datetime.now(timezone.utc)
|
|
free_now = []
|
|
|
|
for e in elements:
|
|
promos = e.get("promotions") or {}
|
|
blocks = promos.get("promotionalOffers") or []
|
|
windows = []
|
|
for b in blocks:
|
|
windows.extend(b.get("promotionalOffers") or [])
|
|
|
|
active = [w for w in windows if is_active_window(w, now)]
|
|
if not active:
|
|
continue
|
|
|
|
slug = e.get("productSlug") or e.get("urlSlug")
|
|
store_url = None
|
|
if slug:
|
|
store_url = f"https://store.epicgames.com/{locale.lower()}/p/{slug}"
|
|
|
|
free_now.append({
|
|
"title": e.get("title"),
|
|
"until": active[0].get("endDate"),
|
|
"store_url": store_url,
|
|
})
|
|
|
|
# sortiere nach Titel (optional)
|
|
free_now.sort(key=lambda x: (x["title"] or "").lower())
|
|
return free_now
|
|
|
|
def format_email_body(free_games: list[dict]) -> tuple[str, str]:
|
|
now = datetime.now(timezone.utc).isoformat()
|
|
|
|
if not free_games:
|
|
subject = "Epic Games: Keine aktuellen Gratis-Spiele"
|
|
text = f"Stand: {now}\n\nAktuell wurden keine Gratis-Spiele gefunden."
|
|
html = f"<p><strong>Stand:</strong> {now}</p><p>Aktuell wurden keine Gratis-Spiele gefunden.</p>"
|
|
return subject, text, html
|
|
|
|
subject = f"Epic Games: {len(free_games)} Gratis-Spiel(e) verfügbar"
|
|
lines = [f"Stand: {now}", "", "Aktuelle Gratis-Spiele:"]
|
|
for g in free_games:
|
|
lines.append(f"- {g['title']} (bis {g['until']})")
|
|
if g["store_url"]:
|
|
lines.append(f" {g['store_url']}")
|
|
text = "\n".join(lines)
|
|
|
|
items_html = ""
|
|
for g in free_games:
|
|
title = (g["title"] or "").replace("&", "&").replace("<", "<").replace(">", ">")
|
|
until = (g["until"] or "")
|
|
if g["store_url"]:
|
|
items_html += f"<li><a href=\"{g['store_url']}\">{title}</a> <small>(bis {until})</small></li>"
|
|
else:
|
|
items_html += f"<li>{title} <small>(bis {until})</small></li>"
|
|
|
|
html = f"""
|
|
<p><strong>Stand:</strong> {now}</p>
|
|
<p>Aktuelle Gratis-Spiele:</p>
|
|
<ul>
|
|
{items_html}
|
|
</ul>
|
|
"""
|
|
return subject, text, html
|
|
|
|
def send_email(subject: str, text: str, html: str):
|
|
smtp_host = env("SMTP_HOST", required=True)
|
|
smtp_port = int(env("SMTP_PORT", "587"))
|
|
smtp_user = env("SMTP_USER", required=True)
|
|
smtp_pass = env("SMTP_PASS", required=True)
|
|
|
|
mail_from = env("MAIL_FROM", smtp_user)
|
|
mail_to = env("MAIL_TO", required=True)
|
|
|
|
use_tls = env("SMTP_TLS", "true").lower() in ("1", "true", "yes", "on")
|
|
|
|
msg = EmailMessage()
|
|
msg["Subject"] = subject
|
|
msg["From"] = mail_from
|
|
msg["To"] = mail_to
|
|
msg.set_content(text)
|
|
msg.add_alternative(html, subtype="html")
|
|
|
|
with smtplib.SMTP(smtp_host, smtp_port, timeout=30) as server:
|
|
server.ehlo()
|
|
if use_tls:
|
|
server.starttls()
|
|
server.ehlo()
|
|
server.login(smtp_user, smtp_pass)
|
|
server.send_message(msg)
|
|
|
|
def main():
|
|
locale = env("EPIC_LOCALE", "de-DE")
|
|
country = env("EPIC_COUNTRY", "DE")
|
|
|
|
try:
|
|
data = fetch_json_with_retry(locale=locale, country=country)
|
|
free_games = extract_free_games(data, locale=locale)
|
|
subject, text, html = format_email_body(free_games)
|
|
send_email(subject, text, html)
|
|
print(f"OK: Sent mail ({len(free_games)} freebies).")
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"HTTP error: {e}", file=sys.stderr)
|
|
sys.exit(2)
|
|
except (socket.gaierror, RuntimeError) as e:
|
|
print(f"Network/DNS error: {e}", file=sys.stderr)
|
|
sys.exit(3)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|