diff --git a/masterserver.py b/masterserver.py index d305933..ec22f6f 100644 --- a/masterserver.py +++ b/masterserver.py @@ -1,9 +1,9 @@ +import asyncio import logging import socket -import threading import datetime -import time -import traceback +import signal +import sys # List of servers always up DEDICATED_SERVERS = [{"ip": "136.143.97.184", "port": 19711, "last": datetime.datetime.now()}, @@ -11,6 +11,7 @@ DEDICATED_SERVERS = [{"ip": "136.143.97.184", "port": 19711, "last": datetime.da {"ip": "artolsheim.hipstercat.fr", "port": 19711, "last": datetime.datetime.now()}] SERVERS = DEDICATED_SERVERS.copy() +running = True def setup_logger(name): @@ -19,54 +20,32 @@ def setup_logger(name): _ch.setLevel("DEBUG") _formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') _ch.setFormatter(_formatter) - logger = logging.getLogger(name) - logger.addHandler(_ch) - logger.setLevel("DEBUG") - logger.debug("Logger initialized") - return logger + log = logging.getLogger(name) + log.addHandler(_ch) + log.setLevel("DEBUG") + log.debug("Logger initialized") + return log logger = setup_logger(__name__) -def main(): - listen_ip = "0.0.0.0" +class QueryServerProtocol(asyncio.Protocol): + def __init__(self): + super().__init__() + self.transport = None - # register socket. dedicated servers send packets to this socket to register. - registersock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - registersock.bind((listen_ip, 27900)) - logger.info("Register server listening on %s UDP %s", listen_ip, 27900) + def connection_made(self, transport): + peername = transport.get_extra_info('peername') + logger.info('Connection from {}'.format(peername)) + self.transport = transport - # query socket. clients use this socket to query available servers. - querysock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - querysock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - querysock.bind((listen_ip, 28900)) - print("Query server listening on %s TCP %s", listen_ip, 28900) - querysock.listen(5) + def data_received(self, data): + clean() + ip = self.transport.get_extra_info("peername") - # everything is threaded. - registerthread = threading.Thread(target=registerloop, args=(registersock,)) - querythread = threading.Thread(target=queryloop, args=(querysock,)) - - # clean thread is used to clean older server that haven't updated in a while - cleanthread = threading.Thread(target=cleanloop) - - # start everything - querythread.start() - registerthread.start() - cleanthread.start() - - # wait until everything is finished (which will never happen because they are all running in a loop) - querythread.join() - registerthread.join() - cleanthread.join() - - -def queryloop(querysock): - while True: # new query incoming from a client - (clientsocket, (_, _)) = querysock.accept() - logger.info("Query from %s:%s", clientsocket[0], clientsocket[1]) + logger.info("Query from %s", ip) # forge the response b = b'' @@ -78,30 +57,39 @@ def queryloop(querysock): b += b'\xac' b += str(SERVERS[i]["port"]).encode("ascii") b += b'\x00' - logger.info("Sending back %s", b.decode('utf8')) + logger.info("Sending back %s", b) # send it - clientsocket.send(b) + self.transport.write(b) # close the socket - clientsocket.close() + self.transport.close() -def registerloop(registersock): - while True: - # new register incoming from a server - data, (ip, port) = registersock.recvfrom(1024) - try: - # first byte is some kind of opcode, so we leave it. the rest is the game port - gameport = data[1:].decode("ascii") - if int(gameport): +class RegisterServerProtocol(asyncio.DatagramProtocol): + def __init__(self): + super().__init__() + self.transport = None + + def connection_made(self, transport): + self.transport = transport + + def datagram_received(self, data, addr): + clean() # remove old servers + ip = addr[0] + logger.info("Received %s from %s", data.decode("utf8"), ip) + gameport = data[1:].decode("ascii") + if int(gameport): + gameport = int(gameport) + try: # game port sent. now we try to reach the server to avoid registering of servers unreachable. tempsocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) tempsocket.settimeout(5) # we try to send a "status" packet to the server. - tempsocket.sendto("\\status\\".encode("ascii"), (ip, port)) - tempsocket.recvfrom(1024) + tempsocket.sendto("\\status\\".encode("ascii"), (ip, gameport)) + tempsocket.recvfrom(1024) # discard received message and suppose it was okay + tempsocket.close() # server will be None if it has not been seen before, or something else if it has been seen server = next((x for x in SERVERS if x["ip"] == ip and x["port"] == gameport), None) @@ -111,29 +99,62 @@ def registerloop(registersock): else: server["last"] = datetime.datetime.now() logger.info("Keepalive from: %s:%s" % (ip, gameport)) - tempsocket.close() - else: - # server sent shit, discard it - print("Fuck it, wasn't int: ", data) - except Exception as e: - # something wrong happened but we don't know what - print("Exception") - traceback.print_exc() + + except socket.timeout: + logger.info("Could not connect to %s:%s after timeout", ip, gameport) + else: + # server sent shit, discard it + logger.info("Fuck it, wasn't int: ", data) -def cleanloop(): - # clean loop removes servers that haven't registered or sent a packet in the last 2 minutes - # but don't remove known dedicated servers +def signal_handler(sig, frame): + # called when CTRL+C received + del sig, frame + logger.info('Shutting down...') + sys.exit(0) + + +async def wakeup(): + # hack to allow CTRL+C on Windows while True: - now = datetime.datetime.now() - for server in SERVERS: - if now > server["last"] + datetime.timedelta(minutes=2) and server not in DEDICATED_SERVERS: - logger.info("Deleting %s:%s" % (server["ip"], server["port"])) - try: - SERVERS.remove(server) - except ValueError: - pass - time.sleep(10) + await asyncio.sleep(1) + + +def clean(): + # clean function removes servers that haven't registered or sent a packet in the last 2 minutes + # but don't remove known dedicated servers + now = datetime.datetime.now() + for server in SERVERS: + if now > server["last"] + datetime.timedelta(minutes=2) and server not in DEDICATED_SERVERS: + logger.info("Deleting %s:%s" % (server["ip"], server["port"])) + try: + SERVERS.remove(server) + except ValueError: + pass + + +def main(): + signal.signal(signal.SIGINT, signal_handler) + listen_ip = "0.0.0.0" + queryport = 28900 + registerport = 27900 + + loop = asyncio.get_event_loop() + + query_coro = loop.create_server(QueryServerProtocol, listen_ip, queryport) + loop.create_task(query_coro) + logger.info("Query server listening on %s TCP %s", listen_ip, queryport) + + register_coro = loop.create_datagram_endpoint(lambda: RegisterServerProtocol(), + local_addr=(listen_ip, registerport)) + loop.create_task(register_coro) + logger.info("Register server listening on %s UDP %s", listen_ip, registerport) + + # add wakeup hack + # https://stackoverflow.com/questions/27480967/why-does-the-asyncios-event-loop-suppress-the-keyboardinterrupt-on-windows + loop.create_task(wakeup()) + + loop.run_forever() if __name__ == '__main__': diff --git a/querytest.py b/querytest.py new file mode 100644 index 0000000..61234d5 --- /dev/null +++ b/querytest.py @@ -0,0 +1,23 @@ +import socket + +HOST = '127.0.0.1' +PORT = 28900 + +client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +client.connect((HOST, PORT)) +print('Connection to ' + HOST + ':' + str(PORT) + ' successful.') + +message = b"query" +print("Sending: %s" % message) +n = client.send(message) +if n != len(message): + print('Error while sending.') +else: + print('Sent.') + +print('Receiving...') +donnees = client.recv(1024) +print('Received:', donnees) + +print('Disconnection.') +client.close() diff --git a/registertest.py b/registertest.py new file mode 100644 index 0000000..d7e02b7 --- /dev/null +++ b/registertest.py @@ -0,0 +1,21 @@ +import socket +import time + +UDP_IP = "127.0.0.1" +MY_UDP = 27901 +UDP_PORT = 27900 + +sock = socket.socket(socket.AF_INET, # Internet + socket.SOCK_DGRAM) # UDP +sock.bind((UDP_IP, MY_UDP)) + +MESSAGE = b"\x0127901" + +while True: + print("Sending register/keep alive packet") + sock.sendto(MESSAGE, (UDP_IP, UDP_PORT)) # simulate register/keepalive packet + data, addr = sock.recvfrom(1024) # master server will query server + print("Received data %s" % data) + sock.sendto(MESSAGE, addr) # simulate "status" response + print("Sending \"status\" reponse") + time.sleep(30)