giantsd/dpnet/CFrame.py

75 lines
2.5 KiB
Python

from .packet import Packet
import random
class CFrame:
PACKET_COMMAND_FRAME = 0x80
PACKET_COMMAND_POLL = 0x08
FRAME_EXOPCODE_CONNECT = 0x01
FRAME_EXOPCODE_CONNECTED = 0x02
FRAME_EXOPCODE_CONNECTED_SIGNED = 0x03
FRAME_EXOPCODE_HARD_DISCONNECT = 0x04
FRAME_EXOPCODE_SACK = 0x06
SACK_FLAGS_RESPONSE = 0x01
SACK_FLAGS_SACK_MASK1 = 0x02
SACK_FLAGS_SACK_MASK2 = 0x04
SACK_FLAGS_SEND_MASK1 = 0x08
SACK_FLAGS_SEND_MASK2 = 0x10
def __init__(self, packet=None):
self.Command = CFrame.PACKET_COMMAND_FRAME | CFrame.PACKET_COMMAND_POLL
self.ExtOpCode = CFrame.FRAME_EXOPCODE_CONNECT
self.MsgID = 0
self.RspId = 0
self.Flags = 1
self.Retry = 0
self.NSeq = 0
self.NRecv = 0
self.CurrentProtocolVersion = 0x00010006
self.SessID = random.getrandbits(32)
self.Timestamp = random.getrandbits(32)
if packet:
self.parse(packet)
def parse(self, packet):
packet.seek(0)
self.Command = packet.getByte()
self.ExtOpCode = packet.getByte()
if self.ExtOpCode == CFrame.FRAME_EXOPCODE_CONNECTED_SIGNED:
raise Exception("CONNECTED_SIGNED is not implemented")
if self.ExtOpCode == CFrame.FRAME_EXOPCODE_SACK:
self.Flags = packet.getByte()
self.Retry = packet.getByte()
self.NSeq = packet.getByte()
self.NRecv = packet.getByte()
packet.getByte() # padding
packet.getByte() # padding
self.Timestamp = packet.getULong()
else:
self.MsgID = packet.getByte()
self.RspId = packet.getByte()
self.CurrentProtocolVersion = packet.getLong()
self.SessID = packet.getLong()
self.Timestamp = packet.getULong()
def to_packet(self):
packet = Packet()
packet.putByte(self.Command)
packet.putByte(self.ExtOpCode)
if self.ExtOpCode == CFrame.FRAME_EXOPCODE_SACK:
packet.putByte(self.Flags)
packet.putByte(self.Retry)
packet.putByte(self.NSeq)
packet.putByte(self.NRecv)
packet.putShort(0) # padding
packet.putULong(self.Timestamp)
else:
packet.putByte(self.MsgID)
packet.putByte(self.RspId)
packet.putLong(self.CurrentProtocolVersion)
packet.putLong(self.SessID)
packet.putULong(self.Timestamp)
return packet