273 lines
8.7 KiB
Python
273 lines
8.7 KiB
Python
|
import base64
|
||
|
import datetime
|
||
|
import os.path
|
||
|
import pathlib
|
||
|
import socket
|
||
|
import zlib
|
||
|
import sys
|
||
|
from PySide6.QtCore import QThread, QTimer, Signal
|
||
|
from PySide6.QtWidgets import QApplication, QMainWindow, QFileDialog
|
||
|
from lib.enumresponse import EnumResponse
|
||
|
from lib.giantsenumresponseparser import GiantsEnumResponseParser
|
||
|
from lib.packet import Packet
|
||
|
from maps_gui import Ui_MainWindow
|
||
|
import requests
|
||
|
from lib.enumquery import EnumQuery
|
||
|
from lib.constants import AppGUID
|
||
|
import yaml
|
||
|
|
||
|
installed_maps = {}
|
||
|
API_BASE_URL = "https://gckmaps.hipstercat.fr"
|
||
|
SLEEP_TIME = 20 * 1000
|
||
|
CONFIG = {}
|
||
|
|
||
|
|
||
|
def get_ip():
|
||
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
|
try:
|
||
|
# doesn't even have to be reachable
|
||
|
s.connect(('10.255.255.255', 1))
|
||
|
ip = s.getsockname()[0]
|
||
|
except:
|
||
|
ip = '127.0.0.1'
|
||
|
finally:
|
||
|
s.close()
|
||
|
return ip
|
||
|
|
||
|
|
||
|
LOCAL_IP = get_ip()
|
||
|
|
||
|
|
||
|
class MainWindow(QMainWindow):
|
||
|
def __init__(self):
|
||
|
super(MainWindow, self).__init__()
|
||
|
self.ui = Ui_MainWindow()
|
||
|
self.ui.setupUi(self)
|
||
|
|
||
|
self.timer = QTimer(self)
|
||
|
self.timer.setInterval(SLEEP_TIME)
|
||
|
self.timer.timeout.connect(self.timer_timeout)
|
||
|
|
||
|
self.background_task = MapsBackground()
|
||
|
self.background_task.finished.connect(self.background_task_finished)
|
||
|
self.background_task.msg.connect(self.message_received)
|
||
|
|
||
|
if CONFIG["automatic_download"]:
|
||
|
self.ui.automaticMapDownloadCheckBox.setChecked(True)
|
||
|
self.timer.start()
|
||
|
self.log("Background task started")
|
||
|
else:
|
||
|
self.ui.automaticMapDownloadCheckBox.setChecked(False)
|
||
|
self.ui.automaticMapDownloadCheckBox.stateChanged.connect(self.checkboxchanged)
|
||
|
self.ui.pushButton.clicked.connect(self.btnclicked)
|
||
|
|
||
|
def timer_timeout(self):
|
||
|
self.run_background_task()
|
||
|
|
||
|
def run_background_task(self):
|
||
|
if not self.background_task.isRunning():
|
||
|
self.ui.pushButton.setEnabled(False)
|
||
|
self.background_task.start()
|
||
|
|
||
|
def background_task_finished(self):
|
||
|
self.ui.pushButton.setEnabled(True)
|
||
|
|
||
|
def checkboxchanged(self):
|
||
|
checked = self.ui.automaticMapDownloadCheckBox.isChecked()
|
||
|
if checked:
|
||
|
self.timer.start()
|
||
|
self.log("Background task started")
|
||
|
else:
|
||
|
self.timer.stop()
|
||
|
self.log("Background task stopped")
|
||
|
CONFIG["automatic_download"] = checked
|
||
|
save_configuration()
|
||
|
|
||
|
def btnclicked(self):
|
||
|
self.run_background_task()
|
||
|
|
||
|
def message_received(self, m):
|
||
|
self.log(m)
|
||
|
|
||
|
def log(self, message: str) -> None:
|
||
|
full_msg = f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {message}"
|
||
|
self.ui.logEdit.appendPlainText(full_msg)
|
||
|
print(full_msg)
|
||
|
|
||
|
|
||
|
def get_maps_dir() -> pathlib.Path:
|
||
|
return get_giants_directory() / "Bin" / "Worlds"
|
||
|
|
||
|
|
||
|
def load_installed_maps() -> None:
|
||
|
maps_dir = get_maps_dir()
|
||
|
|
||
|
def map_crc(map_file):
|
||
|
prev = 0
|
||
|
with open(map_file, "rb") as f:
|
||
|
b = f.read()
|
||
|
prev = zlib.crc32(b, prev)
|
||
|
return prev & 0xffffffff
|
||
|
|
||
|
installed_maps.clear()
|
||
|
for map_path in os.listdir(maps_dir):
|
||
|
if map_path.endswith(".cache") or map_path.endswith(".gck"):
|
||
|
map_path_s = pathlib.Path(maps_dir / map_path)
|
||
|
crc = map_crc(map_path_s)
|
||
|
installed_maps[crc] = map_path_s
|
||
|
|
||
|
|
||
|
def config_file_path() -> pathlib.Path:
|
||
|
datadir = get_datadir() / "giants-maps-downloader"
|
||
|
config_file = datadir / "config.yml"
|
||
|
try:
|
||
|
datadir.mkdir(parents=True)
|
||
|
except FileExistsError:
|
||
|
pass
|
||
|
return config_file
|
||
|
|
||
|
|
||
|
def read_configuration():
|
||
|
config_file = config_file_path()
|
||
|
global CONFIG
|
||
|
if not os.path.exists(config_file):
|
||
|
# create it
|
||
|
giants_exe_path, _filename = QFileDialog.getOpenFileName(None, caption="Open Giants.exe", filter="Giants.exe")
|
||
|
giants_dir = os.path.dirname(giants_exe_path)
|
||
|
CONFIG = {"giants_directory": giants_dir, "automatic_download": True}
|
||
|
save_configuration()
|
||
|
|
||
|
print(f"Config file: {config_file}")
|
||
|
with open(config_file, "r") as fp:
|
||
|
CONFIG = yaml.load(fp, Loader=yaml.Loader)
|
||
|
|
||
|
|
||
|
def save_configuration():
|
||
|
config_file = config_file_path()
|
||
|
with open(config_file, "w") as fp:
|
||
|
yaml.dump(CONFIG, fp)
|
||
|
print(f"Wrote configuration file to {config_file}")
|
||
|
|
||
|
|
||
|
def get_giants_directory() -> pathlib.Path:
|
||
|
return pathlib.Path(CONFIG["giants_directory"])
|
||
|
|
||
|
|
||
|
def get_datadir() -> pathlib.Path:
|
||
|
home = pathlib.Path.home()
|
||
|
if sys.platform == "win32":
|
||
|
return home / "AppData/Roaming"
|
||
|
elif sys.platform == "linux":
|
||
|
return home / ".local/share"
|
||
|
elif sys.platform == "darwin":
|
||
|
return home / "Library/Application Support"
|
||
|
|
||
|
|
||
|
class MapsBackground(QThread):
|
||
|
msg = Signal(str)
|
||
|
|
||
|
def run(self) -> None:
|
||
|
self.log("Background task starting")
|
||
|
try:
|
||
|
self.server_up_or_dl_missing_map(LOCAL_IP, 19711, True, timeout=2)
|
||
|
except:
|
||
|
self.log("No local server found")
|
||
|
self.all_servers_download()
|
||
|
|
||
|
def log(self, message: str):
|
||
|
self.msg.emit(message)
|
||
|
|
||
|
def server_up_or_dl_missing_map(self, server_ip: str, server_port: int, is_local: bool, timeout=5) -> None:
|
||
|
enum_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
|
enum_socket.setblocking(True)
|
||
|
enum_socket.settimeout(timeout)
|
||
|
|
||
|
eq = EnumQuery()
|
||
|
eq.Type = EnumQuery.HAS_APPLICATION_GUID
|
||
|
eq.ApplicationGUID = AppGUID.Release
|
||
|
p = eq.to_packet().getvalue()
|
||
|
enum_socket.sendto(p, (server_ip, server_port))
|
||
|
|
||
|
response = enum_socket.recv(1024)
|
||
|
enumresp = EnumResponse.parse(Packet(response))
|
||
|
giantsenumresp = GiantsEnumResponseParser.parse(enumresp)
|
||
|
if is_local:
|
||
|
self.upload_map(installed_maps[giantsenumresp.map_checksum])
|
||
|
elif giantsenumresp.map_checksum not in installed_maps:
|
||
|
self.log(f"Map {giantsenumresp.map_name} not found locally, downloading it")
|
||
|
self.download_map(giantsenumresp.map_checksum)
|
||
|
|
||
|
def download_map(self, crc: int) -> None:
|
||
|
self.log(f"Downloading map {API_BASE_URL}/map?crc={crc}")
|
||
|
try:
|
||
|
req = requests.get(f"{API_BASE_URL}/map?crc={crc}")
|
||
|
except Exception as e:
|
||
|
self.log(f"Error while fetching map info: {e}")
|
||
|
return
|
||
|
if req.status_code != 200:
|
||
|
self.log(f"Could not download map from server, got status_code {req.status_code}")
|
||
|
return
|
||
|
j = req.json()
|
||
|
blob_location = j["blob_location"]
|
||
|
|
||
|
final_map_file = get_maps_dir() / j["name"]
|
||
|
|
||
|
with requests.get(blob_location, stream=True) as stream:
|
||
|
with open(final_map_file, 'wb') as f:
|
||
|
for chunk in stream.iter_content(chunk_size=8192):
|
||
|
f.write(chunk)
|
||
|
|
||
|
load_installed_maps()
|
||
|
self.log(f"Download succesful: {final_map_file}")
|
||
|
|
||
|
def upload_map(self, map_path: pathlib.Path) -> None:
|
||
|
with open(map_path, "rb") as fp:
|
||
|
content_bytes = fp.read()
|
||
|
|
||
|
def crc(map_content_bytes):
|
||
|
prev = zlib.crc32(map_content_bytes, 0)
|
||
|
return prev & 0xffffffff
|
||
|
|
||
|
map_crc = crc(content_bytes)
|
||
|
|
||
|
# check if map is known by server
|
||
|
self.log(f"Trying {API_BASE_URL}/map?crc={map_crc}")
|
||
|
try:
|
||
|
req = requests.get(f"{API_BASE_URL}/map?crc={map_crc}")
|
||
|
except Exception as e:
|
||
|
self.log(f"Error while fetching map info: {e}")
|
||
|
return
|
||
|
if req.status_code == 200:
|
||
|
self.log(f"API already knows this map, got response {req.status_code}")
|
||
|
return
|
||
|
|
||
|
map_name = os.path.basename(map_path)
|
||
|
self.log(f"Local server found hosting map {map_name}, uploading it")
|
||
|
b64_data = base64.b64encode(content_bytes).decode("utf8")
|
||
|
map_data = {"name": map_name, "b64_data": b64_data}
|
||
|
try:
|
||
|
req = requests.post(f"{API_BASE_URL}/maps", json=map_data)
|
||
|
req.raise_for_status()
|
||
|
except Exception as e:
|
||
|
self.log(f"There was an error while uploading map: {e}")
|
||
|
return
|
||
|
self.log("Upload successful")
|
||
|
|
||
|
def all_servers_download(self) -> None:
|
||
|
all_servers = requests.get("https://giants.azurewebsites.net/api/Servers").json()
|
||
|
for server in all_servers:
|
||
|
try:
|
||
|
self.server_up_or_dl_missing_map(server["hostIpAddress"], server["port"], False)
|
||
|
except socket.timeout:
|
||
|
self.log(f"Could not connect to {server['hostIpAddress']}")
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
app = QApplication(sys.argv)
|
||
|
app.setQuitOnLastWindowClosed(True)
|
||
|
read_configuration()
|
||
|
load_installed_maps()
|
||
|
window = MainWindow()
|
||
|
window.show()
|
||
|
sys.exit(app.exec())
|