136 lines
4.7 KiB
Python
136 lines
4.7 KiB
Python
import base64
|
|
import requests as requests
|
|
from Crypto.Cipher import Blowfish
|
|
import json
|
|
from mysql import *
|
|
|
|
|
|
# Read the script settings once at import time. The rest of the module reads
# the database connection values (db_host, db_user, db_password, db_database)
# from this mapping.
with open("config.json", "r") as config_file:
    config = json.loads(config_file.read())
|
|
|
|
|
|
class Europapark:
    """Fetch attraction waiting times from the ticketing API used by the app.

    The API credentials are not stored here: they are read from the app's
    Firebase Remote Config, whose values are base64-encoded Blowfish-CBC
    ciphertexts, and are then exchanged for a login token.
    """

    # Seconds to wait for any HTTP response. Without a timeout `requests`
    # blocks indefinitely on a stalled connection.
    request_timeout = 30

    def __init__(self):
        """Set the app constants and load the bundled attraction data."""
        self.app_id = "1:265494593072:android:39d736caef618dd96677f5"  # check apk:/res/strings/values.xml
        self.api_key = "AIzaSyAULLQ9uZses2UXH3YLYbjTj1-OVBPt3tY"  # check apk:/res/strings/values.xml
        self.encryption_key = b"4{^Kvep=%yah^r,k"  # decompile app: search for Blowfish.INSTANCE.KEY
        self.encryption_iv = b"abcdefgh"  # decompile app: search for Blowfish.decrypt
        self.local_data = Europapark.load_local_europapark_data()  # copy apk:/assets/v2/fr/latest_package.json
        self.token = None  # login token, filled in lazily by get_token()

    def get_short_appid(self):
        """Return the second ``:``-separated field of ``app_id``."""
        return self.app_id.split(":")[1]

    def get_remoteconfig(self):
        """Fetch the Firebase Remote Config and return its ``entries`` mapping.

        Raises:
            requests.HTTPError: if the server answers with an error status.
        """
        short_appid = self.get_short_appid()
        r = requests.post(
            f"https://firebaseremoteconfig.googleapis.com/v1/projects/{short_appid}/namespaces/firebase:fetch?key={self.api_key}",
            json={
                "appId": self.app_id,
                "appInstanceId": "PROD",
            },
            headers={
                "Content-Type": "application/json",
            },
            timeout=self.request_timeout,
        )
        r.raise_for_status()
        return r.json()["entries"]

    def decrypt_remoteconfig(self, remoteconfig: dict):
        """Decrypt every value of a remote-config mapping.

        Each value is base64-decoded, decrypted with Blowfish in CBC mode
        using the instance key and IV, stripped of its PKCS#5 padding and
        decoded as UTF-8.

        Args:
            remoteconfig: mapping of entry name to base64 ciphertext.

        Returns:
            A new dict with the same keys and the clear-text values.
        """
        decrypted = {}
        for remoteconfig_key, remoteconfig_value in remoteconfig.items():
            # A CBC cipher object carries state, so use a fresh one per value.
            cipher = Blowfish.new(self.encryption_key, Blowfish.MODE_CBC, iv=self.encryption_iv)
            padded = cipher.decrypt(base64.b64decode(remoteconfig_value))
            # Remove the padding on the raw bytes, before decoding, so that
            # every pad length (1-8) is handled and not only length 4.
            clear = Europapark._strip_pkcs5_padding(padded).decode("utf8")
            # The "\x04" removal is kept from the previous implementation so
            # values that decoded correctly before still yield the same text.
            decrypted[remoteconfig_key] = clear.replace("\x04", "").strip()
        return decrypted

    @staticmethod
    def load_local_europapark_data():
        """Load ``latest_package.json`` from the working directory.

        The file is read as UTF-8 explicitly so the result does not depend
        on the platform's default locale encoding.
        """
        with open("latest_package.json", "r", encoding="utf-8") as fp:
            return json.load(fp)

    @staticmethod
    def pkcs5padding(s: str):
        """Append padding characters so ``len(s)`` becomes a multiple of 8.

        Note that this *adds* padding; it does not remove it. A string whose
        length is already a multiple of 8 is returned unchanged.
        """
        packing_length = 8 - len(s) % 8
        if packing_length == 8:
            return s
        return s + chr(packing_length) * packing_length

    @staticmethod
    def _strip_pkcs5_padding(data: bytes, block_size: int = 8) -> bytes:
        """Return ``data`` without its trailing PKCS#5 padding.

        If the trailing bytes do not form valid padding (last byte ``n`` in
        ``1..block_size`` repeated ``n`` times) the data is returned as is.
        """
        if not data:
            return data
        pad_len = data[-1]
        if 1 <= pad_len <= block_size and data.endswith(bytes([pad_len]) * pad_len):
            return data[:-pad_len]
        return data

    def get_credentials(self):
        """Return the ``(username, password)`` pair stored in the remote config."""
        remote_config = self.decrypt_remoteconfig(self.get_remoteconfig())
        username = remote_config["v3_live_exozet_api_username"]
        password = remote_config["v3_live_exozet_api_password"]
        return username, password

    def get_token(self):
        """Return the API login token, logging in on first use.

        The token is cached on the instance so later calls do not repeat the
        remote-config fetch and the login request.

        Raises:
            requests.HTTPError: if the login request fails.
        """
        if self.token:
            return self.token
        username, password = self.get_credentials()
        r = requests.post(
            "https://tickets.mackinternational.de/api/v1/login_check",
            json={
                "username": username,
                "password": password,
            },
            timeout=self.request_timeout,
        )
        r.raise_for_status()
        self.token = r.json()["token"]
        return self.token

    @staticmethod
    def _get_waiting_times(token):
        """Request the raw ``waitingtimes`` list using ``token`` as bearer."""
        r = requests.get(
            "https://tickets.mackinternational.de/api/v1/waitingtimes",
            headers={"JWTAuthorization": f"Bearer {token}"},
            timeout=Europapark.request_timeout,
        )
        r.raise_for_status()
        return r.json()["waitingtimes"]

    def get_attraction_from_code(self, code):
        """Return the first local point of interest whose ``code`` matches.

        Raises:
            IndexError: if no entry in the local data has that code.
        """
        for poi in self.local_data["package"]["data"]["pois"]:
            if poi["code"] == code:
                return poi
        raise IndexError(f"no attraction with code {code!r} in local data")

    def get_waiting_times(self):
        """Return a dict mapping each attraction code to its reported time."""
        token = self.get_token()
        return {
            waiting_time["code"]: waiting_time["time"]
            for waiting_time in self._get_waiting_times(token)
        }
|
|
|
|
|
|
def setup_database():
    """Build a ``MySQL`` object from the ``db_*`` entries of ``config``."""
    connection_settings = {
        "hostname": config["db_host"],
        "username": config["db_user"],
        "password": config["db_password"],
        "database": config["db_database"],
    }
    return MySQL(**connection_settings)
|
|
|
|
|
|
def fill_codes_in_db(ep: Europapark):
    """Write every local point of interest that has a code to ``codes``.

    Entries whose ``code`` is falsy are skipped. Each remaining entry is
    stored with a ``REPLACE INTO`` statement carrying eight positional values.
    """
    db = setup_database()
    statement = "REPLACE INTO codes VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
    for poi in ep.local_data["package"]["data"]["pois"]:
        if poi["code"]:
            row = (
                poi["code"],
                poi["name"],
                poi["type"],
                poi["latitude"],
                poi["longitude"],
                poi["descriptionMarkup"],
                poi["image"]["reference"],
                poi["scopes"][0],  # only the first scope is stored
            )
            db.query(statement, row)
|
|
|
|
|
|
def fill_waitingtimes_in_db(ep: Europapark):
    """Store the current waiting time of every attraction in ``waitingtimes``.

    The times are fetched before the database object is created; each row is
    written with ``REPLACE INTO`` and stamped with the database's ``NOW()``.
    """
    current_times = ep.get_waiting_times()
    db = setup_database()
    for attraction_code, reported_time in current_times.items():
        db.query(
            "REPLACE INTO waitingtimes VALUES (%s, NOW(), %s)",
            (attraction_code, reported_time),
        )
|
|
|
|
|
|
def main():
    """Create a ``Europapark`` client and record the current waiting times."""
    fill_waitingtimes_in_db(Europapark())
|
|
|
|
|
|
# Run the import only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|