import asyncio
import importlib
import os
import socket
import traceback

from aioconsole import ainput

from giants.map import Map
from giants.player import Player
from dpnet.netserver import Netserver
from dpnet.session import Session
from giants import Teams, GameTypes, ChatColor, ChatType
from utils.logger import setup_logger

logger = setup_logger(__name__)


class Server:
    """Game server state: configuration, player lists and loaded plugins.

    Configuration is supplied as keyword arguments to the constructor; every
    option has a default. Plugins registered through ``add_plugin`` receive
    events via ``broadcast_event``.
    """

    # Name of the placeholder player that represents the server itself; it is
    # skipped by every broadcast helper.
    _SERVER_PLAYER_NAME = "[Server]"

    def __init__(self, **kwargs):
        """Create a server.

        Recognised keyword arguments (all optional):
            ip: address to listen on (default ``"0.0.0.0"``).
            port: UDP port to listen on (default ``19711``).
            register: stored as ``register_with_ms`` (default ``False``).
            teams: team layout (default ``Teams.MvM``).
            gametype: game type (default
                ``GameTypes.TeamDeathmatchWithFullBase``).
            map: the current ``Map`` (default ``Map("test3.gck")``).
            maxplayers: maximum player count (default ``20``).
            name: server name (default ``"Default Server Name"``).
        """
        self.listen_ip = kwargs.get("ip", "0.0.0.0")
        self.listen_port = kwargs.get("port", 19711)
        self.register_with_ms = kwargs.get("register", False)
        self.teams = kwargs.get("teams", Teams.MvM)
        self.game_type = kwargs.get("gametype", GameTypes.TeamDeathmatchWithFullBase)
        # Build the default map only when the caller did not pass one, so a
        # missing/broken default map file cannot break an explicit ``map=``.
        self.currentmap = kwargs["map"] if "map" in kwargs else Map("test3.gck")
        self.maxplayers = kwargs.get("maxplayers", 20)
        self.name = kwargs.get("name", "Default Server Name")

        # The first entry is a placeholder player standing in for the server.
        self.players = [self._make_fake_player(self._SERVER_PLAYER_NAME, 3333)]
        # NOTE(review): all three bots share port 3334, exactly as before this
        # cleanup -- confirm whether distinct ports were intended.
        for bot_number in (1, 2, 3):
            self.players.append(
                self._make_fake_player("Bot {}".format(bot_number), 3334)
            )

        self.accept_new_players = True
        self.version = 1.497
        self.points_per_kill = 1
        self.points_per_capture = 5
        self.detente_time = 0  # minutes
        self._plugins = []
        self.tempplayers = []
        self.running = True
        self.ticks = 60

    @staticmethod
    def _make_fake_player(name, port):
        """Return a ``Player`` backed by a local session on 127.0.0.1:``port``."""
        session = Session(socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
        session.ip = "127.0.0.1"
        session.port = port
        return Player(name, session)

    def _remote_players(self):
        """Return a snapshot list of every player except the server placeholder.

        A new list is returned so callers can ``await`` while iterating without
        being affected by players joining or leaving in the meantime.
        """
        return [
            player
            for player in self.players
            if player.name != self._SERVER_PLAYER_NAME
        ]

    def update(self):
        """Per-tick hook; currently does nothing."""
        # logger.debug("Calling update")
        pass

    async def add_player(self, player):
        """Move ``player`` from the temporary list to the active list.

        Broadcasts ``on_player_join`` to the plugins afterwards.

        Raises:
            ValueError: if ``player`` is not in ``tempplayers``.
        """
        self.tempplayers.remove(player)
        self.players.append(player)
        await self.broadcast_event("on_player_join", player)

    # TODO: remove
    def create_temp_player(self, player):
        """Register ``player`` in the temporary player list."""
        self.tempplayers.append(player)

    async def remove_player(self, player):
        """Remove ``player`` from the active list and broadcast ``on_player_left``.

        Raises:
            ValueError: if ``player`` is not in ``players``.
        """
        self.players.remove(player)
        await self.broadcast_event("on_player_left", player)

    def add_plugin(self, plugin):
        """Register ``plugin`` so it receives broadcast events."""
        self._plugins.append(plugin)

    async def change_map(self, mappath):
        """Load the map at ``mappath`` and broadcast ``on_map_change``.

        Failures are logged (with a traceback) and not re-raised; if the map
        cannot be loaded the current map is left untouched.
        """
        try:
            self.currentmap = Map(mappath)
            await self.broadcast_event("on_map_change", self.currentmap)
        except Exception:
            logger.error("Could not change map")
            # Print the traceback like the other handlers in this file do, so
            # the actual cause is not lost.
            traceback.print_exc()

    async def broadcast_event(self, event, *args):
        """Await the handler named ``event`` on every plugin that defines one.

        ``args`` are passed through to each handler. A handler that raises is
        logged and skipped so the remaining plugins still get the event.
        """
        logger.debug("Broadcasting event %s", event)
        for plugin in self._plugins:
            handler = getattr(plugin, event, None)
            if not callable(handler):
                continue
            try:
                await handler(*args)
            except Exception:
                logger.error(
                    "Could not call plugin function: %s.%s",
                    plugin.__class__.__name__,
                    event,
                )
                traceback.print_exc()

    def load_plugins(self):
        """Import every ``*.py`` module in ``plugins/`` and call its ``setup``.

        Directories and ``__init__.py`` are skipped. A module is only set up
        when it exposes a callable ``setup``, which is called with this server.
        A plugin that fails to import or set up is logged and skipped.
        """
        try:
            entries = os.listdir("plugins")
        except FileNotFoundError:
            logger.warning("Plugin directory 'plugins' not found; no plugins loaded")
            return

        for plugin in entries:
            if plugin == "__init__.py" or not plugin.endswith(".py"):
                continue
            if os.path.isdir(os.path.join("plugins", plugin)):
                continue
            # splitext strips only the final ".py"; splitting on ".py" would
            # truncate names that contain ".py" earlier in the file name.
            pluginname = os.path.splitext(plugin)[0]
            try:
                module = importlib.import_module("plugins." + pluginname)
                setup = getattr(module, "setup", None)
                if callable(setup):
                    logger.info("Loading plugin %s", module.__name__)
                    setup(self)
            except Exception:
                logger.warning("Could not load plugin %s", plugin)
                traceback.print_exc()

    async def broadcast_message(self, text, color=ChatColor.Orange, type=ChatType.All):
        """Send the chat message ``text`` to every player except the server.

        ``type`` shadows the builtin but is kept because callers may pass it
        by keyword.
        """
        for player in self._remote_players():
            await player.send_message(text, color=color, type=type)

    async def ask_command(self):
        """Read hex payloads from the console and send them to every player.

        Each line read is decoded with ``bytes.fromhex`` and sent to every
        player except the server with ``acknow=False``. Errors (e.g. invalid
        hex) are printed and the prompt continues. The loop ends when console
        input is closed (EOF) or the task is cancelled.
        """
        while True:
            try:
                cmd = await ainput(">")
                logger.debug("Sending game payload %s", cmd)
                # Decode once per command rather than once per player.
                payload = bytes.fromhex(cmd)
                for player in self._remote_players():
                    await player.session.send_gamedata(payload, acknow=False)
            except EOFError:
                # stdin is closed; retrying would spin forever.
                logger.info("Console input closed; stopping command prompt")
                return
            except Exception:
                # Deliberately not a bare ``except``: task cancellation and
                # KeyboardInterrupt must propagate so the server can shut down.
                traceback.print_exc()

    async def broadcast_gamedata(self, payload, **kwargs):
        """Send ``payload`` to every player except the server.

        ``kwargs`` are forwarded to each session's ``send_gamedata``.
        """
        for player in self._remote_players():
            await player.session.send_gamedata(payload, **kwargs)


def main():
    """Start the server, listen for datagrams and run until interrupted."""
    # Alternative map: pass map=Map("Three Way Island - Canyons.gck")
    server = Server(name="giantsd", maxplayers=20, register=False, teams=Teams.MvM)
    server.load_plugins()

    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    transport = None
    console_task = None
    try:
        listen = loop.create_datagram_endpoint(
            lambda: Netserver(server),
            local_addr=(server.listen_ip, server.listen_port),
        )
        transport, _protocol = loop.run_until_complete(listen)
        console_task = loop.create_task(server.ask_command())
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
    finally:
        if console_task is not None:
            # Cancel the console prompt and let it unwind before the loop is
            # closed, so no pending task is destroyed.
            console_task.cancel()
            loop.run_until_complete(
                asyncio.gather(console_task, return_exceptions=True)
            )
        if transport is not None:
            transport.close()
        loop.close()


if __name__ == '__main__':
    main()