Appease the linter

This commit is contained in:
Andrew 2022-03-01 01:14:26 -05:00
parent 11278ebe5f
commit d760fcb010
4 changed files with 101 additions and 103 deletions

View File

@ -1,109 +1,107 @@
import os import os
import psutil
import pprint
import socket import socket
import time import time
import psutil
class BedrockPing: class BedrockPing:
magic = b'\x00\xff\xff\x00\xfe\xfe\xfe\xfe\xfd\xfd\xfd\xfd\x12\x34\x56\x78' magic = b'\x00\xff\xff\x00\xfe\xfe\xfe\xfe\xfd\xfd\xfd\xfd\x12\x34\x56\x78'
fields = { # (len, signed) fields = { # (len, signed)
"byte": (1, False), "byte": (1, False),
"long": (8, True), "long": (8, True),
"ulong": (8, False), "ulong": (8, False),
"magic": (16, False), "magic": (16, False),
"short": (2, True), "short": (2, True),
"ushort": (2, False), #unsigned short "ushort": (2, False), #unsigned short
"string": (2, False), #strlen is ushort "string": (2, False), #strlen is ushort
"bool": (1, False), "bool": (1, False),
"address": (7, False), "address": (7, False),
"uint24le": (3, False) "uint24le": (3, False)
} }
byte_order = 'big' byte_order = 'big'
def __init__(self, bedrock_addr, bedrock_port, client_guid=0, timeout=5): def __init__(self, bedrock_addr, bedrock_port, client_guid=0, timeout=5):
self.addr = bedrock_addr self.addr = bedrock_addr
self.port = bedrock_port self.port = bedrock_port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.settimeout(timeout) self.sock.settimeout(timeout)
self.proc = psutil.Process(os.getpid()) self.proc = psutil.Process(os.getpid())
self.guid = client_guid self.guid = client_guid
self.guid_bytes = self.guid.to_bytes(8, BedrockPing.byte_order) self.guid_bytes = self.guid.to_bytes(8, BedrockPing.byte_order)
@staticmethod @staticmethod
def __byter(in_val, to_type): def __byter(in_val, to_type):
f = BedrockPing.fields[to_type] f = BedrockPing.fields[to_type]
return in_val.to_bytes(f[0], BedrockPing.byte_order, signed=f[1]) return in_val.to_bytes(f[0], BedrockPing.byte_order, signed=f[1])
@staticmethod
def __slice(in_bytes, pattern):
ret = []
bi = 0 # bytes index
pi = 0 # pattern index
while(bi < len(in_bytes)):
try:
f = BedrockPing.fields[pattern[pi]]
except IndexError:
raise IndexError("Ran out of pattern with additional bytes remaining")
if pattern[pi] == "string":
shl = f[0] # string header length
sl = int.from_bytes(in_bytes[bi:bi+shl], BedrockPing.byte_order, signed=f[1]) # string length
l = shl+sl
ret.append(in_bytes[bi+shl:bi+shl+sl].decode('ascii'))
elif pattern[pi] == "magic":
l = f[0] # length of field
ret.append(in_bytes[bi:bi+l])
else:
l = f[0] # length of field
ret.append(int.from_bytes(in_bytes[bi:bi+l], BedrockPing.byte_order, signed=f[1]))
bi+=l
pi+=1
return ret
@staticmethod
def __get_time():
#return time.time_ns() // 1000000
return time.perf_counter_ns() // 1000000
def __sendping(self): @staticmethod
pack_id = BedrockPing.__byter(0x01, 'byte') def __slice(in_bytes, pattern):
now = BedrockPing.__byter(BedrockPing.__get_time(), 'ulong') ret = []
guid = self.guid_bytes bi = 0 # bytes index
d2s = pack_id+now+BedrockPing.magic+guid pi = 0 # pattern index
#print("S:", d2s) while bi < len(in_bytes):
self.sock.sendto(d2s, (self.addr, self.port)) try:
f = BedrockPing.fields[pattern[pi]]
except IndexError as index_error:
raise IndexError("Ran out of pattern with additional bytes remaining") from index_error
if pattern[pi] == "string":
shl = f[0] # string header length
sl = int.from_bytes(in_bytes[bi:bi+shl], BedrockPing.byte_order, signed=f[1]) # string length
l = shl+sl
ret.append(in_bytes[bi+shl:bi+shl+sl].decode('ascii'))
elif pattern[pi] == "magic":
l = f[0] # length of field
ret.append(in_bytes[bi:bi+l])
else:
l = f[0] # length of field
ret.append(int.from_bytes(in_bytes[bi:bi+l], BedrockPing.byte_order, signed=f[1]))
bi+=l
pi+=1
return ret
def __recvpong(self): @staticmethod
t_start = time.perf_counter() def __get_time():
data = self.sock.recv(4096) #return time.time_ns() // 1000000
if data[0] == 0x1c: return time.perf_counter_ns() // 1000000
ret = {}
sliced = BedrockPing.__slice(data,["byte","ulong","ulong","magic","string"])
if sliced[3] != BedrockPing.magic:
raise ValueError(f"Incorrect magic received ({sliced[3]})")
ret["server_guid"] = sliced[2]
ret["server_string_raw"] = sliced[4]
server_info = sliced[4].split(';')
ret["server_edition"] = server_info[0]
ret["server_motd"] = (server_info[1], server_info[7])
ret["server_protocol_version"] = server_info[2]
ret["server_version_name"] = server_info[3]
ret["server_player_count"] = server_info[4]
ret["server_player_max"] = server_info[5]
ret["server_uuid"] = server_info[6]
ret["server_game_mode"] = server_info[8]
ret["server_game_mode_num"] = server_info[9]
ret["server_port_ipv4"] = server_info[10]
ret["server_port_ipv6"] = server_info[11]
return ret
else:
raise ValueError(f"Incorrect packet type ({data[0]} detected")
def ping(self, retries=3): def __sendping(self):
rtr = retries pack_id = BedrockPing.__byter(0x01, 'byte')
while rtr > 0: now = BedrockPing.__byter(BedrockPing.__get_time(), 'ulong')
try: guid = self.guid_bytes
self.__sendping() d2s = pack_id+now+BedrockPing.magic+guid
return self.__recvpong() #print("S:", d2s)
except ValueError as e: self.sock.sendto(d2s, (self.addr, self.port))
print(f"E: {e}, checking next packet. Retries remaining: {rtr}/{retries}")
rtr -= 1 def __recvpong(self):
data = self.sock.recv(4096)
if data[0] == 0x1c:
ret = {}
sliced = BedrockPing.__slice(data,["byte","ulong","ulong","magic","string"])
if sliced[3] != BedrockPing.magic:
raise ValueError(f"Incorrect magic received ({sliced[3]})")
ret["server_guid"] = sliced[2]
ret["server_string_raw"] = sliced[4]
server_info = sliced[4].split(';')
ret["server_edition"] = server_info[0]
ret["server_motd"] = (server_info[1], server_info[7])
ret["server_protocol_version"] = server_info[2]
ret["server_version_name"] = server_info[3]
ret["server_player_count"] = server_info[4]
ret["server_player_max"] = server_info[5]
ret["server_uuid"] = server_info[6]
ret["server_game_mode"] = server_info[8]
ret["server_game_mode_num"] = server_info[9]
ret["server_port_ipv4"] = server_info[10]
ret["server_port_ipv6"] = server_info[11]
return ret
else:
raise ValueError(f"Incorrect packet type ({data[0]} detected")
def ping(self, retries=3):
rtr = retries
while rtr > 0:
try:
self.__sendping()
return self.__recvpong()
except ValueError as e:
print(f"E: {e}, checking next packet. Retries remaining: {rtr}/{retries}")
rtr -= 1

