europapark-exporter/src/europapark_exporter/europapark.py

104 lines
3.9 KiB
Python

import base64
import os.path
import requests as requests
from Crypto.Cipher import Blowfish
import json
class EuropaparkAPI:
def __init__(self):
self.app_id = "1:265494593072:android:39d736caef618dd96677f5" # check apk:/res/strings/values.xml
self.api_key = "AIzaSyAULLQ9uZses2UXH3YLYbjTj1-OVBPt3tY" # check apk:/res/strings/values.xml
self.encryption_key = b"4{^Kvep=%yah^r,k" # decompile app: search for Blowfish.INSTANCE.KEY
self.encryption_iv = b"abcdefgh" # decompile app: search for Blowfish.decrypt
self.local_data = EuropaparkAPI.load_local_europapark_data() # copy apk:/assets/v2/fr/latest_package.json
self.token = None
def get_short_appid(self):
return self.app_id.split(":")[1]
def get_remoteconfig(self):
short_appid = self.get_short_appid()
r = requests.post(
f"https://firebaseremoteconfig.googleapis.com/v1/projects/{short_appid}/namespaces/firebase:fetch?key={self.api_key}",
json={
"appId": self.app_id,
"appInstanceId": "PROD"
},
headers={
"Content-Type": "application/json"
})
r.raise_for_status()
return r.json()["entries"]
def decrypt_remoteconfig(self, remoteconfig: dict):
decrypted = {}
for remoteconfig_key in remoteconfig:
remoteconfig_value = remoteconfig[remoteconfig_key]
decoded_b64 = base64.b64decode(remoteconfig_value)
cipher = Blowfish.new(self.encryption_key, Blowfish.MODE_CBC, iv=self.encryption_iv)
clear = cipher.decrypt(decoded_b64)
clear = clear.decode("utf8")
clear = EuropaparkAPI.pkcs5padding(clear).replace("\x04", "").strip()
decrypted[remoteconfig_key] = clear
return decrypted
@staticmethod
def load_local_europapark_data():
latest_package_path = os.path.dirname(os.path.abspath(__file__)) + "/europapark_data/latest_package.json"
with open(latest_package_path, "r") as fp:
return json.load(fp)
@staticmethod
def pkcs5padding(s: str):
byte_num = len(s)
packing_length = 8 - byte_num % 8
if packing_length == 8:
return s
else:
appendage = chr(packing_length) * packing_length
return s + appendage
def get_credentials(self):
remote_config = self.decrypt_remoteconfig(self.get_remoteconfig())
username = remote_config["v3_live_exozet_api_username"]
password = remote_config["v3_live_exozet_api_password"]
return username, password
def get_token(self):
if self.token:
return self.token
else:
username, password = self.get_credentials()
r = requests.post("https://tickets.mackinternational.de/api/v1/login_check",
json={
"username": username,
"password": password
})
r.raise_for_status()
return r.json()["token"]
@staticmethod
def _get_waiting_times(token):
r = requests.get("https://tickets.mackinternational.de/api/v1/waitingtimes",
headers={"JWTAuthorization": f"Bearer {token}"})
r.raise_for_status()
return r.json()["waitingtimes"]
def get_attraction_from_code(self, code) -> dict | None:
attrs = list(filter(lambda e: e["code"] == code, self.local_data["package"]["data"]["pois"]))
if len(attrs) > 0:
return attrs[0]
return None
def get_waiting_times(self):
token = self.get_token()
result = {}
waiting_times = self._get_waiting_times(token)
for waiting_time in waiting_times:
attr_code = waiting_time["code"]
attr_time = waiting_time["time"]
result[attr_code] = attr_time
return result