231 lines
7.0 KiB
Python
Raw Normal View History

import struct
import socket
import base64
import json
2021-08-29 00:48:30 +02:00
import os
import logging.config
2021-08-29 00:48:30 +02:00
from app.classes.shared.console import console
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class Server:
    """Status information parsed from a decoded server status response.

    Attributes (all taken from the ``data`` mapping passed to ``__init__``):
        description: the MOTD. A dict-shaped description is flattened to one
            string; any other value is stored unchanged.
        icon: bytes obtained by base64-decoding the ``favicon`` entry.
        players: JSON string returned by ``Players(...).report()``.
        version: ``data['version']['name']``.
        protocol: ``data['version']['protocol']``.
    """

    # Boolean style flags of a chat component, in the order their format
    # codes are emitted.
    _STYLE_FLAGS = ("bold", "italic", "underlined", "strikethrough", "obfuscated")

    def __init__(self, data):
        self.description = self._flatten_description(data.get('description'))
        # The first 22 characters are skipped before decoding -- presumably
        # the "data:image/png;base64," data-URI prefix. ``or ''`` also covers
        # a favicon that is present but null.
        self.icon = base64.b64decode((data.get('favicon') or '')[22:])
        self.players = Players(data['players']).report()
        self.version = data['version']['name']
        self.protocol = data['version']['protocol']

    def _flatten_description(self, description):
        """Return ``description`` as a single value suitable for display.

        Non-dict descriptions are returned unchanged. Dict descriptions are
        resolved in this order: ``translate`` key, ``extra`` component list,
        then the plain ``text`` key (empty string when absent).
        """
        if not isinstance(description, dict):
            return description

        # cat server
        if "translate" in description:
            return description['translate']

        # waterfall / bungee
        if 'extra' in description:
            parts = []
            for entry in description['extra']:
                # A component may be a bare string instead of an object.
                if isinstance(entry, str):
                    entry = {'text': entry}

                # Conversion format code needed only for Java Version
                parts.append(get_code_format("reset"))
                for flag in self._STYLE_FLAGS:
                    # Check the value, not just the key: an explicit
                    # ``"bold": false`` must not switch the style on.
                    if entry.get(flag):
                        parts.append(get_code_format(flag))
                if "color" in entry:
                    parts.append(get_code_format(entry['color']))

                # Then append the text
                if "text" in entry:
                    # A lone newline is replaced by the "§§" marker.
                    parts.append("§§" if entry['text'] == '\n' else entry['text'])
            return " ".join(parts)

        # normal MC
        return description.get('text', '')
class Players(list):
    """List of ``Player`` objects plus the server's player counters.

    ``data`` must provide ``max`` and ``online``; the optional ``sample``
    entry supplies the per-player mappings wrapped in ``Player``.
    """

    def __init__(self, data):
        sample = data.get('sample', [])
        super().__init__(Player(entry) for entry in sample)
        self.max = data['max']
        self.online = data['online']

    def report(self):
        """Return a JSON string with the counters and the player names."""
        names = list(map(str, self))
        summary = {'online': self.online, 'max': self.max, 'players': names}
        return json.dumps(summary)
class Player:
    """A single player entry; ``str()`` of it is the player's name."""

    def __init__(self, data):
        # Both keys are required; a missing one raises KeyError.
        self.id, self.name = data['id'], data['name']

    def __str__(self):
        return self.name
2021-08-29 00:48:30 +02:00
def get_code_format(format_name):
    """Return the format code stored under ``format_name``.

    The mapping is read from ``app/config/motd_format.json`` below the
    current working directory on every call. An empty string is returned,
    after logging, when the name is not a key of the mapping or when the
    file cannot be read or parsed.
    """
    format_file = os.path.join(
        os.path.abspath(os.path.curdir), 'app', 'config', 'motd_format.json'
    )
    try:
        with open(format_file, "r", encoding='utf-8') as handle:
            formats = json.load(handle)

        # Unknown name: report it and fall back to no code at all.
        if format_name not in formats.keys():
            logger.error(f"Format MOTD Error: format name {format_name} does not exist")
            console.error(f"Format MOTD Error: format name {format_name} does not exist")
            return ""

        return formats.get(format_name)
    except Exception as e:
        logger.critical(f"Config File Error: Unable to read {format_file} due to {e}")
        console.critical(f"Config File Error: Unable to read {format_file} due to {e}")
        return ""
# For the rest of requests see wiki.vg/Protocol
def ping(ip, port):
    """Query a server over TCP with the Server List Ping exchange.

    Connects to ``ip:port``, sends a handshake followed by a status request
    and parses the JSON status response.

    Args:
        ip: host name or address; used for the connection and as the server
            address field of the handshake.
        port: TCP port; packed as an unsigned 16-bit integer.

    Returns:
        A ``Server`` built from the decoded JSON, or ``False`` when the
        connection cannot be established, the reported packet length is
        below 10, or the peer closes the connection before the whole
        payload has arrived.

    Raises:
        ValueError: if a VarInt in the response is longer than 5 bytes.
        Errors raised after the connection is established (socket errors,
        invalid JSON) propagate to the caller.
    """

    def read_var_int():
        """Read one VarInt from the socket; 0 if the peer closed it."""
        value = 0
        # At most 5 bytes carrying 7 payload bits each.
        for shift in range(0, 35, 7):
            raw = sock.recv(1)
            if not raw:
                return 0
            byte = raw[0]
            value |= (byte & 0x7f) << shift
            if not byte & 0x80:  # no continuation bit: last byte
                return value
        raise ValueError('var_int too big')

    def write_var_int(value):
        """Encode a non-negative integer as a VarInt.

        Identical to a single byte for values below 128, and keeps working
        for longer host names where a signed-byte pack would raise.
        """
        encoded = bytearray()
        while True:
            low_bits = value & 0x7f
            value >>= 7
            if not value:
                encoded.append(low_bits)
                return bytes(encoded)
            encoded.append(low_bits | 0x80)

    # The context manager closes the socket on every exit path, including
    # a failed connect.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.connect((ip, port))
        except socket.error:
            return False

        host = ip.encode('utf-8')
        # wiki.vg/Server_List_Ping
        handshake = b''.join((
            b'\x00',  # packet ID
            b'\x04',  # protocol variant
            write_var_int(len(host)), host,
            struct.pack('>H', port),
            b'\x01',  # next state
        ))
        # handshake + status ping
        sock.sendall(write_var_int(len(handshake)) + handshake + b'\x01\x00')

        length = read_var_int()  # full packet length
        if length < 10:
            return False

        sock.recv(1)  # packet type, 0 for pings
        remaining = read_var_int()  # string length
        chunks = []
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                return False
            chunks.append(chunk)
            remaining -= len(chunk)
        data = b''.join(chunks)

        logger.debug(f"Server reports this data on ping: {data}")
        return Server(json.loads(data))
2022-02-10 18:20:36 -05:00
# For the rest of requests see wiki.vg/Protocol
def ping_bedrock(ip, port):
    """Query a Bedrock Edition server with a RakNet Unconnected Ping.

    Bedrock servers answer status queries over UDP with the RakNet
    Unconnected Ping/Pong exchange rather than the TCP Server List Ping
    handshake, so one ping datagram is sent and one pong is awaited.

    Args:
        ip: host name or address of the server.
        port: UDP port of the server.

    Returns:
        A ``Server`` built from the pong's status string, or ``False`` when
        the server cannot be reached, does not answer within 2 seconds, or
        sends a reply that is not a well-formed pong.
    """
    # 16-byte marker carried by RakNet offline messages.
    magic = bytes.fromhex('00ffff00fefefefefdfdfdfd12345678')
    request = b''.join((
        b'\x01',               # packet ID: Unconnected Ping
        struct.pack('>q', 0),  # client timestamp, echoed back by the server
        magic,
        struct.pack('>q', 2),  # client GUID, arbitrary
    ))

    # The context manager closes the socket on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.settimeout(2)
        try:
            sock.connect((ip, port))
            sock.sendall(request)
            response = sock.recv(4096)
        except (OSError, TypeError, ValueError, OverflowError):
            # Resolution failure, refusal, timeout or unusable ip/port
            # arguments: report the server as unreachable.
            return False

    # Pong layout: ID (1) + time (8) + server GUID (8) + magic (16),
    # then a 2-byte big-endian length followed by the status string.
    if len(response) < 35 or response[0] != 0x1c or response[17:33] != magic:
        return False
    (id_length,) = struct.unpack_from('>H', response, 33)
    data = response[35:35 + id_length]
    logger.debug(f"Server reports this data on ping: {data}")

    # Semicolon-separated fields:
    # edition;MOTD;protocol;version;online;max;server id;...
    fields = data.decode('utf-8', errors='replace').split(';')
    try:
        status = {
            'description': fields[1],
            'players': {'online': int(fields[4]), 'max': int(fields[5])},
            'version': {'name': fields[3], 'protocol': int(fields[2])},
        }
    except (IndexError, ValueError):
        return False
    return Server(status)