"""Fetch Europa-Park waiting times from the app backend and store them in MySQL.

Running the module as a script loads ``config.json`` (database settings),
builds a :class:`Europapark` client and writes the current waiting times to
the ``waitingtimes`` table.
"""

import base64
import json

import requests
from Crypto.Cipher import Blowfish

# NOTE(review): presumably a project-local module providing ``MySQL``; the
# star import is kept last so the names it exports keep their precedence.
from mysql import *

# Seconds to wait on any HTTP call before giving up; without a timeout
# ``requests`` blocks indefinitely on an unresponsive server.
_REQUEST_TIMEOUT_SECONDS = 30

with open("config.json", "r", encoding="utf-8") as fp:
    config = json.load(fp)


class Europapark:
    """Client for the remote-config and waiting-time endpoints used by the app.

    The constants set in ``__init__`` are taken from the Android app; the
    trailing comments say where each one can be found.
    """

    # Block size used by the padding helpers below.
    _BLOCK_SIZE = 8

    def __init__(self):
        self.app_id = "1:265494593072:android:39d736caef618dd96677f5"  # check apk:/res/strings/values.xml
        self.api_key = "AIzaSyAULLQ9uZses2UXH3YLYbjTj1-OVBPt3tY"  # check apk:/res/strings/values.xml
        self.encryption_key = b"4{^Kvep=%yah^r,k"  # decompile app: search for Blowfish.INSTANCE.KEY
        self.encryption_iv = b"abcdefgh"  # decompile app: search for Blowfish.decrypt
        self.local_data = Europapark.load_local_europapark_data()  # copy apk:/assets/v2/fr/latest_package.json
        self.token = None  # cached login token, filled by get_token()

    def get_short_appid(self):
        """Return the second ``:``-separated field of ``app_id``."""
        return self.app_id.split(":")[1]

    def get_remoteconfig(self):
        """Fetch the Firebase remote config and return its ``entries`` mapping.

        Raises:
            requests.HTTPError: if the server answers with an error status.
        """
        short_appid = self.get_short_appid()
        r = requests.post(
            f"https://firebaseremoteconfig.googleapis.com/v1/projects/{short_appid}/namespaces/firebase:fetch?key={self.api_key}",
            json={
                "appId": self.app_id,
                "appInstanceId": "PROD",
            },
            headers={
                "Content-Type": "application/json",
            },
            timeout=_REQUEST_TIMEOUT_SECONDS,
        )
        r.raise_for_status()
        return r.json()["entries"]

    def decrypt_remoteconfig(self, remoteconfig: dict):
        """Decrypt every value of *remoteconfig* and return a new dict.

        Each value is base64-decoded, decrypted with Blowfish/CBC using the
        instance key and IV, stripped of its PKCS#5 padding and decoded as
        UTF-8.
        """
        decrypted = {}
        for remoteconfig_key, remoteconfig_value in remoteconfig.items():
            # Every value is decrypted independently with the same key and IV.
            cipher = Blowfish.new(self.encryption_key, Blowfish.MODE_CBC, iv=self.encryption_iv)
            raw = cipher.decrypt(base64.b64decode(remoteconfig_value))
            clear = Europapark._strip_pkcs5_padding(raw).decode("utf8")
            # The previous implementation dropped every "\x04" character and
            # trimmed whitespace; both are retained so values that already
            # decoded correctly are unchanged.
            decrypted[remoteconfig_key] = clear.replace("\x04", "").strip()
        return decrypted

    @staticmethod
    def _strip_pkcs5_padding(data: bytes) -> bytes:
        """Return *data* without trailing PKCS#5 padding.

        Valid padding is ``n`` bytes of value ``n`` with ``1 <= n <= 8``.
        Input whose tail does not match that pattern is returned unchanged
        rather than rejected, so unexpected payloads are not truncated.
        """
        if not data:
            return data
        pad_len = data[-1]
        if 1 <= pad_len <= Europapark._BLOCK_SIZE and data.endswith(bytes([pad_len]) * pad_len):
            return data[:-pad_len]
        return data

    @staticmethod
    def load_local_europapark_data():
        """Load and return the parsed content of ``latest_package.json``."""
        with open("latest_package.json", "r", encoding="utf-8") as fp:
            return json.load(fp)

    @staticmethod
    def pkcs5padding(s: str):
        """Pad *s* with ``chr(n) * n`` up to the next multiple of 8 characters.

        A string whose length is already a multiple of 8 is returned as is
        (unlike standard PKCS#5, which would append a full block). This
        helper *adds* padding; it is kept for backward compatibility and is
        no longer used when decrypting.
        """
        packing_length = Europapark._BLOCK_SIZE - len(s) % Europapark._BLOCK_SIZE
        if packing_length == Europapark._BLOCK_SIZE:
            return s
        return s + chr(packing_length) * packing_length

    def get_credentials(self):
        """Return the ``(username, password)`` pair from the remote config."""
        remote_config = self.decrypt_remoteconfig(self.get_remoteconfig())
        username = remote_config["v3_live_exozet_api_username"]
        password = remote_config["v3_live_exozet_api_password"]
        return username, password

    def get_token(self):
        """Return the login token, requesting one on first use.

        The token is stored on the instance so later calls reuse it instead
        of logging in again.

        Raises:
            requests.HTTPError: if the login endpoint answers with an error
                status.
        """
        if self.token:
            return self.token

        username, password = self.get_credentials()
        r = requests.post(
            "https://tickets.mackinternational.de/api/v1/login_check",
            json={
                "username": username,
                "password": password,
            },
            timeout=_REQUEST_TIMEOUT_SECONDS,
        )
        r.raise_for_status()
        self.token = r.json()["token"]
        return self.token

    @staticmethod
    def _get_waiting_times(token):
        """Return the raw ``waitingtimes`` list from the API using *token*."""
        r = requests.get(
            "https://tickets.mackinternational.de/api/v1/waitingtimes",
            headers={"JWTAuthorization": f"Bearer {token}"},
            timeout=_REQUEST_TIMEOUT_SECONDS,
        )
        r.raise_for_status()
        return r.json()["waitingtimes"]

    def get_attraction_from_code(self, code):
        """Return the first local point of interest whose ``code`` is *code*.

        Raises:
            IndexError: if no entry matches.
        """
        for poi in self.local_data["package"]["data"]["pois"]:
            if poi["code"] == code:
                return poi
        raise IndexError(f"no attraction with code {code!r}")

    def get_waiting_times(self):
        """Return a dict mapping each attraction code to its reported time."""
        token = self.get_token()
        return {
            waiting_time["code"]: waiting_time["time"]
            for waiting_time in self._get_waiting_times(token)
        }