View File

@ -179,4 +179,3 @@ def ping_bedrock(ip, port):
return brp.ping() return brp.ping()
except socket.timeout: except socket.timeout:
logger.debug("Unable to get RakNet stats") logger.debug("Unable to get RakNet stats")

View File

@ -163,7 +163,6 @@ class Stats:
@staticmethod @staticmethod
def parse_server_RakNet_ping(ping_obj: object): def parse_server_RakNet_ping(ping_obj: object):
online_stats = {}
try: try:
server_icon = base64.encodebytes(ping_obj['icon']) server_icon = base64.encodebytes(ping_obj['icon'])

View File

@ -577,7 +577,8 @@ class Server:
logger.info(f"Starting server {self.name} (ID {self.server_id}) backup") logger.info(f"Starting server {self.name} (ID {self.server_id}) backup")
server_users = server_permissions.get_server_user_list(self.server_id) server_users = server_permissions.get_server_user_list(self.server_id)
for user in server_users: for user in server_users:
websocket_helper.broadcast_user(user, 'notification', translation.translate('notify', 'backupStarted', users_helper.get_user_lang_by_id(user)) + self.name) websocket_helper.broadcast_user(user, 'notification', translation.translate('notify',
'backupStarted', users_helper.get_user_lang_by_id(user)) + self.name)
time.sleep(3) time.sleep(3)
self.is_backingup = True self.is_backingup = True
conf = management_helper.get_backup_config(self.server_id) conf = management_helper.get_backup_config(self.server_id)
@ -617,7 +618,8 @@ class Server:
logger.info(f"Backup of server: {self.name} completed") logger.info(f"Backup of server: {self.name} completed")
server_users = server_permissions.get_server_user_list(self.server_id) server_users = server_permissions.get_server_user_list(self.server_id)
for user in server_users: for user in server_users:
websocket_helper.broadcast_user(user, 'notification', translation.translate('notify', 'backupComplete', users_helper.get_user_lang_by_id(user)) + self.name) websocket_helper.broadcast_user(user, 'notification', translation.translate('notify', 'backupComplete',
users_helper.get_user_lang_by_id(user)) + self.name)
time.sleep(3) time.sleep(3)
return return
except: except: