import os import shutil import html import re import logging import tornado.web import tornado.escape import bleach from app.classes.shared.console import console from app.classes.shared.helpers import helper from app.classes.shared.server import ServerOutBuf from app.classes.web.base_handler import BaseHandler from app.classes.models.server_permissions import Enum_Permissions_Server logger = logging.getLogger(__name__) class AjaxHandler(BaseHandler): def render_page(self, template, page_data): self.render( template, data=page_data, translate=self.translator.translate, ) @tornado.web.authenticated def get(self, page): _, _, exec_user = self.current_user error = bleach.clean(self.get_argument('error', "WTF Error!")) template = "panel/denied.html" page_data = { 'user_data': exec_user, 'error': error } if page == "error": template = "public/error.html" self.render_page(template, page_data) elif page == 'server_log': server_id = self.get_argument('id', None) full_log = self.get_argument('full', False) if server_id is None: logger.warning("Server ID not found in server_log ajax call") self.redirect("/panel/error?error=Server ID Not Found") return server_id = bleach.clean(server_id) server_data = self.controller.servers.get_server_data_by_id(server_id) if not server_data: logger.warning("Server Data not found in server_log ajax call") self.redirect("/panel/error?error=Server ID Not Found") return if not server_data['log_path']: logger.warning(f"Log path not found in server_log ajax call ({server_id})") if full_log: log_lines = helper.get_setting('max_log_lines') data = helper.tail_file(helper.get_os_understandable_path(server_data['log_path']), log_lines) else: data = ServerOutBuf.lines.get(server_id, []) for d in data: try: d = re.sub('(\033\\[(0;)?[0-9]*[A-z]?(;[0-9])?m?)|(> )', '', d) d = re.sub('[A-z]{2}\b\b', '', d) line = helper.log_colors(html.escape(d)) self.write(f'{line}
') # self.write(d.encode("utf-8")) except Exception as e: logger.warning(f"Skipping Log Line due to error: {e}") elif page == "announcements": data = helper.get_announcements() page_data['notify_data'] = data self.render_page('ajax/notify.html', page_data) elif page == "get_file": file_path = helper.get_os_understandable_path(self.get_argument('file_path', None)) server_id = self.get_argument('id', None) if not self.check_server_id(server_id, 'get_file'): return else: server_id = bleach.clean(server_id) if not helper.in_path(helper.get_os_understandable_path(self.controller.servers.get_server_data_by_id(server_id)['path']), file_path)\ or not helper.check_file_exists(os.path.abspath(file_path)): logger.warning(f"Invalid path in get_file ajax call ({file_path})") console.warning(f"Invalid path in get_file ajax call ({file_path})") return error = None try: with open(file_path, encoding='utf-8') as file: file_contents = file.read() except UnicodeDecodeError: file_contents = '' error = 'UnicodeDecodeError' self.write({ 'content': file_contents, 'error': error }) self.finish() elif page == "get_tree": server_id = self.get_argument('id', None) path = self.get_argument('path', None) if not self.check_server_id(server_id, 'get_tree'): return else: server_id = bleach.clean(server_id) if helper.validate_traversal(self.controller.servers.get_server_data_by_id(server_id)['path'], path): self.write(helper.get_os_understandable_path(path) + '\n' + helper.generate_tree(path)) self.finish() elif page == "get_zip_tree": server_id = self.get_argument('id', None) path = self.get_argument('path', None) self.write(helper.get_os_understandable_path(path) + '\n' + helper.generate_zip_tree(path)) self.finish() elif page == "get_zip_dir": server_id = self.get_argument('id', None) path = self.get_argument('path', None) self.write(helper.get_os_understandable_path(path) + '\n' + helper.generate_zip_dir(path)) self.finish() elif page == "get_backup_tree": server_id = self.get_argument('id', None) folder = self.get_argument('path', None) output = "" file_list = os.listdir(folder) file_list = sorted(file_list, key=str.casefold) output += \ f"""