def setup_database():
    """Create a ``MySQL`` handle from the ``db_*`` settings in ``config``."""
    return MySQL(
        hostname=config["db_host"],
        username=config["db_user"],
        password=config["db_password"],
        database=config["db_database"],
    )


def fill_codes_in_db(ep: Europapark):
    """Write every local point of interest that has a code to ``codes``.

    Rows are written with ``REPLACE INTO`` and parameterized values; entries
    with an empty ``code`` are skipped.
    """
    db = setup_database()
    for item in ep.local_data["package"]["data"]["pois"]:
        if not item["code"]:
            continue
        db.query(
            "REPLACE INTO codes VALUES (%s, %s, %s, %s, %s, %s, %s, %s)",
            (
                item["code"],
                item["name"],
                item["type"],
                item["latitude"],
                item["longitude"],
                item["descriptionMarkup"],
                item["image"]["reference"],
                item["scopes"][0],
            ),
        )


def fill_waitingtimes_in_db(ep: Europapark):
    """Fetch the current waiting times and write one row per attraction code."""
    wt = ep.get_waiting_times()
    db = setup_database()
    for code, waiting_time in wt.items():
        db.query("REPLACE INTO waitingtimes VALUES (%s, NOW(), %s)", (code, waiting_time))


def main():
    """Script entry point: store the current waiting times."""
    ep = Europapark()
    fill_waitingtimes_in_db(ep)


if __name__ == '__main__':
    main()