import time import datetime import os from typing import Dict, Any, Tuple import json import logging import threading from app.classes.models.server_permissions import Enum_Permissions_Server from app.classes.models.crafty_permissions import Enum_Permissions_Crafty from app.classes.models.management import management_helper from app.classes.shared.authentication import authentication from app.classes.shared.helpers import helper from app.classes.web.base_handler import BaseHandler try: import bleach import libgravatar import requests import tornado.web import tornado.escape from tornado import iostream #TZLocal is set as a hidden import on win pipeline from tzlocal import get_localzone from cron_validator import CronValidator except ModuleNotFoundError as ex: helper.auto_installer_fix(ex) logger = logging.getLogger(__name__) class PanelHandler(BaseHandler): def get_user_roles(self) -> Dict[str, list]: user_roles = {} for user in self.controller.users.get_all_users(): user_roles_list = self.controller.users.get_user_roles_names(user.user_id) # user_servers = self.controller.servers.get_authorized_servers(user.user_id) user_roles[user.user_id] = user_roles_list return user_roles def get_role_servers(self) -> set: servers = set() for server in self.controller.list_defined_servers(): argument = int(float( bleach.clean( self.get_argument(f"server_{server['server_id']}_access", '0') ) )) if argument: servers.add(server['server_id']) return servers def get_perms_quantity(self) -> Tuple[str, dict]: permissions_mask: str = "000" server_quantity: dict = {} for permission in self.controller.crafty_perms.list_defined_crafty_permissions(): argument = int(float(bleach.clean( self.get_argument(f'permission_{permission.name}', '0') ))) if argument: permissions_mask = self.controller.crafty_perms.set_permission(permissions_mask, permission, argument) q_argument = int(float( bleach.clean( self.get_argument(f'quantity_{permission.name}', '0') ) )) if q_argument: server_quantity[permission.name] = q_argument else: server_quantity[permission.name] = 0 return permissions_mask, server_quantity def get_perms(self) -> str: permissions_mask: str = "000" for permission in self.controller.crafty_perms.list_defined_crafty_permissions(): argument = self.get_argument(f'permission_{permission.name}', None) if argument is not None: permissions_mask = self.controller.crafty_perms.set_permission(permissions_mask, permission, 1 if argument == '1' else 0) return permissions_mask def get_perms_server(self) -> str: permissions_mask = "00000000" for permission in self.controller.server_perms.list_defined_permissions(): argument = self.get_argument(f'permission_{permission.name}', None) if argument is not None: permissions_mask = self.controller.server_perms.set_permission(permissions_mask, permission, 1 if argument == '1' else 0) return permissions_mask def get_user_role_memberships(self) -> set: roles = set() for role in self.controller.roles.get_all_roles(): if self.get_argument(f'role_{role.role_id}_membership', None) == '1': roles.add(role.role_id) return roles def download_file(self, name: str, file: str): self.set_header('Content-Type', 'application/octet-stream') self.set_header('Content-Disposition', f'attachment; filename={name}') chunk_size = 1024 * 1024 * 4 # 4 MiB with open(file, 'rb') as f: while True: chunk = f.read(chunk_size) if not chunk: break try: self.write(chunk) # write the chunk to response self.flush() # send the chunk to client except iostream.StreamClosedError: # this means the client has closed the connection # so break the loop break finally: # deleting the chunk is very important because # if many clients are downloading files at the # same time, the chunks in memory will keep # increasing and will eat up the RAM del chunk def check_server_id(self): server_id = self.get_argument('id', None) api_key, _, exec_user = self.current_user superuser = exec_user['superuser'] # Commented out because there is no server access control for API keys, they just inherit from the host user # if api_key is not None: # superuser = superuser and api_key.superuser if server_id is None: self.redirect("/panel/error?error=Invalid Server ID") return None else: # Does this server exist? if not self.controller.servers.server_id_exists(server_id): self.redirect("/panel/error?error=Invalid Server ID") return None # Does the user have permission? if not superuser: # TODO: Figure out a better solution if api_key is not None: if not self.controller.servers.server_id_authorized_api_key(server_id, api_key): print(f'API key {api_key.name} (id: {api_key.token_id}) does not have permission') self.redirect("/panel/error?error=Invalid Server ID") return None else: if not self.controller.servers.server_id_authorized(server_id, exec_user["user_id"]): print(f'User {exec_user["user_id"]} does not have permission') self.redirect("/pandel/error?error=Invalid Server ID") return None return server_id # Server fetching, spawned asynchronously # TODO: Make the related front-end elements update with AJAX def fetch_server_data(self, page_data): total_players = 0 for server in page_data['servers']: total_players += len(self.controller.stats.get_server_players(server['server_data']['server_id'])) page_data['num_players'] = total_players for s in page_data['servers']: try: data = json.loads(s['int_ping_results']) s['int_ping_results'] = data except Exception as e: logger.error(f"Failed server data for page with error: {e}") return page_data @tornado.web.authenticated async def get(self, page): error = self.get_argument('error', "WTF Error!") template = "panel/denied.html" now = time.time() formatted_time = str(datetime.datetime.fromtimestamp(now).strftime('%Y-%m-%d %H:%M:%S')) api_key, _token_data, exec_user = self.current_user superuser = exec_user['superuser'] if api_key is not None: superuser = superuser and api_key.superuser exec_user_role = set() if superuser: # TODO: Figure out a better solution defined_servers = self.controller.list_defined_servers() exec_user_role.add("Super User") exec_user_crafty_permissions = self.controller.crafty_perms.list_defined_crafty_permissions() else: if api_key is not None: exec_user_crafty_permissions = self.controller.crafty_perms.get_api_key_permissions_list(api_key) else: exec_user_crafty_permissions = self.controller.crafty_perms.get_crafty_permissions_list( exec_user["user_id"]) logger.debug(exec_user['roles']) for r in exec_user['roles']: role = self.controller.roles.get_role(r) exec_user_role.add(role['role_name']) defined_servers = self.controller.servers.get_authorized_servers(exec_user["user_id"]) user_order = self.controller.users.get_user_by_id(exec_user['user_id']) user_order = user_order['server_order'].split(',') page_servers = [] server_ids = [] for server_id in user_order[:]: for server in defined_servers[:]: if str(server['server_id']) == str(server_id): page_servers.append(server) user_order.remove(server_id) defined_servers.remove(server) for server in defined_servers: server_ids.append(str(server['server_id'])) if server not in page_servers: page_servers.append(server) for server_id in user_order[:]: #remove IDs in list that user no longer has access to if str(server_id) not in server_ids: user_order.remove(server_id) defined_servers = page_servers page_data: Dict[str, Any] = { # todo: make this actually pull and compare version data 'update_available': False, 'serverTZ':get_localzone(), 'version_data': helper.get_version_string(), 'user_data': exec_user, 'user_role' : exec_user_role, 'user_crafty_permissions' : exec_user_crafty_permissions, 'crafty_permissions': { 'Server_Creation': Enum_Permissions_Crafty.Server_Creation, 'User_Config': Enum_Permissions_Crafty.User_Config, 'Roles_Config': Enum_Permissions_Crafty.Roles_Config, }, 'server_stats': { 'total': len(defined_servers), 'running': len(self.controller.list_running_servers()), 'stopped': (len(self.controller.list_defined_servers()) - len(self.controller.list_running_servers())) }, 'menu_servers': defined_servers, 'hosts_data': self.controller.management.get_latest_hosts_stats(), 'show_contribute': helper.get_setting("show_contribute_link", True), 'error': error, 'time': formatted_time, 'lang': self.controller.users.get_user_lang_by_id(exec_user["user_id"]), 'lang_page': helper.getLangPage(self.controller.users.get_user_lang_by_id(exec_user["user_id"])), 'super_user': superuser, 'api_key': { 'name': api_key.name, 'created': api_key.created, 'server_permissions': api_key.server_permissions, 'crafty_permissions': api_key.crafty_permissions, 'superuser': api_key.superuser } if api_key is not None else None, 'superuser': superuser } if helper.get_setting("allow_nsfw_profile_pictures"): rating = "x" else: rating = "g" #Get grvatar hash for profile pictures if exec_user['email'] != 'default@example.com' or "": g = libgravatar.Gravatar(libgravatar.sanitize_email(exec_user['email'])) url = g.get_image(size=80, default="404", force_default=False, rating=rating, filetype_extension=False, use_ssl=True) # + "?d=404" if requests.head(url).status_code != 404: profile_url = url else: profile_url = "/static/assets/images/faces-clipart/pic-3.png" else: profile_url = "/static/assets/images/faces-clipart/pic-3.png" page_data['user_image'] = profile_url if page == 'unauthorized': template = "panel/denied.html" elif page == "error": template = "public/error.html" elif page == 'credits': with open(helper.credits_cache, encoding='utf-8') as credits_default_local: try: remote = requests.get('https://craftycontrol.com/credits', allow_redirects=True) credits_dict: dict = remote.json() if not credits_dict["staff"]: logger.error("Issue with upstream Staff, using local.") credits_dict: dict = json.load(credits_default_local) except: logger.error("Request to credits bucket failed, using local.") credits_dict: dict = json.load(credits_default_local) timestamp = credits_dict["lastUpdate"] / 1000.0 page_data["patrons"] = credits_dict["patrons"] page_data["staff"] = credits_dict["staff"] # Filter Language keys to exclude joke prefix '*' clean_dict = {user.replace('*', ''): translation for user, translation in credits_dict['translations'].items()} page_data["translations"] = clean_dict # 0 Defines if we are using local credits file andd displays sadcat. if timestamp == 0: page_data["lastUpdate"] = '😿' else: page_data["lastUpdate"] = str(datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')) template = "panel/credits.html" elif page == 'contribute': template = "panel/contribute.html" elif page == 'dashboard': if superuser: # TODO: Figure out a better solution try: page_data['servers'] = self.controller.servers.get_all_servers_stats() except IndexError: self.controller.stats.record_stats() page_data['servers'] = self.controller.servers.get_all_servers_stats() else: try: user_auth = self.controller.servers.get_authorized_servers_stats(exec_user["user_id"]) except IndexError: self.controller.stats.record_stats() user_auth = self.controller.servers.get_authorized_servers_stats(exec_user["user_id"]) logger.debug(f"ASFR: {user_auth}") page_data['servers'] = user_auth page_data['server_stats']['running'] = len( list(filter(lambda x: x['stats']['running'], page_data['servers']))) page_data['server_stats']['stopped'] = len(page_data['servers']) - page_data['server_stats']['running'] #set user server order user_order = self.controller.users.get_user_by_id(exec_user['user_id']) user_order = user_order['server_order'].split(',') page_servers = [] server_ids = [] un_used_servers = page_data['servers'] flag = 0 for server_id in user_order[:]: for server in un_used_servers[:]: if flag == 0: server['stats']['downloading'] = self.controller.servers.get_download_status( str(server['stats']['server_id']['server_id'])) server['stats']['crashed'] = self.controller.servers.is_crashed( str(server['stats']['server_id']['server_id'])) try: server['stats']['waiting_start'] = self.controller.servers.get_waiting_start( str(server['stats']['server_id']['server_id'])) except Exception as e: logger.error(f"Failed to get server waiting to start: {e}") server['stats']['waiting_start'] = False if str(server['server_data']['server_id']) == str(server_id): page_servers.append(server) un_used_servers.remove(server) user_order.remove(server_id) #we only want to set these server stats values once. We need to update the flag so it only hits that if once. flag += 1 for server in un_used_servers: server_ids.append(str(server['server_data']['server_id'])) if server not in page_servers: page_servers.append(server) for server_id in user_order: #remove IDs in list that user no longer has access to if str(server_id) not in server_ids[:]: user_order.remove(server_id) page_data['servers'] = page_servers #num players is set to zero here. If we poll all servers while dashboard is loading it takes FOREVER. We leave this to the #async polling once dashboard is served. page_data['num_players'] = 0 template = "panel/dashboard.html" elif page == 'server_detail': subpage = bleach.clean(self.get_argument('subpage', "")) server_id = self.check_server_id() if server_id is None: return valid_subpages = ['term', 'logs', 'backup', 'config', 'files', 'admin_controls', 'schedules'] if subpage not in valid_subpages: logger.debug('not a valid subpage') subpage = 'term' logger.debug(f'Subpage: "{subpage}"') server = self.controller.get_server_obj(server_id) # server_data isn't needed since the server_stats also pulls server data page_data['server_data'] = self.controller.servers.get_server_data_by_id(server_id) page_data['server_stats'] = self.controller.servers.get_server_stats_by_id(server_id) page_data['downloading'] = self.controller.servers.get_download_status( server_id) try: page_data['waiting_start'] = self.controller.servers.get_waiting_start(server_id) except Exception as e: logger.error(f"Failed to get server waiting to start: {e}") page_data['waiting_start'] = False page_data['get_players'] = lambda: self.controller.stats.get_server_players(server_id) page_data['active_link'] = subpage page_data['permissions'] = { 'Commands': Enum_Permissions_Server.Commands, 'Terminal': Enum_Permissions_Server.Terminal, 'Logs': Enum_Permissions_Server.Logs, 'Schedule': Enum_Permissions_Server.Schedule, 'Backup': Enum_Permissions_Server.Backup, 'Files': Enum_Permissions_Server.Files, 'Config': Enum_Permissions_Server.Config, 'Players': Enum_Permissions_Server.Players, } page_data['user_permissions'] = self.controller.server_perms.get_user_id_permissions_list(exec_user["user_id"], server_id) page_data['server_stats']['crashed'] = self.controller.servers.is_crashed(server_id) page_data['server_stats']['server_type'] = self.controller.servers.get_server_type_by_id(server_id) if subpage == 'term': if not page_data['permissions']['Terminal'] in page_data['user_permissions']: if not superuser: self.redirect("/panel/error?error=Unauthorized access to Terminal") return if subpage == 'logs': if not page_data['permissions']['Logs'] in page_data['user_permissions']: if not superuser: self.redirect("/panel/error?error=Unauthorized access to Logs") return if subpage == 'schedules': if not page_data['permissions']['Schedule'] in page_data['user_permissions']: if not superuser: self.redirect("/panel/error?error=Unauthorized access To Schedules") return page_data['schedules'] = management_helper.get_schedules_by_server(server_id) if subpage == 'config': if not page_data['permissions']['Config'] in page_data['user_permissions']: if not superuser: self.redirect("/panel/error?error=Unauthorized access Server Config") return if subpage == 'files': if not page_data['permissions']['Files'] in page_data['user_permissions']: if not superuser: self.redirect("/panel/error?error=Unauthorized access Files") return if subpage == "backup": if not page_data['permissions']['Backup'] in page_data['user_permissions']: if not superuser: self.redirect("/panel/error?error=Unauthorized access to Backups") return server_info = self.controller.servers.get_server_data_by_id(server_id) page_data['backup_config'] = self.controller.management.get_backup_config(server_id) exclusions = [] page_data['exclusions'] = self.controller.management.get_excluded_backup_dirs(server_id) page_data['backing_up'] = self.controller.get_server_obj(server_id).is_backingup page_data['backup_stats'] = self.controller.get_server_obj(server_id).send_backup_status() #makes it so relative path is the only thing shown for file in page_data['exclusions']: if helper.is_os_windows(): exclusions.append(file.replace(server_info['path']+'\\', "")) else: exclusions.append(file.replace(server_info['path']+'/', "")) page_data['exclusions'] = exclusions self.controller.refresh_server_settings(server_id) try: page_data['backup_list'] = server.list_backups() except: page_data['backup_list'] = [] page_data['backup_path'] = helper.wtol_path(server_info["backup_path"]) def get_banned_players_html(): banned_players = self.controller.servers.get_banned_players(server_id) if banned_players is None: return """
  • Error while reading banned-players.json

  • """ html = "" for player in banned_players: html += f"""
  • {player['name']}

    Banned by {player['source']} for reason: {player['reason']}
  • """ return html if subpage == "admin_controls": if not page_data['permissions']['Players'] in page_data['user_permissions']: if not superuser: self.redirect("/panel/error?error=Unauthorized access") page_data['banned_players'] = get_banned_players_html() template = f"panel/server_{subpage}.html" elif page == 'download_backup': file = self.get_argument('file', "") server_id = self.check_server_id() if server_id is None: return server_info = self.controller.servers.get_server_data_by_id(server_id) backup_file = os.path.abspath(os.path.join(helper.get_os_understandable_path(server_info["backup_path"]), file)) if not helper.in_path(helper.get_os_understandable_path(server_info["backup_path"]), backup_file) \ or not os.path.isfile(backup_file): self.redirect("/panel/error?error=Invalid path detected") return self.download_file(file, backup_file) self.redirect(f"/panel/server_detail?id={server_id}&subpage=backup") elif page == 'panel_config': auth_servers = {} auth_role_servers = {} roles = self.controller.roles.get_all_roles() user_roles = {} for user in self.controller.users.get_all_users(): user_roles_list = self.controller.users.get_user_roles_names(user.user_id) user_servers = self.controller.servers.get_authorized_servers(user.user_id) servers = [] for server in user_servers: if server['server_name'] not in servers: servers.append(server['server_name']) new_item = {user.user_id: servers} auth_servers.update(new_item) data = {user.user_id: user_roles_list} user_roles.update(data) for role in roles: role_servers = [] role = self.controller.roles.get_role_with_servers(role.role_id) for serv_id in role['servers']: role_servers.append(self.controller.servers.get_server_data_by_id(serv_id)['server_name']) data = {role['role_id']: role_servers} auth_role_servers.update(data) page_data['auth-servers'] = auth_servers page_data['role-servers'] = auth_role_servers page_data['user-roles'] = user_roles page_data['users'] = self.controller.users.user_query(exec_user['user_id']) page_data['roles'] = self.controller.users.user_role_query(exec_user['user_id']) for user in page_data['users']: if user.user_id != exec_user['user_id']: user.api_token = "********" if superuser: for user in self.controller.users.get_all_users(): if user.superuser: super_auth_servers = ["Super User Access To All Servers"] page_data['users'] = self.controller.users.get_all_users() page_data['roles'] = self.controller.roles.get_all_roles() page_data['auth-servers'][user.user_id] = super_auth_servers template = "panel/panel_config.html" elif page == "add_user": page_data['new_user'] = True page_data['user'] = {} page_data['user']['username'] = "" page_data['user']['user_id'] = -1 page_data['user']['email'] = "" page_data['user']['enabled'] = True page_data['user']['superuser'] = False page_data['user']['created'] = "N/A" page_data['user']['last_login'] = "N/A" page_data['user']['last_ip'] = "N/A" page_data['user']['last_update'] = "N/A" page_data['user']['roles'] = set() if Enum_Permissions_Crafty.User_Config not in exec_user_crafty_permissions: self.redirect("/panel/error?error=Unauthorized access: not a user editor") return page_data['roles_all'] = self.controller.roles.get_all_roles() page_data['servers'] = [] page_data['servers_all'] = self.controller.list_defined_servers() page_data['role-servers'] = [] page_data['permissions_all'] = self.controller.crafty_perms.list_defined_crafty_permissions() page_data['permissions_list'] = set() page_data['quantity_server'] = self.controller.crafty_perms.list_all_crafty_permissions_quantity_limits() page_data['languages'] = [] page_data['languages'].append(self.controller.users.get_user_lang_by_id(exec_user["user_id"])) if superuser: page_data['super-disabled'] = '' else: page_data['super-disabled'] = 'disabled' for file in sorted(os.listdir(os.path.join(helper.root_dir, 'app', 'translations'))): if file.endswith('.json'): if file not in helper.get_setting('disabled_language_files'): if file != str(page_data['languages'][0] + '.json'): page_data['languages'].append(file.split('.')[0]) template = "panel/panel_edit_user.html" elif page == "add_schedule": server_id = self.get_argument('id', None) page_data['schedules'] = management_helper.get_schedules_by_server(server_id) page_data['get_players'] = lambda: self.controller.stats.get_server_players(server_id) page_data['active_link'] = 'schedules' page_data['permissions'] = { 'Commands': Enum_Permissions_Server.Commands, 'Terminal': Enum_Permissions_Server.Terminal, 'Logs': Enum_Permissions_Server.Logs, 'Schedule': Enum_Permissions_Server.Schedule, 'Backup': Enum_Permissions_Server.Backup, 'Files': Enum_Permissions_Server.Files, 'Config': Enum_Permissions_Server.Config, 'Players': Enum_Permissions_Server.Players, } page_data['user_permissions'] = self.controller.server_perms.get_user_id_permissions_list(exec_user["user_id"], server_id) page_data['server_data'] = self.controller.servers.get_server_data_by_id(server_id) page_data['server_stats'] = self.controller.servers.get_server_stats_by_id(server_id) page_data['server_stats']['server_type'] = self.controller.servers.get_server_type_by_id(server_id) page_data['new_schedule'] = True page_data['schedule'] = {} page_data['schedule']['children'] = [] page_data['schedule']['server_id'] = server_id page_data['schedule']['schedule_id'] = '' page_data['schedule']['action'] = "" page_data['schedule']['enabled'] = True page_data['schedule']['command'] = '' page_data['schedule']['one_time'] = False page_data['schedule']['cron_string'] = "" page_data['schedule']['time'] = "" page_data['schedule']['interval'] = "" #we don't need to check difficulty here. We'll just default to basic for new schedules page_data['schedule']['difficulty'] = "basic" page_data['schedule']['interval_type'] = 'days' if not Enum_Permissions_Server.Schedule in page_data['user_permissions']: if not superuser: self.redirect("/panel/error?error=Unauthorized access To Schedules") return template = "panel/server_schedule_edit.html" elif page == "edit_schedule": server_id = self.get_argument('id', None) page_data['schedules'] = management_helper.get_schedules_by_server(server_id) sch_id = self.get_argument('sch_id', None) schedule = self.controller.management.get_scheduled_task_model(sch_id) page_data['get_players'] = lambda: self.controller.stats.get_server_players(server_id) page_data['active_link'] = 'schedules' page_data['permissions'] = { 'Commands': Enum_Permissions_Server.Commands, 'Terminal': Enum_Permissions_Server.Terminal, 'Logs': Enum_Permissions_Server.Logs, 'Schedule': Enum_Permissions_Server.Schedule, 'Backup': Enum_Permissions_Server.Backup, 'Files': Enum_Permissions_Server.Files, 'Config': Enum_Permissions_Server.Config, 'Players': Enum_Permissions_Server.Players, } page_data['user_permissions'] = self.controller.server_perms.get_user_id_permissions_list(exec_user["user_id"], server_id) page_data['server_data'] = self.controller.servers.get_server_data_by_id(server_id) page_data['server_stats'] = self.controller.servers.get_server_stats_by_id(server_id) page_data['server_stats']['server_type'] = self.controller.servers.get_server_type_by_id(server_id) page_data['new_schedule'] = False page_data['schedule'] = {} page_data['schedule']['server_id'] = server_id page_data['schedule']['schedule_id'] = schedule.schedule_id page_data['schedule']['action'] = schedule.action page_data['schedule']['children'] = self.controller.management.get_child_schedules(sch_id) # We check here to see if the command is any of the default ones. # We do not want a user changing to a custom command and seeing our command there. if schedule.action != 'start' or schedule.action != 'stop' or schedule.action != 'restart' or schedule.action != 'backup': page_data['schedule']['command'] = schedule.command else: page_data['schedule']['command'] = '' page_data['schedule']['enabled'] = schedule.enabled page_data['schedule']['one_time'] = schedule.one_time page_data['schedule']['cron_string'] = schedule.cron_string page_data['schedule']['time'] = schedule.start_time page_data['schedule']['interval'] = schedule.interval page_data['schedule']['interval_type'] = schedule.interval_type if schedule.interval_type == 'reaction': difficulty = 'reaction' elif schedule.cron_string == '': difficulty = 'basic' else: difficulty = 'advanced' page_data['schedule']['difficulty'] = difficulty if sch_id is None or server_id is None: self.redirect("/panel/error?error=Invalid server ID or Schedule ID") if not Enum_Permissions_Server.Schedule in page_data['user_permissions']: if not superuser: self.redirect("/panel/error?error=Unauthorized access To Schedules") return template = "panel/server_schedule_edit.html" elif page == "edit_user": user_id = self.get_argument('id', None) role_servers = self.controller.servers.get_authorized_servers(user_id) page_role_servers = [] servers = set() for server in role_servers: page_role_servers.append(server['server_id']) page_data['new_user'] = False page_data['user'] = self.controller.users.get_user_by_id(user_id) page_data['servers'] = servers page_data['role-servers'] = page_role_servers page_data['roles_all'] = self.controller.roles.get_all_roles() page_data['servers_all'] = self.controller.list_defined_servers() page_data['permissions_all'] = self.controller.crafty_perms.list_defined_crafty_permissions() page_data['permissions_list'] = self.controller.crafty_perms.get_crafty_permissions_list(user_id) page_data['quantity_server'] = self.controller.crafty_perms.list_crafty_permissions_quantity_limits(user_id) page_data['languages'] = [] page_data['languages'].append(self.controller.users.get_user_lang_by_id(user_id)) #checks if super user. If not we disable the button. if superuser and str(exec_user['user_id']) != str(user_id): page_data['super-disabled'] = '' else: page_data['super-disabled'] = 'disabled' for file in sorted(os.listdir(os.path.join(helper.root_dir, 'app', 'translations'))): if file.endswith('.json'): if file not in helper.get_setting('disabled_language_files'): if file != str(page_data['languages'][0] + '.json'): page_data['languages'].append(file.split('.')[0]) if user_id is None: self.redirect("/panel/error?error=Invalid User ID") return elif Enum_Permissions_Crafty.User_Config not in exec_user_crafty_permissions: if str(user_id) != str(exec_user["user_id"]): self.redirect("/panel/error?error=Unauthorized access: not a user editor") return page_data['servers'] = [] page_data['role-servers'] = [] page_data['roles_all'] = [] page_data['servers_all'] = [] if exec_user['user_id'] != page_data['user']['user_id']: page_data['user']['api_token'] = "********" if exec_user['email'] == 'default@example.com': page_data['user']['email'] = "" template = "panel/panel_edit_user.html" elif page == "edit_user_apikeys": user_id = self.get_argument('id', None) page_data['user'] = self.controller.users.get_user_by_id(user_id) page_data['api_keys'] = self.controller.users.get_user_api_keys(user_id) # self.controller.crafty_perms.list_defined_crafty_permissions() page_data['server_permissions_all'] = self.controller.server_perms.list_defined_permissions() page_data['crafty_permissions_all'] = self.controller.crafty_perms.list_defined_crafty_permissions() if user_id is None: self.redirect("/panel/error?error=Invalid User ID") return template = "panel/panel_edit_user_apikeys.html" elif page == "remove_user": user_id = bleach.clean(self.get_argument('id', None)) if not superuser and Enum_Permissions_Crafty.User_Config not in exec_user_crafty_permissions: self.redirect("/panel/error?error=Unauthorized access: not superuser") return elif str(exec_user["user_id"]) == str(user_id): self.redirect("/panel/error?error=Unauthorized access: you cannot delete yourself") return elif user_id is None: self.redirect("/panel/error?error=Invalid User ID") return else: # does this user id exist? target_user = self.controller.users.get_user_by_id(user_id) if not target_user: self.redirect("/panel/error?error=Invalid User ID") return elif target_user['superuser']: self.redirect("/panel/error?error=Cannot remove a superuser") return self.controller.users.remove_user(user_id) self.controller.management.add_to_audit_log(exec_user['user_id'], f"Removed user {target_user['username']} (UID:{user_id})", server_id=0, source_ip=self.get_remote_ip()) self.redirect("/panel/panel_config") elif page == "add_role": user_roles = self.get_user_roles() page_data['new_role'] = True page_data['role'] = {} page_data['role']['role_name'] = "" page_data['role']['role_id'] = -1 page_data['role']['created'] = "N/A" page_data['role']['last_update'] = "N/A" page_data['role']['servers'] = set() page_data['user-roles'] = user_roles page_data['users'] = self.controller.users.get_all_users() if Enum_Permissions_Crafty.Roles_Config not in exec_user_crafty_permissions: self.redirect("/panel/error?error=Unauthorized access: not a role editor") return page_data['servers_all'] = self.controller.list_defined_servers() page_data['permissions_all'] = self.controller.server_perms.list_defined_permissions() page_data['permissions_list'] = set() template = "panel/panel_edit_role.html" elif page == "edit_role": user_roles = self.get_user_roles() page_data['new_role'] = False role_id = self.get_argument('id', None) page_data['role'] = self.controller.roles.get_role_with_servers(role_id) page_data['servers_all'] = self.controller.list_defined_servers() page_data['permissions_all'] = self.controller.server_perms.list_defined_permissions() page_data['permissions_list'] = self.controller.server_perms.get_role_permissions(role_id) page_data['user-roles'] = user_roles page_data['users'] = self.controller.users.get_all_users() if Enum_Permissions_Crafty.Roles_Config not in exec_user_crafty_permissions: self.redirect("/panel/error?error=Unauthorized access: not a role editor") return elif role_id is None: self.redirect("/panel/error?error=Invalid Role ID") return template = "panel/panel_edit_role.html" elif page == "remove_role": role_id = bleach.clean(self.get_argument('id', None)) if not superuser: self.redirect("/panel/error?error=Unauthorized access: not superuser") return elif role_id is None: self.redirect("/panel/error?error=Invalid Role ID") return else: # does this user id exist? target_role = self.controller.roles.get_role(role_id) if not target_role: self.redirect("/panel/error?error=Invalid Role ID") return self.controller.roles.remove_role(role_id) self.controller.management.add_to_audit_log(exec_user['user_id'], f"Removed role {target_role['role_name']} (RID:{role_id})", server_id=0, source_ip=self.get_remote_ip()) self.redirect("/panel/panel_config") elif page == "activity_logs": page_data['audit_logs'] = self.controller.management.get_actity_log() template = "panel/activity_logs.html" elif page == 'download_file': file = helper.get_os_understandable_path(self.get_argument('path', "")) name = self.get_argument('name', "") server_id = self.check_server_id() if server_id is None: return server_info = self.controller.servers.get_server_data_by_id(server_id) if not helper.in_path(helper.get_os_understandable_path(server_info["path"]), file) \ or not os.path.isfile(file): self.redirect("/panel/error?error=Invalid path detected") return self.download_file(name, file) self.redirect(f"/panel/server_detail?id={server_id}&subpage=files") elif page == 'download_support_package': tempZipStorage = exec_user['support_logs'] #We'll reset the support path for this user now. self.controller.users.set_support_path(exec_user["user_id"], "") self.set_header('Content-Type', 'application/octet-stream') self.set_header('Content-Disposition', 'attachment; filename=' + "support_logs.zip") chunk_size = 1024 * 1024 * 4 # 4 MiB if tempZipStorage != '': with open(tempZipStorage, 'rb') as f: while True: chunk = f.read(chunk_size) if not chunk: break try: self.write(chunk) # write the chunk to response self.flush() # send the chunk to client except iostream.StreamClosedError: # this means the client has closed the connection # so break the loop break finally: # deleting the chunk is very important because # if many clients are downloading files at the # same time, the chunks in memory will keep # increasing and will eat up the RAM del chunk self.redirect('/panel/dashboard') else: self.redirect('/panel/error?error=No path found for support logs') return elif page == "support_logs": logger.info(f"Support logs requested. Packinging logs for user with ID: {exec_user['user_id']}") logs_thread = threading.Thread(target=self.controller.package_support_logs, daemon=True, args=(exec_user,), name=f"{exec_user['user_id']}_logs_thread") logs_thread.start() self.redirect('/panel/dashboard') return self.render( template, data=page_data, time=time, utc_offset=(time.timezone * -1 / 60 / 60), translate=self.translator.translate, ) @tornado.web.authenticated def post(self, page): api_key, _token_data, exec_user = self.current_user superuser = exec_user['superuser'] if api_key is not None: superuser = superuser and api_key.superuser server_id = self.get_argument('id', None) permissions = { 'Commands': Enum_Permissions_Server.Commands, 'Terminal': Enum_Permissions_Server.Terminal, 'Logs': Enum_Permissions_Server.Logs, 'Schedule': Enum_Permissions_Server.Schedule, 'Backup': Enum_Permissions_Server.Backup, 'Files': Enum_Permissions_Server.Files, 'Config': Enum_Permissions_Server.Config, 'Players': Enum_Permissions_Server.Players, } exec_user_role = set() if superuser: # defined_servers = self.controller.list_defined_servers() exec_user_role.add("Super User") exec_user_crafty_permissions = self.controller.crafty_perms.list_defined_crafty_permissions() else: exec_user_crafty_permissions = self.controller.crafty_perms.get_crafty_permissions_list(exec_user["user_id"]) # defined_servers = self.controller.servers.get_authorized_servers(exec_user["user_id"]) for r in exec_user['roles']: role = self.controller.roles.get_role(r) exec_user_role.add(role['role_name']) if page == 'server_detail': if not permissions['Config'] in self.controller.server_perms.get_user_id_permissions_list(exec_user["user_id"], server_id): if not superuser: self.redirect("/panel/error?error=Unauthorized access to Config") return server_name = self.get_argument('server_name', None) server_obj = self.controller.servers.get_server_obj(server_id) if superuser: server_path = self.get_argument('server_path', None) if helper.is_os_windows(): server_path.replace(' ', '^ ') server_path = helper.wtol_path(server_path) log_path = self.get_argument('log_path', None) if helper.is_os_windows(): log_path.replace(' ', '^ ') log_path = helper.wtol_path(log_path) executable = self.get_argument('executable', None) execution_command = self.get_argument('execution_command', None) server_ip = self.get_argument('server_ip', None) server_port = self.get_argument('server_port', None) executable_update_url = self.get_argument('executable_update_url', None) else: execution_command = server_obj.execution_command executable = server_obj.executable stop_command = self.get_argument('stop_command', None) auto_start_delay = self.get_argument('auto_start_delay', '10') auto_start = int(float(self.get_argument('auto_start', '0'))) crash_detection = int(float(self.get_argument('crash_detection', '0'))) logs_delete_after = int(float(self.get_argument('logs_delete_after', '0'))) # subpage = self.get_argument('subpage', None) server_id = self.check_server_id() if server_id is None: return server_obj = self.controller.servers.get_server_obj(server_id) stale_executable = server_obj.executable #Compares old jar name to page data being passed. If they are different we replace the executable name in the if str(stale_executable) != str(executable): execution_command = execution_command.replace(str(stale_executable), str(executable)) server_obj.server_name = server_name if superuser: if helper.validate_traversal(helper.get_servers_root_dir(), server_path): server_obj.path = server_path server_obj.log_path = log_path if helper.validate_traversal(helper.get_servers_root_dir(), executable): server_obj.executable = executable server_obj.execution_command = execution_command server_obj.server_ip = server_ip server_obj.server_port = server_port server_obj.executable_update_url = executable_update_url else: server_obj.path = server_obj.path server_obj.log_path = server_obj.log_path server_obj.executable = server_obj.executable server_obj.execution_command = server_obj.execution_command server_obj.server_ip = server_obj.server_ip server_obj.server_port = server_obj.server_port server_obj.executable_update_url = server_obj.executable_update_url server_obj.stop_command = stop_command server_obj.auto_start_delay = auto_start_delay server_obj.auto_start = auto_start server_obj.crash_detection = crash_detection server_obj.logs_delete_after = logs_delete_after self.controller.servers.update_server(server_obj) self.controller.crash_detection(server_obj) self.controller.refresh_server_settings(server_id) self.controller.management.add_to_audit_log(exec_user['user_id'], f"Edited server {server_id} named {server_name}", server_id, self.get_remote_ip()) self.redirect(f"/panel/server_detail?id={server_id}&subpage=config") if page == "server_backup": logger.debug(self.request.arguments) server_id = self.get_argument('id', None) server_obj = self.controller.servers.get_server_obj(server_id) compress = self.get_argument('compress', False) check_changed = self.get_argument('changed') if str(check_changed) == str(1): checked = self.get_body_arguments('root_path') else: checked = self.controller.management.get_excluded_backup_dirs(server_id) if superuser: backup_path = bleach.clean(self.get_argument('backup_path', None)) if helper.is_os_windows(): backup_path.replace(' ', '^ ') backup_path = helper.wtol_path(backup_path) else: backup_path = server_obj.backup_path max_backups = bleach.clean(self.get_argument('max_backups', None)) if not permissions['Backup'] in self.controller.server_perms.get_user_id_permissions_list(exec_user["user_id"], server_id): if not superuser: self.redirect("/panel/error?error=Unauthorized access: User not authorized") return elif server_id is None: self.redirect("/panel/error?error=Invalid Server ID") return else: # does this server id exist? if not self.controller.servers.server_id_exists(server_id): self.redirect("/panel/error?error=Invalid Server ID") return server_obj = self.controller.servers.get_server_obj(server_id) server_obj.backup_path = backup_path self.controller.servers.update_server(server_obj) self.controller.management.set_backup_config(server_id, max_backups=max_backups, excluded_dirs=checked, compress=bool(compress)) self.controller.management.add_to_audit_log(exec_user['user_id'], f"Edited server {server_id}: updated backups", server_id, self.get_remote_ip()) self.tasks_manager.reload_schedule_from_db() self.redirect(f"/panel/server_detail?id={server_id}&subpage=backup") if page == "new_schedule": server_id = bleach.clean(self.get_argument('id', None)) difficulty = bleach.clean(self.get_argument('difficulty', None)) server_obj = self.controller.servers.get_server_obj(server_id) enabled = bleach.clean(self.get_argument('enabled', '0')) if difficulty == 'basic': action = bleach.clean(self.get_argument('action', None)) interval = bleach.clean(self.get_argument('interval', None)) interval_type = bleach.clean(self.get_argument('interval_type', None)) #only check for time if it's number of days if interval_type == "days": sch_time = bleach.clean(self.get_argument('time', None)) if action == "command": command = bleach.clean(self.get_argument('command', None)) elif action == "start": command = "start_server" elif action == "stop": command = "stop_server" elif action == "restart": command = "restart_server" elif action == "backup": command = "backup_server" elif difficulty == 'reaction': interval_type = 'reaction' action = bleach.clean(self.get_argument('action', None)) delay = bleach.clean(self.get_argument('delay', None)) parent = bleach.clean(self.get_argument('parent', None)) if action == "command": command = bleach.clean(self.get_argument('command', None)) elif action == "start": command = "start_server" elif action == "stop": command = "stop_server" elif action == "restart": command = "restart_server" elif action == "backup": command = "backup_server" else: interval_type = '' cron_string = bleach.clean(self.get_argument('cron', '')) try: CronValidator.parse(cron_string) except Exception as e: self.redirect(f"/panel/error?error=INVALID FORMAT: Invalid Cron Format. {e}") return action = bleach.clean(self.get_argument('action', None)) if action == "command": command = bleach.clean(self.get_argument('command', None)) elif action == "start": command = "start_server" elif action == "stop": command = "stop_server" elif action == "restart": command = "restart_server" elif action == "backup": command = "backup_server" if bleach.clean(self.get_argument('enabled', '0')) == '1': enabled = True else: enabled = False if bleach.clean(self.get_argument('one_time', '0')) == '1': one_time = True else: one_time = False if not superuser and not permissions['Backup'] in self.controller.server_perms.get_user_id_permissions_list(exec_user["user_id"], server_id): self.redirect("/panel/error?error=Unauthorized access: User not authorized") return elif server_id is None: self.redirect("/panel/error?error=Invalid Server ID") return else: # does this server id exist? if not self.controller.servers.server_id_exists(server_id): self.redirect("/panel/error?error=Invalid Server ID") return if interval_type == "days": job_data = { "server_id": server_id, "action": action, "interval_type": interval_type, "interval": interval, "command": command, "start_time": sch_time, "enabled": enabled, "one_time": one_time, "cron_string": '', "parent": None, "delay": 0 } elif difficulty == "reaction": job_data = { "server_id": server_id, "action": action, "interval_type": interval_type, "interval": '', #We'll base every interval off of a midnight start time. "start_time": '', "command": command, "cron_string": '', "enabled": enabled, "one_time": one_time, "parent": parent, "delay": delay } elif difficulty == "advanced": job_data = { "server_id": server_id, "action": action, "interval_type": '', "interval": '', #We'll base every interval off of a midnight start time. "start_time": '', "command": command, "cron_string": cron_string, "enabled": enabled, "one_time": one_time, "parent": None, "delay": 0 } else: job_data = { "server_id": server_id, "action": action, "interval_type": interval_type, "interval": interval, "command": command, "enabled": enabled, #We'll base every interval off of a midnight start time. "start_time": '00:00', "one_time": one_time, "cron_string": '', 'parent': None, 'delay': 0 } self.tasks_manager.schedule_job(job_data) self.controller.management.add_to_audit_log(exec_user['user_id'], f"Edited server {server_id}: added scheduled job", server_id, self.get_remote_ip()) self.tasks_manager.reload_schedule_from_db() self.redirect(f"/panel/server_detail?id={server_id}&subpage=schedules") if page == "edit_schedule": server_id = bleach.clean(self.get_argument('id', None)) difficulty = bleach.clean(self.get_argument('difficulty', None)) server_obj = self.controller.servers.get_server_obj(server_id) enabled = bleach.clean(self.get_argument('enabled', '0')) if difficulty == 'basic': action = bleach.clean(self.get_argument('action', None)) interval = bleach.clean(self.get_argument('interval', None)) interval_type = bleach.clean(self.get_argument('interval_type', None)) #only check for time if it's number of days if interval_type == "days": sch_time = bleach.clean(self.get_argument('time', None)) if action == "command": command = bleach.clean(self.get_argument('command', None)) elif action == "start": command = "start_server" elif action == "stop": command = "stop_server" elif action == "restart": command = "restart_server" elif action == "backup": command = "backup_server" elif difficulty == 'reaction': interval_type = 'reaction' action = bleach.clean(self.get_argument('action', None)) delay = bleach.clean(self.get_argument('delay', None)) parent = bleach.clean(self.get_argument('parent', None)) if action == "command": command = bleach.clean(self.get_argument('command', None)) elif action == "start": command = "start_server" elif action == "stop": command = "stop_server" elif action == "restart": command = "restart_server" elif action == "backup": command = "backup_server" parent = bleach.clean(self.get_argument('parent', None)) else: interval_type = '' cron_string = bleach.clean(self.get_argument('cron', '')) sch_id = self.get_argument('sch_id', None) try: CronValidator.parse(cron_string) except Exception as e: self.redirect(f"/panel/error?error=INVALID FORMAT: Invalid Cron Format. {e}") return action = bleach.clean(self.get_argument('action', None)) if action == "command": command = bleach.clean(self.get_argument('command', None)) elif action == "start": command = "start_server" elif action == "stop": command = "stop_server" elif action == "restart": command = "restart_server" elif action == "backup": command = "backup_server" if bleach.clean(self.get_argument('enabled', '0'))=='1': enabled = True else: enabled = False if bleach.clean(self.get_argument('one_time', '0')) == '1': one_time = True else: one_time = False if not superuser and not permissions['Backup'] in self.controller.server_perms.get_user_id_permissions_list(exec_user["user_id"], server_id): self.redirect("/panel/error?error=Unauthorized access: User not authorized") return elif server_id is None: self.redirect("/panel/error?error=Invalid Server ID") return else: # does this server id exist? if not self.controller.servers.server_id_exists(server_id): self.redirect("/panel/error?error=Invalid Server ID") return if interval_type == "days": job_data = { "server_id": server_id, "action": action, "interval_type": interval_type, "interval": interval, "command": command, "start_time": sch_time, "enabled": enabled, "one_time": one_time, "cron_string": '', "parent": None, "delay": 0 } elif difficulty == "advanced": job_data = { "server_id": server_id, "action": action, "interval_type": '', "interval": '', #We'll base every interval off of a midnight start time. "start_time": '', "command": command, "cron_string": cron_string, "delay": '', "parent": '', "enabled": enabled, "one_time": one_time } elif difficulty == "reaction": job_data = { "server_id": server_id, "action": action, "interval_type": interval_type, "interval": '', #We'll base every interval off of a midnight start time. "start_time": '', "command": command, "cron_string": '', "enabled": enabled, "one_time": one_time, "parent": parent, "delay": delay } else: job_data = { "server_id": server_id, "action": action, "interval_type": interval_type, "interval": interval, "command": command, "enabled": enabled, #We'll base every interval off of a midnight start time. "start_time": '00:00', "delay": '', "parent": '', "one_time": one_time, "cron_string": '' } sch_id = self.get_argument('sch_id', None) self.tasks_manager.update_job(sch_id, job_data) self.controller.management.add_to_audit_log(exec_user['user_id'], f"Edited server {server_id}: updated schedule", server_id, self.get_remote_ip()) self.tasks_manager.reload_schedule_from_db() self.redirect(f"/panel/server_detail?id={server_id}&subpage=schedules") elif page == "edit_user": if bleach.clean(self.get_argument('username', None)) == 'system': self.redirect("/panel/error?error=Unauthorized access: system user is not editable") user_id = bleach.clean(self.get_argument('id', None)) username = bleach.clean(self.get_argument('username', None)) password0 = bleach.clean(self.get_argument('password0', None)) password1 = bleach.clean(self.get_argument('password1', None)) email = bleach.clean(self.get_argument('email', "default@example.com")) enabled = int(float(self.get_argument('enabled', '0'))) lang = bleach.clean(self.get_argument('language'), helper.get_setting('language')) if superuser: #Checks if user is trying to change super user status of self. We don't want that. # Automatically make them stay super user since we know they are. if str(exec_user['user_id']) != str(user_id): superuser = bleach.clean(self.get_argument('superuser', '0')) else: superuser = '1' else: superuser = '0' if superuser == '1': superuser = True else: superuser = False if Enum_Permissions_Crafty.User_Config not in exec_user_crafty_permissions: if str(user_id) != str(exec_user["user_id"]): self.redirect("/panel/error?error=Unauthorized access: not a user editor") return user_data = { "username": username, "password": password0, "lang": lang, } self.controller.users.update_user(user_id, user_data=user_data) self.controller.management.add_to_audit_log(exec_user['user_id'], f"Edited user {username} (UID:{user_id}) password", server_id=0, source_ip=self.get_remote_ip()) self.redirect("/panel/panel_config") return elif username is None or username == "": self.redirect("/panel/error?error=Invalid username") return elif user_id is None: self.redirect("/panel/error?error=Invalid User ID") return else: # does this user id exist? if not self.controller.users.user_id_exists(user_id): self.redirect("/panel/error?error=Invalid User ID") return if password0 != password1: self.redirect("/panel/error?error=Passwords must match") return roles = self.get_user_role_memberships() permissions_mask, server_quantity = self.get_perms_quantity() # if email is None or "": # email = "default@example.com" user_data = { "username": username, "password": password0, "email": email, "enabled": enabled, "roles": roles, "lang": lang, "superuser": superuser, } user_crafty_data = { "permissions_mask": permissions_mask, "server_quantity": server_quantity } self.controller.users.update_user(user_id, user_data=user_data, user_crafty_data=user_crafty_data) self.controller.management.add_to_audit_log(exec_user['user_id'], f"Edited user {username} (UID:{user_id}) with roles {roles} and permissions {permissions_mask}", server_id=0, source_ip=self.get_remote_ip()) self.redirect("/panel/panel_config") elif page == "edit_user_apikeys": user_id = self.get_argument('id', None) name = self.get_argument('name', None) superuser = self.get_argument('superuser', None) == '1' if name is None or name == "": self.redirect("/panel/error?error=Invalid API key name") return elif user_id is None: self.redirect("/panel/error?error=Invalid User ID") return else: # does this user id exist? if not self.controller.users.user_id_exists(user_id): self.redirect("/panel/error?error=Invalid User ID") return crafty_permissions_mask = self.get_perms() server_permissions_mask = self.get_perms_server() self.controller.users.add_user_api_key(name, user_id, superuser, crafty_permissions_mask, server_permissions_mask) self.controller.management.add_to_audit_log(exec_user['user_id'], f"Added API key {name} with crafty permissions {crafty_permissions_mask}" + f" and {server_permissions_mask} for user with UID: {user_id}", server_id=0, source_ip=self.get_remote_ip()) self.redirect(f"/panel/edit_user_apikeys?id={user_id}") elif page == "get_token": key_id = self.get_argument('id', None) if key_id is None: self.redirect("/panel/error?error=Invalid Key ID") return else: key = self.controller.users.get_user_api_key(key_id) # does this user id exist? if key is None: self.redirect("/panel/error?error=Invalid Key ID") return self.controller.management.add_to_audit_log(exec_user['user_id'], f"Generated a new API token for the key {key.name} from user with UID: {key.user.user_id}", server_id=0, source_ip=self.get_remote_ip()) self.write(authentication.generate(key.user.user_id, { 'token_id': key.token_id })) self.finish() elif page == "add_user": if bleach.clean(self.get_argument('username', None)).lower() == 'system': self.redirect("/panel/error?error=Unauthorized access: username system is reserved for the Crafty system." + " Please choose a different username.") return username = bleach.clean(self.get_argument('username', None)) password0 = bleach.clean(self.get_argument('password0', None)) password1 = bleach.clean(self.get_argument('password1', None)) email = bleach.clean(self.get_argument('email', "default@example.com")) enabled = int(float(self.get_argument('enabled', '0'))) lang = bleach.clean(self.get_argument('lang', helper.get_setting('language'))) if superuser: superuser = bleach.clean(self.get_argument('superuser', '0')) else: superuser = '0' if superuser == '1': superuser = True else: superuser = False if Enum_Permissions_Crafty.User_Config not in exec_user_crafty_permissions: self.redirect("/panel/error?error=Unauthorized access: not a user editor") return elif username is None or username == "": self.redirect("/panel/error?error=Invalid username") return else: # does this user id exist? if self.controller.users.get_id_by_name(username) is not None: self.redirect("/panel/error?error=User exists") return if password0 != password1: self.redirect("/panel/error?error=Passwords must match") return roles = self.get_user_role_memberships() permissions_mask, server_quantity = self.get_perms_quantity() user_id = self.controller.users.add_user(username, password=password0, email=email, enabled=enabled, superuser=superuser) user_data = { "roles": roles, 'lang': lang, } user_crafty_data = { "permissions_mask": permissions_mask, "server_quantity": server_quantity } self.controller.users.update_user(user_id, user_data=user_data, user_crafty_data=user_crafty_data) self.controller.management.add_to_audit_log(exec_user['user_id'], f"Added user {username} (UID:{user_id})", server_id=0, source_ip=self.get_remote_ip()) self.controller.management.add_to_audit_log(exec_user['user_id'], f"Edited user {username} (UID:{user_id}) with roles {roles}", server_id=0, source_ip=self.get_remote_ip()) self.redirect("/panel/panel_config") elif page == "edit_role": role_id = bleach.clean(self.get_argument('id', None)) role_name = bleach.clean(self.get_argument('role_name', None)) if Enum_Permissions_Crafty.Roles_Config not in exec_user_crafty_permissions: self.redirect("/panel/error?error=Unauthorized access: not a role editor") return elif role_name is None or role_name == "": self.redirect("/panel/error?error=Invalid username") return elif role_id is None: self.redirect("/panel/error?error=Invalid Role ID") return else: # does this user id exist? if not self.controller.roles.role_id_exists(role_id): self.redirect("/panel/error?error=Invalid Role ID") return servers = self.get_role_servers() permissions_mask = self.get_perms_server() role_data = { "role_name": role_name, "servers": servers } self.controller.roles.update_role(role_id, role_data=role_data, permissions_mask=permissions_mask) self.controller.management.add_to_audit_log(exec_user['user_id'], f"Edited role {role_name} (RID:{role_id}) with servers {servers}", server_id=0, source_ip=self.get_remote_ip()) self.redirect("/panel/panel_config") elif page == "add_role": role_name = bleach.clean(self.get_argument('role_name', None)) if Enum_Permissions_Crafty.Roles_Config not in exec_user_crafty_permissions: self.redirect("/panel/error?error=Unauthorized access: not a role editor") return elif role_name is None or role_name == "": self.redirect("/panel/error?error=Invalid role name") return else: # does this user id exist? if self.controller.roles.get_roleid_by_name(role_name) is not None: self.redirect("/panel/error?error=Role exists") return servers = self.get_role_servers() permissions_mask = self.get_perms_server() role_id = self.controller.roles.add_role(role_name) self.controller.roles.update_role(role_id, {"servers": servers}, permissions_mask) self.controller.management.add_to_audit_log(exec_user['user_id'], f"Added role {role_name} (RID:{role_id})", server_id=0, source_ip=self.get_remote_ip()) self.controller.management.add_to_audit_log(exec_user['user_id'], f"Edited role {role_name} (RID:{role_id}) with servers {servers}", server_id=0, source_ip=self.get_remote_ip()) self.redirect("/panel/panel_config") else: self.set_status(404) page_data = { 'lang': helper.get_setting('language'), 'lang_page': helper.getLangPage(helper.get_setting('language')), } self.render( "public/404.html", translate=self.translator.translate, data=page_data ) @tornado.web.authenticated def delete(self, page): api_key, _token_data, exec_user = self.current_user superuser = exec_user['superuser'] if api_key is not None: superuser = superuser and api_key.superuser page_data = { # todo: make this actually pull and compare version data 'update_available': False, 'version_data': helper.get_version_string(), 'user_data': exec_user, 'hosts_data': self.controller.management.get_latest_hosts_stats(), 'show_contribute': helper.get_setting("show_contribute_link", True), 'lang': self.controller.users.get_user_lang_by_id(exec_user["user_id"]), 'lang_page': helper.getLangPage(self.controller.users.get_user_lang_by_id(exec_user["user_id"])), } if page == "remove_apikey": key_id = bleach.clean(self.get_argument('id', None)) if not superuser: self.redirect("/panel/error?error=Unauthorized access: not superuser") return elif key_id is None or self.controller.users.get_user_api_key(key_id) is None: self.redirect("/panel/error?error=Invalid Key ID") return else: # does this user id exist? target_key = self.controller.users.get_user_api_key(key_id) if not target_key: self.redirect("/panel/error?error=Invalid Key ID") return self.controller.users.delete_user_api_key(key_id) self.controller.management.add_to_audit_log(exec_user['user_id'], f"Removed API key {target_key} (ID: {key_id}) from user {exec_user['user_id']}", server_id=0, source_ip=self.get_remote_ip()) self.redirect("/panel/panel_config") else: self.set_status(404) self.render( "public/404.html", data=page_data, translate=self.translator.translate, )