129 lines
4.6 KiB
Python
Raw Normal View History

import logging
import os
from app.classes.models.server_permissions import EnumPermissionsServer
from app.classes.models.servers import Servers
from app.classes.shared.file_helpers import FileHelpers
from app.classes.web.base_api_handler import BaseApiHandler
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class ApiServersServerActionHandler(BaseApiHandler):
    """API handler that performs a named action on a single server.

    ``clone_server`` and ``eula`` are handled by this class; any other action
    name is passed through unchanged to
    ``controller.management.send_command``.
    """

    def post(self, server_id: str, action: str):
        """Authorize the caller, then perform ``action`` on ``server_id``.

        Args:
            server_id: ID of the target server, as a string.
            action: ``"clone_server"``, ``"eula"``, or any other value, which
                is forwarded to ``send_command`` as-is.

        Every path below sends exactly one JSON response through
        ``finish_json``. The one exception is a falsy result from
        ``authenticate_user``: this method then returns without responding
        itself (presumably the authentication helper already did -- confirm
        against ``BaseApiHandler``).
        """
        auth_data = self.authenticate_user()
        if not auth_data:
            return

        # auth_data[0] is iterated as the entries the caller may access; each
        # entry carries a "server_id" that is compared as a string.
        if not any(str(entry["server_id"]) == server_id for entry in auth_data[0]):
            # if the user doesn't have access to the server, return an error
            return self.finish_json(400, {"status": "error", "error": "NOT_AUTHORIZED"})

        # auth_data[4] is read as a mapping with "user_id" and "superuser".
        user_id = auth_data[4]["user_id"]
        granted = self.controller.server_perms.get_user_id_permissions_list(
            user_id, server_id
        )
        if EnumPermissionsServer.COMMANDS not in granted:
            # if the user doesn't have Commands permission, return an error
            return self.finish_json(400, {"status": "error", "error": "NOT_AUTHORIZED"})

        if action == "clone_server":
            may_create = (
                self.controller.crafty_perms.can_create_server(user_id)
                or auth_data[4]["superuser"]
            )
            if not may_create:
                # NOTE(review): this error is reported with HTTP 200; the
                # status is kept as-is because clients may rely on it.
                return self.finish_json(
                    200, {"status": "error", "error": "SERVER_LIMIT_REACHED"}
                )

            srv_object = self.controller.servers.get_server_instance_by_id(
                server_id
            )
            if srv_object.check_running():
                return self.finish_json(
                    409,
                    {
                        "status": "error",
                        "error": "Server Running!",
                    },
                )

            # _clone_server sends the success response (including the new
            # server id) itself, so no further finish_json call is made here;
            # finishing the same request a second time is an error.
            return self._clone_server(server_id, user_id)

        if action == "eula":
            return self._agree_eula(server_id, user_id)

        # Any other action is handed to the management layer as a command.
        self.controller.management.send_command(
            user_id, server_id, self.get_remote_ip(), action
        )
        self.finish_json(
            200,
            {"status": "ok"},
        )

    def _agree_eula(self, server_id, user):
        """Call ``agree_eula(user)`` on the server instance and respond OK.

        Args:
            server_id: ID of the server whose instance is looked up.
            user: Value passed straight to ``agree_eula`` (``post`` passes
                the caller's user id).

        Returns:
            Whatever ``finish_json`` returns.
        """
        svr = self.controller.servers.get_server_instance_by_id(server_id)
        svr.agree_eula(user)
        return self.finish_json(200, {"status": "ok"})

    def _clone_server(self, server_id, user_id):
        """Clone ``server_id`` into a new server and send the JSON response.

        Steps, in order: choose an unused name, generate a new id, derive the
        new paths, register the new server, write an audit-log entry, copy
        the source server's directory, copy each role's permission mask to
        the new server, and re-initialize all servers.

        Args:
            server_id: ID of the server to copy.
            user_id: ID passed to ``register_server`` and the audit log.

        Returns:
            Whatever ``finish_json`` returns for the single 200 response,
            whose ``data.new_server_id`` holds the new server's id.
        """

        def is_name_used(name):
            return Servers.select().where(Servers.server_name == name).exists()

        server_data = self.controller.servers.get_server_data_by_id(server_id)
        base_name = server_data.get("server_name")

        # First candidate is "<name> (Copy)"; later ones are "(Copy 2)",
        # "(Copy 3)", ... until a name is found that no server row uses.
        new_server_name = base_name + " (Copy)"
        name_counter = 1
        while is_name_used(new_server_name):
            name_counter += 1
            new_server_name = base_name + f" (Copy {name_counter})"

        new_server_id = self.helper.create_uuid()
        new_server_path = os.path.join(self.helper.servers_dir, new_server_id)
        new_backup_path = os.path.join(self.helper.backup_path, new_server_id)

        # Occurrences of the old id inside the execution command and the log
        # path are rewritten to the new id.
        new_server_command = str(server_data.get("execution_command")).replace(
            server_id, new_server_id
        )
        new_server_log_path = server_data.get("log_path").replace(
            server_id, new_server_id
        )

        self.controller.register_server(
            new_server_name,
            new_server_id,
            new_server_path,
            new_backup_path,
            new_server_command,
            server_data.get("executable"),
            new_server_log_path,
            server_data.get("stop_command"),
            server_data.get("server_port"),
            user_id,
            server_data.get("type"),
        )

        self.controller.management.add_to_audit_log(
            user_id,
            f"is cloning server {server_id} named {server_data.get('server_name')}",
            server_id,
            self.get_remote_ip(),
        )

        # copy the old server
        FileHelpers.copy_dir(server_data.get("path"), new_server_path)

        # Give every role attached to the source server the same permission
        # mask on the clone.
        for role in self.controller.server_perms.get_server_roles(server_id):
            mask = self.controller.server_perms.get_permissions_mask(
                role.role_id, server_id
            )
            self.controller.server_perms.add_role_server(
                new_server_id, role.role_id, mask
            )

        self.controller.servers.init_all_servers()

        # This is the only response sent for a successful clone.
        return self.finish_json(
            200,
            {"status": "ok", "data": {"new_server_id": str(new_server_id)}},
        )