mirror of
https://gitlab.com/crafty-controller/crafty-4.git
synced 2025-01-19 17:55:29 +01:00
0d664cfae5
This fixes an error being sent when session.log doesn't exist. This usually occurs when the logs directory is deleted.
1011 lines
36 KiB
Python
1011 lines
36 KiB
Python
import contextlib
|
|
import os
|
|
import re
|
|
import sys
|
|
import json
|
|
import tempfile
|
|
import time
|
|
import uuid
|
|
import string
|
|
import base64
|
|
import socket
|
|
import secrets
|
|
import logging
|
|
import html
|
|
import zipfile
|
|
import pathlib
|
|
import ctypes
|
|
from datetime import datetime
|
|
from socket import gethostname
|
|
from contextlib import redirect_stderr, suppress
|
|
|
|
from app.classes.shared.null_writer import NullWriter
|
|
from app.classes.shared.console import Console
|
|
from app.classes.shared.installer import installer
|
|
from app.classes.shared.file_helpers import FileHelpers
|
|
from app.classes.shared.translation import Translation
|
|
from app.classes.web.websocket_helper import WebSocketHelper
|
|
|
|
with redirect_stderr(NullWriter()):
|
|
import psutil
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
try:
|
|
import requests
|
|
from requests import get
|
|
from OpenSSL import crypto
|
|
from argon2 import PasswordHasher
|
|
|
|
except ModuleNotFoundError as err:
|
|
logger.critical(f"Import Error: Unable to load {err.name} module", exc_info=True)
|
|
print(f"Import Error: Unable to load {err.name} module")
|
|
installer.do_install()
|
|
|
|
|
|
class Helpers:
|
|
allowed_quotes = ['"', "'", "`"]
|
|
|
|
def __init__(self):
|
|
self.root_dir = os.path.abspath(os.path.curdir)
|
|
self.config_dir = os.path.join(self.root_dir, "app", "config")
|
|
self.webroot = os.path.join(self.root_dir, "app", "frontend")
|
|
self.servers_dir = os.path.join(self.root_dir, "servers")
|
|
self.backup_path = os.path.join(self.root_dir, "backups")
|
|
self.migration_dir = os.path.join(self.root_dir, "app", "migrations")
|
|
|
|
self.session_file = os.path.join(self.root_dir, "app", "config", "session.lock")
|
|
self.settings_file = os.path.join(self.root_dir, "app", "config", "config.json")
|
|
|
|
self.ensure_dir_exists(os.path.join(self.root_dir, "app", "config", "db"))
|
|
self.db_path = os.path.join(
|
|
self.root_dir, "app", "config", "db", "crafty.sqlite"
|
|
)
|
|
self.serverjar_cache = os.path.join(self.config_dir, "serverjars.json")
|
|
self.credits_cache = os.path.join(self.config_dir, "credits.json")
|
|
self.passhasher = PasswordHasher()
|
|
self.exiting = False
|
|
|
|
self.websocket_helper = WebSocketHelper(self)
|
|
self.translation = Translation(self)
|
|
|
|
@staticmethod
|
|
def auto_installer_fix(ex):
|
|
logger.critical(f"Import Error: Unable to load {ex.name} module", exc_info=True)
|
|
print(f"Import Error: Unable to load {ex.name} module")
|
|
installer.do_install()
|
|
|
|
@staticmethod
|
|
def float_to_string(gbs: float):
|
|
s = str(float(gbs) * 1000).rstrip("0").rstrip(".")
|
|
return s
|
|
|
|
@staticmethod
|
|
def check_file_perms(path):
|
|
try:
|
|
open(path, "r", encoding="utf-8").close()
|
|
logger.info(f"{path} is readable")
|
|
return True
|
|
except PermissionError:
|
|
return False
|
|
|
|
@staticmethod
|
|
def is_file_older_than_x_days(file, days=1):
|
|
if Helpers.check_file_exists(file):
|
|
file_time = os.path.getmtime(file)
|
|
# Check against 24 hours
|
|
if (time.time() - file_time) / 3600 > 24 * days:
|
|
return True
|
|
else:
|
|
return False
|
|
logger.error(f"{file} does not exist")
|
|
return True
|
|
|
|
def get_servers_root_dir(self):
|
|
return self.servers_dir
|
|
|
|
@staticmethod
|
|
def check_internet():
|
|
try:
|
|
requests.get("https://google.com", timeout=1)
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
@staticmethod
|
|
def check_port(server_port):
|
|
try:
|
|
ip = get("https://api.ipify.org").content.decode("utf8")
|
|
except:
|
|
ip = "google.com"
|
|
a_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
a_socket.settimeout(20.0)
|
|
|
|
location = (ip, server_port)
|
|
result_of_check = a_socket.connect_ex(location)
|
|
|
|
a_socket.close()
|
|
|
|
if result_of_check == 0:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
@staticmethod
|
|
def check_server_conn(server_port):
|
|
a_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
a_socket.settimeout(10.0)
|
|
ip = "127.0.0.1"
|
|
|
|
location = (ip, server_port)
|
|
result_of_check = a_socket.connect_ex(location)
|
|
a_socket.close()
|
|
|
|
if result_of_check == 0:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
@staticmethod
|
|
def cmdparse(cmd_in):
|
|
# Parse a string into arguments
|
|
cmd_out = [] # "argv" output array
|
|
cmd_index = (
|
|
-1
|
|
) # command index - pointer to the argument we're building in cmd_out
|
|
new_param = True # whether we're creating a new argument/parameter
|
|
esc = False # whether an escape character was encountered
|
|
quote_char = None # if we're dealing with a quote, save the quote type here.
|
|
# Nested quotes to be dealt with by the command
|
|
for char in cmd_in: # for character in string
|
|
if (
|
|
new_param
|
|
): # if set, begin a new argument and increment the command index.
|
|
# Continue the loop.
|
|
if char == " ":
|
|
continue
|
|
else:
|
|
cmd_index += 1
|
|
cmd_out.append("")
|
|
new_param = False
|
|
if esc: # if we encountered an escape character on the last loop,
|
|
# append this char regardless of what it is
|
|
if char not in Helpers.allowed_quotes:
|
|
cmd_out[cmd_index] += "\\"
|
|
cmd_out[cmd_index] += char
|
|
esc = False
|
|
else:
|
|
if char == "\\": # if the current character is an escape character,
|
|
# set the esc flag and continue to next loop
|
|
esc = True
|
|
elif (
|
|
char == " " and quote_char is None
|
|
): # if we encounter a space and are not dealing with a quote,
|
|
# set the new argument flag and continue to next loop
|
|
new_param = True
|
|
elif (
|
|
char == quote_char
|
|
): # if we encounter the character that matches our start quote,
|
|
# end the quote and continue to next loop
|
|
quote_char = None
|
|
elif quote_char is None and (
|
|
char in Helpers.allowed_quotes
|
|
): # if we're not in the middle of a quote and we get a quotable
|
|
# character, start a quote and proceed to the next loop
|
|
quote_char = char
|
|
else: # else, just store the character in the current arg
|
|
cmd_out[cmd_index] += char
|
|
return cmd_out
|
|
|
|
def get_setting(self, key, default_return=False):
|
|
try:
|
|
with open(self.settings_file, "r", encoding="utf-8") as f:
|
|
data = json.load(f)
|
|
|
|
if key in data.keys():
|
|
return data.get(key)
|
|
|
|
logger.error(f'Config File Error: Setting "{key}" does not exist')
|
|
Console.error(f'Config File Error: Setting "{key}" does not exist')
|
|
|
|
except Exception as e:
|
|
logger.critical(
|
|
f"Config File Error: Unable to read {self.settings_file} due to {e}"
|
|
)
|
|
Console.critical(
|
|
f"Config File Error: Unable to read {self.settings_file} due to {e}"
|
|
)
|
|
|
|
return default_return
|
|
|
|
def set_setting(self, key, new_value):
|
|
try:
|
|
with open(self.settings_file, "r", encoding="utf-8") as f:
|
|
data = json.load(f)
|
|
|
|
if key in data.keys():
|
|
data[key] = new_value
|
|
with open(self.settings_file, "w", encoding="utf-8") as f:
|
|
json.dump(data, f, indent=2)
|
|
return True
|
|
|
|
logger.error(f'Config File Error: Setting "{key}" does not exist')
|
|
Console.error(f'Config File Error: Setting "{key}" does not exist')
|
|
|
|
except Exception as e:
|
|
logger.critical(
|
|
f"Config File Error: Unable to read {self.settings_file} due to {e}"
|
|
)
|
|
Console.critical(
|
|
f"Config File Error: Unable to read {self.settings_file} due to {e}"
|
|
)
|
|
return False
|
|
|
|
@staticmethod
|
|
def get_local_ip():
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
try:
|
|
# doesn't even have to be reachable
|
|
s.connect(("10.255.255.255", 1))
|
|
ip = s.getsockname()[0]
|
|
except Exception:
|
|
ip = "127.0.0.1"
|
|
finally:
|
|
s.close()
|
|
return ip
|
|
|
|
def get_version(self):
|
|
version_data = {}
|
|
try:
|
|
with open(
|
|
os.path.join(self.config_dir, "version.json"), "r", encoding="utf-8"
|
|
) as f:
|
|
version_data = json.load(f)
|
|
|
|
except Exception as e:
|
|
Console.critical(f"Unable to get version data! \n{e}")
|
|
|
|
return version_data
|
|
|
|
@staticmethod
|
|
def get_announcements():
|
|
data = (
|
|
'[{"id":"1","date":"Unknown",'
|
|
'"title":"Error getting Announcements",'
|
|
'"desc":"Error getting Announcements","link":""}]'
|
|
)
|
|
|
|
try:
|
|
response = requests.get("https://craftycontrol.com/notify.json", timeout=2)
|
|
data = json.loads(response.content)
|
|
except Exception as e:
|
|
logger.error(f"Failed to fetch notifications with error: {e}")
|
|
|
|
return data
|
|
|
|
def get_version_string(self):
|
|
version_data = self.get_version()
|
|
major = version_data.get("major", "?")
|
|
minor = version_data.get("minor", "?")
|
|
sub = version_data.get("sub", "?")
|
|
meta = version_data.get("meta", "?")
|
|
|
|
# set some defaults if we don't get version_data from our helper
|
|
version = f"{major}.{minor}.{sub}-{meta}"
|
|
return str(version)
|
|
|
|
def encode_pass(self, password):
|
|
return self.passhasher.hash(password)
|
|
|
|
def verify_pass(self, password, currenthash):
|
|
try:
|
|
self.passhasher.verify(currenthash, password)
|
|
return True
|
|
except:
|
|
return False
|
|
|
|
def log_colors(self, line):
|
|
# our regex replacements
|
|
# note these are in a tuple
|
|
|
|
user_keywords = self.get_setting("keywords")
|
|
|
|
replacements = [
|
|
(r"(\[.+?/INFO\])", r'<span class="mc-log-info">\1</span>'),
|
|
(r"(\[.+?/WARN\])", r'<span class="mc-log-warn">\1</span>'),
|
|
(r"(\[.+?/ERROR\])", r'<span class="mc-log-error">\1</span>'),
|
|
(r"(\[.+?/FATAL\])", r'<span class="mc-log-fatal">\1</span>'),
|
|
(
|
|
r"(\w+?\[/\d+?\.\d+?\.\d+?\.\d+?\:\d+?\])",
|
|
r'<span class="mc-log-keyword">\1</span>',
|
|
),
|
|
(r"\[(\d\d:\d\d:\d\d)\]", r'<span class="mc-log-time">[\1]</span>'),
|
|
(r"(\[.+? INFO\])", r'<span class="mc-log-info">\1</span>'),
|
|
(r"(\[.+? WARN\])", r'<span class="mc-log-warn">\1</span>'),
|
|
(r"(\[.+? ERROR\])", r'<span class="mc-log-error">\1</span>'),
|
|
(r"(\[.+? FATAL\])", r'<span class="mc-log-fatal">\1</span>'),
|
|
]
|
|
|
|
# highlight users keywords
|
|
for keyword in user_keywords:
|
|
# pylint: disable=consider-using-f-string
|
|
search_replace = (
|
|
r"({})".format(keyword),
|
|
r'<span class="mc-log-keyword">\1</span>',
|
|
)
|
|
replacements.append(search_replace)
|
|
|
|
for old, new in replacements:
|
|
line = re.sub(old, new, line, flags=re.IGNORECASE)
|
|
|
|
return line
|
|
|
|
@staticmethod
|
|
def validate_traversal(base_path, filename):
|
|
logger.debug(f'Validating traversal ("{base_path}", "{filename}")')
|
|
base = pathlib.Path(base_path).resolve()
|
|
file = pathlib.Path(filename)
|
|
fileabs = base.joinpath(file).resolve()
|
|
common_path = pathlib.Path(os.path.commonpath([base, fileabs]))
|
|
if base == common_path:
|
|
return fileabs
|
|
else:
|
|
raise ValueError("Path traversal detected")
|
|
|
|
@staticmethod
|
|
def tail_file(file_name, number_lines=20):
|
|
if not Helpers.check_file_exists(file_name):
|
|
logger.warning(f"Unable to find file to tail: {file_name}")
|
|
return [f"Unable to find file to tail: {file_name}"]
|
|
|
|
# length of lines is X char here
|
|
avg_line_length = 255
|
|
|
|
# create our buffer number - number of lines * avg_line_length
|
|
line_buffer = number_lines * avg_line_length
|
|
|
|
# open our file
|
|
with open(file_name, "r", encoding="utf-8") as f:
|
|
|
|
# seek
|
|
f.seek(0, 2)
|
|
|
|
# get file size
|
|
fsize = f.tell()
|
|
|
|
# set pos @ last n chars
|
|
# (buffer from above = number of lines * avg_line_length)
|
|
f.seek(max(fsize - line_buffer, 0), 0)
|
|
|
|
# read file til the end
|
|
try:
|
|
lines = f.readlines()
|
|
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"Unable to read a line in the file:{file_name} - due to error: {e}"
|
|
)
|
|
|
|
# now we are done getting the lines, let's return it
|
|
return lines
|
|
|
|
@staticmethod
|
|
def check_writeable(path: str):
|
|
filename = os.path.join(path, "tempfile.txt")
|
|
try:
|
|
open(filename, "w", encoding="utf-8").close()
|
|
os.remove(filename)
|
|
|
|
logger.info(f"{filename} is writable")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.critical(f"Unable to write to {path} - Error: {e}")
|
|
return False
|
|
|
|
@staticmethod
|
|
def check_root():
|
|
if Helpers.is_os_windows():
|
|
if ctypes.windll.shell32.IsUserAnAdmin() == 1:
|
|
return True
|
|
else:
|
|
return False
|
|
else:
|
|
if os.geteuid() == 0:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
@staticmethod
|
|
def unzip_file(zip_path):
|
|
new_dir_list = zip_path.split("/")
|
|
new_dir = ""
|
|
for i in range(len(new_dir_list) - 1):
|
|
if i == 0:
|
|
new_dir += new_dir_list[i]
|
|
else:
|
|
new_dir += "/" + new_dir_list[i]
|
|
|
|
if Helpers.check_file_perms(zip_path) and os.path.isfile(zip_path):
|
|
Helpers.ensure_dir_exists(new_dir)
|
|
temp_dir = tempfile.mkdtemp()
|
|
try:
|
|
with zipfile.ZipFile(zip_path, "r") as zip_ref:
|
|
zip_ref.extractall(temp_dir)
|
|
for i in enumerate(zip_ref.filelist):
|
|
if len(zip_ref.filelist) > 1 or not zip_ref.filelist[
|
|
i
|
|
].filename.endswith("/"):
|
|
break
|
|
|
|
full_root_path = temp_dir
|
|
|
|
for item in os.listdir(full_root_path):
|
|
if os.path.isdir(os.path.join(full_root_path, item)):
|
|
try:
|
|
FileHelpers.move_dir(
|
|
os.path.join(full_root_path, item),
|
|
os.path.join(new_dir, item),
|
|
)
|
|
except Exception as ex:
|
|
logger.error(f"ERROR IN ZIP IMPORT: {ex}")
|
|
else:
|
|
try:
|
|
FileHelpers.move_file(
|
|
os.path.join(full_root_path, item),
|
|
os.path.join(new_dir, item),
|
|
)
|
|
except Exception as ex:
|
|
logger.error(f"ERROR IN ZIP IMPORT: {ex}")
|
|
except Exception as ex:
|
|
Console.error(ex)
|
|
else:
|
|
return "false"
|
|
return
|
|
|
|
def ensure_logging_setup(self):
|
|
log_file = os.path.join(os.path.curdir, "logs", "commander.log")
|
|
session_log_file = os.path.join(os.path.curdir, "logs", "session.log")
|
|
|
|
logger.info("Checking app directory writable")
|
|
|
|
writeable = Helpers.check_writeable(self.root_dir)
|
|
|
|
# if not writeable, let's bomb out
|
|
if not writeable:
|
|
logger.critical(f"Unable to write to {self.root_dir} directory!")
|
|
sys.exit(1)
|
|
|
|
# ensure the log directory is there
|
|
try:
|
|
with suppress(FileExistsError):
|
|
os.makedirs(os.path.join(self.root_dir, "logs"))
|
|
except Exception as e:
|
|
Console.error(f"Failed to make logs directory with error: {e} ")
|
|
|
|
# ensure the log file is there
|
|
try:
|
|
open(log_file, "a", encoding="utf-8").close()
|
|
except Exception as e:
|
|
Console.critical(f"Unable to open log file! {e}")
|
|
sys.exit(1)
|
|
|
|
# del any old session.lock file as this is a new session
|
|
try:
|
|
with contextlib.suppress(FileNotFoundError):
|
|
os.remove(session_log_file)
|
|
except Exception as e:
|
|
Console.error(f"Deleting Session.lock failed with error: {e}")
|
|
|
|
@staticmethod
|
|
def get_time_as_string():
|
|
now = datetime.now()
|
|
return now.strftime("%m/%d/%Y, %H:%M:%S")
|
|
|
|
@staticmethod
|
|
def calc_percent(source_path, dest_path):
|
|
# calculates percentable of zip from drive. Not with compression.
|
|
# (For backups and support logs)
|
|
source_size = 0
|
|
files_count = 0
|
|
for path, _dirs, files in os.walk(source_path):
|
|
for file in files:
|
|
full_path = os.path.join(path, file)
|
|
source_size += os.stat(full_path).st_size
|
|
files_count += 1
|
|
try:
|
|
dest_size = os.path.getsize(str(dest_path))
|
|
percent = round((dest_size / source_size) * 100, 1)
|
|
except:
|
|
percent = 0
|
|
if percent >= 0:
|
|
results = {"percent": percent, "total_files": files_count}
|
|
else:
|
|
results = {"percent": 0, "total_files": files_count}
|
|
return results
|
|
|
|
@staticmethod
|
|
def check_file_exists(path: str):
|
|
logger.debug(f"Looking for path: {path}")
|
|
|
|
if os.path.exists(path) and os.path.isfile(path):
|
|
logger.debug(f"Found path: {path}")
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
@staticmethod
|
|
def human_readable_file_size(num: int, suffix="B"):
|
|
for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
|
|
if abs(num) < 1024.0:
|
|
# pylint: disable=consider-using-f-string
|
|
return "%3.1f%s%s" % (num, unit, suffix)
|
|
num /= 1024.0
|
|
# pylint: disable=consider-using-f-string
|
|
return "%.1f%s%s" % (num, "Y", suffix)
|
|
|
|
@staticmethod
|
|
def check_path_exists(path: str):
|
|
if not path:
|
|
return False
|
|
logger.debug(f"Looking for path: {path}")
|
|
|
|
if os.path.exists(path):
|
|
logger.debug(f"Found path: {path}")
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
@staticmethod
|
|
def get_file_contents(path: str, lines=100):
|
|
|
|
contents = ""
|
|
|
|
if os.path.exists(path) and os.path.isfile(path):
|
|
try:
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
for line in f.readlines()[-lines:]:
|
|
contents = contents + line
|
|
|
|
return contents
|
|
|
|
except Exception as e:
|
|
logger.error(f"Unable to read file: {path}. \n Error: {e}")
|
|
return False
|
|
else:
|
|
logger.error(
|
|
f"Unable to read file: {path}. File not found, or isn't a file."
|
|
)
|
|
return False
|
|
|
|
def create_session_file(self, ignore=False):
|
|
|
|
if ignore and os.path.exists(self.session_file):
|
|
os.remove(self.session_file)
|
|
|
|
if os.path.exists(self.session_file):
|
|
|
|
file_data = self.get_file_contents(self.session_file)
|
|
try:
|
|
data = json.loads(file_data)
|
|
pid = data.get("pid")
|
|
started = data.get("started")
|
|
if psutil.pid_exists(pid):
|
|
Console.critical(
|
|
f"Another Crafty Controller agent seems to be running..."
|
|
f"\npid: {pid} \nstarted on: {started}"
|
|
)
|
|
logger.critical("Found running crafty process. Exiting.")
|
|
sys.exit(1)
|
|
else:
|
|
logger.info(
|
|
"No process found for pid. Assuming "
|
|
"crafty crashed. Deleting stale session.lock"
|
|
)
|
|
os.remove(self.session_file)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to locate existing session.lock with error: {e} ")
|
|
Console.error(
|
|
f"Failed to locate existing session.lock with error: {e} "
|
|
)
|
|
|
|
sys.exit(1)
|
|
|
|
pid = os.getpid()
|
|
now = datetime.now()
|
|
|
|
session_data = {"pid": pid, "started": now.strftime("%d-%m-%Y, %H:%M:%S")}
|
|
with open(self.session_file, "w", encoding="utf-8") as f:
|
|
json.dump(session_data, f, indent=True)
|
|
|
|
# because this is a recursive function, we will return bytes,
|
|
# and set human readable later
|
|
@staticmethod
|
|
def get_dir_size(path: str):
|
|
total = 0
|
|
for entry in os.scandir(path):
|
|
if entry.is_dir(follow_symlinks=False):
|
|
total += Helpers.get_dir_size(entry.path)
|
|
else:
|
|
total += entry.stat(follow_symlinks=False).st_size
|
|
return total
|
|
|
|
@staticmethod
|
|
def list_dir_by_date(path: str, reverse=False):
|
|
return [
|
|
str(p)
|
|
for p in sorted(
|
|
pathlib.Path(path).iterdir(), key=os.path.getmtime, reverse=reverse
|
|
)
|
|
]
|
|
|
|
@staticmethod
|
|
def get_human_readable_files_sizes(paths: list):
|
|
sizes = []
|
|
for p in paths:
|
|
sizes.append(
|
|
{
|
|
"path": p,
|
|
"size": Helpers.human_readable_file_size(os.stat(p).st_size),
|
|
}
|
|
)
|
|
return sizes
|
|
|
|
@staticmethod
|
|
def base64_encode_string(fun_str: str):
|
|
s_bytes = str(fun_str).encode("utf-8")
|
|
b64_bytes = base64.encodebytes(s_bytes)
|
|
return b64_bytes.decode("utf-8")
|
|
|
|
@staticmethod
|
|
def base64_decode_string(fun_str: str):
|
|
s_bytes = str(fun_str).encode("utf-8")
|
|
b64_bytes = base64.decodebytes(s_bytes)
|
|
return b64_bytes.decode("utf-8")
|
|
|
|
@staticmethod
|
|
def create_uuid():
|
|
return str(uuid.uuid4())
|
|
|
|
@staticmethod
|
|
def ensure_dir_exists(path):
|
|
"""
|
|
ensures a directory exists
|
|
|
|
Checks for the existence of a directory, if the directory isn't there,
|
|
this function creates the directory
|
|
|
|
Args:
|
|
path (string): the path you are checking for
|
|
|
|
"""
|
|
|
|
try:
|
|
os.makedirs(path)
|
|
logger.debug(f"Created Directory : {path}")
|
|
|
|
# directory already exists - non-blocking error
|
|
except FileExistsError:
|
|
pass
|
|
except PermissionError as e:
|
|
logger.critical(f"Check generated exception due to permssion error: {e}")
|
|
|
|
def create_self_signed_cert(self, cert_dir=None):
|
|
|
|
if cert_dir is None:
|
|
cert_dir = os.path.join(self.config_dir, "web", "certs")
|
|
|
|
# create a directory if needed
|
|
Helpers.ensure_dir_exists(cert_dir)
|
|
|
|
cert_file = os.path.join(cert_dir, "commander.cert.pem")
|
|
key_file = os.path.join(cert_dir, "commander.key.pem")
|
|
|
|
logger.info(f"SSL Cert File is set to: {cert_file}")
|
|
logger.info(f"SSL Key File is set to: {key_file}")
|
|
|
|
# don't create new files if we already have them.
|
|
if Helpers.check_file_exists(cert_file) and Helpers.check_file_exists(key_file):
|
|
logger.info("Cert and Key files already exists, not creating them.")
|
|
return True
|
|
|
|
Console.info("Generating a self signed SSL")
|
|
logger.info("Generating a self signed SSL")
|
|
|
|
# create a key pair
|
|
logger.info("Generating a key pair. This might take a moment.")
|
|
Console.info("Generating a key pair. This might take a moment.")
|
|
k = crypto.PKey()
|
|
k.generate_key(crypto.TYPE_RSA, 4096)
|
|
|
|
# create a self-signed cert
|
|
cert = crypto.X509()
|
|
cert.get_subject().C = "US"
|
|
cert.get_subject().ST = "Michigan"
|
|
cert.get_subject().L = "Kent County"
|
|
cert.get_subject().O = "Crafty Controller"
|
|
cert.get_subject().OU = "Server Ops"
|
|
cert.get_subject().CN = gethostname()
|
|
alt_names = ",".join(
|
|
[
|
|
f"DNS:{socket.gethostname()}",
|
|
f"DNS:*.{socket.gethostname()}",
|
|
"DNS:localhost",
|
|
"DNS:*.localhost",
|
|
"DNS:127.0.0.1",
|
|
]
|
|
).encode()
|
|
subject_alt_names_ext = crypto.X509Extension(
|
|
b"subjectAltName", False, alt_names
|
|
)
|
|
basic_constraints_ext = crypto.X509Extension(
|
|
b"basicConstraints", True, b"CA:false"
|
|
)
|
|
cert.add_extensions([subject_alt_names_ext, basic_constraints_ext])
|
|
cert.set_serial_number(secrets.randbelow(254) + 1)
|
|
cert.gmtime_adj_notBefore(0)
|
|
cert.gmtime_adj_notAfter(365 * 24 * 60 * 60)
|
|
cert.set_issuer(cert.get_subject())
|
|
cert.set_pubkey(k)
|
|
cert.set_version(2)
|
|
cert.sign(k, "sha256")
|
|
|
|
f = open(cert_file, "w", encoding="utf-8")
|
|
f.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode())
|
|
f.close()
|
|
|
|
f = open(key_file, "w", encoding="utf-8")
|
|
f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, k).decode())
|
|
f.close()
|
|
|
|
@staticmethod
|
|
def random_string_generator(size=6, chars=string.ascii_uppercase + string.digits):
|
|
"""
|
|
Example Usage
|
|
random_generator() = G8sjO2
|
|
random_generator(3, abcdef) = adf
|
|
"""
|
|
return "".join(secrets.choice(chars) for x in range(size))
|
|
|
|
@staticmethod
|
|
def is_os_windows():
|
|
if os.name == "nt":
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
@staticmethod
|
|
def wtol_path(w_path):
|
|
l_path = w_path.replace("\\", "/")
|
|
return l_path
|
|
|
|
@staticmethod
|
|
def ltow_path(l_path):
|
|
w_path = l_path.replace("/", "\\")
|
|
return w_path
|
|
|
|
@staticmethod
|
|
def get_os_understandable_path(path):
|
|
return os.path.normpath(path)
|
|
|
|
def find_default_password(self):
|
|
default_file = os.path.join(self.root_dir, "default.json")
|
|
data = {}
|
|
|
|
if Helpers.check_file_exists(default_file):
|
|
with open(default_file, "r", encoding="utf-8") as f:
|
|
data = json.load(f)
|
|
|
|
del_json = self.get_setting("delete_default_json")
|
|
|
|
if del_json:
|
|
os.remove(default_file)
|
|
|
|
return data
|
|
|
|
@staticmethod
|
|
def generate_tree(folder, output=""):
|
|
dir_list = []
|
|
unsorted_files = []
|
|
file_list = os.listdir(folder)
|
|
for item in file_list:
|
|
if os.path.isdir(os.path.join(folder, item)):
|
|
dir_list.append(item)
|
|
else:
|
|
unsorted_files.append(item)
|
|
file_list = sorted(dir_list, key=str.casefold) + sorted(
|
|
unsorted_files, key=str.casefold
|
|
)
|
|
for raw_filename in file_list:
|
|
filename = html.escape(raw_filename)
|
|
rel = os.path.join(folder, raw_filename)
|
|
dpath = os.path.join(folder, filename)
|
|
if os.path.isdir(rel):
|
|
output += f"""<li class="tree-item" data-path="{dpath}">
|
|
\n<div id="{dpath}" data-path="{dpath}" data-name="{filename}" class="tree-caret tree-ctx-item tree-folder">
|
|
<span id="{dpath}span" class="files-tree-title" data-path="{dpath}" data-name="{filename}" onclick="getDirView(event)">
|
|
<i style="color: #8862e0;" class="far fa-folder"></i>
|
|
<i style="color: #8862e0;" class="far fa-folder-open"></i>
|
|
{filename}
|
|
</span>
|
|
</div><li>
|
|
\n"""
|
|
else:
|
|
if filename != "crafty_managed.txt":
|
|
output += f"""<li
|
|
class="d-block tree-ctx-item tree-file tree-item"
|
|
data-path="{dpath}"
|
|
data-name="{filename}"
|
|
onclick="clickOnFile(event)"><span style="margin-right: 6px;">
|
|
<i class="far fa-file"></i></span>{filename}</li>"""
|
|
return output
|
|
|
|
@staticmethod
|
|
def generate_dir(folder, output=""):
|
|
dir_list = []
|
|
unsorted_files = []
|
|
file_list = os.listdir(folder)
|
|
for item in file_list:
|
|
if os.path.isdir(os.path.join(folder, item)):
|
|
dir_list.append(item)
|
|
else:
|
|
unsorted_files.append(item)
|
|
file_list = sorted(dir_list, key=str.casefold) + sorted(
|
|
unsorted_files, key=str.casefold
|
|
)
|
|
output += f"""<ul class="tree-nested d-block" id="{folder}ul">"""
|
|
for raw_filename in file_list:
|
|
filename = html.escape(raw_filename)
|
|
dpath = os.path.join(folder, filename)
|
|
rel = os.path.join(folder, raw_filename)
|
|
if os.path.isdir(rel):
|
|
output += f"""<li class="tree-item" data-path="{dpath}">
|
|
\n<div id="{dpath}" data-path="{dpath}" data-name="{filename}" class="tree-caret tree-ctx-item tree-folder">
|
|
<span id="{dpath}span" class="files-tree-title" data-path="{dpath}" data-name="{filename}" onclick="getDirView(event)">
|
|
<i style="color: #8862e0;" class="far fa-folder"></i>
|
|
<i style="color: #8862e0;" class="far fa-folder-open"></i>
|
|
{filename}
|
|
</span>
|
|
</div><li>"""
|
|
else:
|
|
if filename != "crafty_managed.txt":
|
|
output += f"""<li
|
|
class="d-block tree-ctx-item tree-file tree-item"
|
|
data-path="{dpath}"
|
|
data-name="{filename}"
|
|
onclick="clickOnFile(event)"><span style="margin-right: 6px;">
|
|
<i class="far fa-file"></i></span>{filename}</li>"""
|
|
output += "</ul>\n"
|
|
return output
|
|
|
|
@staticmethod
|
|
def generate_zip_tree(folder, output=""):
|
|
file_list = os.listdir(folder)
|
|
file_list = sorted(file_list, key=str.casefold)
|
|
output += f"""<ul class="tree-nested d-block" id="{folder}ul">"""
|
|
for raw_filename in file_list:
|
|
filename = html.escape(raw_filename)
|
|
rel = os.path.join(folder, raw_filename)
|
|
dpath = os.path.join(folder, filename)
|
|
if os.path.isdir(rel):
|
|
output += f"""<li class="tree-item" data-path="{dpath}">
|
|
\n<div id="{dpath}" data-path="{dpath}" data-name="{filename}" class="tree-caret tree-ctx-item tree-folder">
|
|
<input type="radio" name="root_path" value="{dpath}">
|
|
<span id="{dpath}span" class="files-tree-title" data-path="{dpath}" data-name="{filename}" onclick="getDirView(event)">
|
|
<i style="color: #8862e0;" class="far fa-folder"></i>
|
|
<i style="color: #8862e0;" class="far fa-folder-open"></i>
|
|
{filename}
|
|
</span>
|
|
</input></div><li>
|
|
\n"""
|
|
return output
|
|
|
|
@staticmethod
|
|
def generate_zip_dir(folder, output=""):
|
|
file_list = os.listdir(folder)
|
|
file_list = sorted(file_list, key=str.casefold)
|
|
output += f"""<ul class="tree-nested d-block" id="{folder}ul">"""
|
|
for raw_filename in file_list:
|
|
filename = html.escape(raw_filename)
|
|
rel = os.path.join(folder, raw_filename)
|
|
dpath = os.path.join(folder, filename)
|
|
if os.path.isdir(rel):
|
|
output += f"""<li class="tree-item" data-path="{dpath}">
|
|
\n<div id="{dpath}" data-path="{dpath}" data-name="{filename}" class="tree-caret tree-ctx-item tree-folder">
|
|
<input type="radio" name="root_path" value="{dpath}">
|
|
<span id="{dpath}span" class="files-tree-title" data-path="{dpath}" data-name="{filename}" onclick="getDirView(event)">
|
|
<i style="color: #8862e0;" class="far fa-folder"></i>
|
|
<i style="color: #8862e0;" class="far fa-folder-open"></i>
|
|
{filename}
|
|
</span>
|
|
</input></div><li>"""
|
|
return output
|
|
|
|
def unzip_server(self, zip_path, user_id):
|
|
if Helpers.check_file_perms(zip_path):
|
|
temp_dir = tempfile.mkdtemp()
|
|
with zipfile.ZipFile(zip_path, "r") as zip_ref:
|
|
# extracts archive to temp directory
|
|
zip_ref.extractall(temp_dir)
|
|
if user_id:
|
|
self.websocket_helper.broadcast_user(
|
|
user_id, "send_temp_path", {"path": temp_dir}
|
|
)
|
|
|
|
def backup_select(self, path, user_id):
|
|
if user_id:
|
|
self.websocket_helper.broadcast_user(
|
|
user_id, "send_temp_path", {"path": path}
|
|
)
|
|
|
|
@staticmethod
|
|
def unzip_backup_archive(backup_path, zip_name):
|
|
zip_path = os.path.join(backup_path, zip_name)
|
|
if Helpers.check_file_perms(zip_path):
|
|
temp_dir = tempfile.mkdtemp()
|
|
with zipfile.ZipFile(zip_path, "r") as zip_ref:
|
|
# extracts archive to temp directory
|
|
zip_ref.extractall(temp_dir)
|
|
return temp_dir
|
|
else:
|
|
return False
|
|
|
|
@staticmethod
|
|
def in_path(parent_path, child_path):
|
|
# Smooth out relative path names, note: if you are concerned about
|
|
# symbolic links, you should use os.path.realpath too
|
|
parent_path = os.path.abspath(parent_path)
|
|
child_path = os.path.abspath(child_path)
|
|
|
|
# Compare the common path of the parent and child path with the
|
|
# common path of just the parent path. Using the commonpath method
|
|
# on just the parent path will regularise the path name in the same way
|
|
# as the comparison that deals with both paths, removing any trailing
|
|
# path separator
|
|
return os.path.commonpath([parent_path]) == os.path.commonpath(
|
|
[parent_path, child_path]
|
|
)
|
|
|
|
@staticmethod
|
|
def copy_files(source, dest):
|
|
if os.path.isfile(source):
|
|
FileHelpers.copy_file(source, dest)
|
|
logger.info("Copying jar %s to %s", source, dest)
|
|
else:
|
|
logger.info("Source jar does not exist.")
|
|
|
|
@staticmethod
|
|
def download_file(executable_url, jar_path):
|
|
try:
|
|
response = requests.get(executable_url, timeout=5)
|
|
except Exception as ex:
|
|
logger.error("Could not download executable: %s", ex)
|
|
return False
|
|
if response.status_code != 200:
|
|
logger.error("Unable to download file from %s", executable_url)
|
|
return False
|
|
|
|
try:
|
|
open(jar_path, "wb").write(response.content)
|
|
except Exception as e:
|
|
logger.error("Unable to finish executable download. Error: %s", e)
|
|
return False
|
|
return True
|
|
|
|
@staticmethod
|
|
def remove_prefix(text, prefix):
|
|
if text.startswith(prefix):
|
|
return text[len(prefix) :]
|
|
return text
|
|
|
|
@staticmethod
|
|
def get_lang_page(text) -> str:
|
|
splitted = text.split("_")
|
|
if len(splitted) != 2:
|
|
return "en"
|
|
lang, region = splitted
|
|
if region == "EN":
|
|
return "en"
|
|
return lang + "-" + region
|