mirror of
https://gitlab.com/crafty-controller/crafty-4.git
synced 2025-01-21 02:35:28 +01:00
212 lines
7.1 KiB
Python
212 lines
7.1 KiB
Python
|
import json
|
||
|
import logging
|
||
|
|
||
|
from jsonschema import ValidationError, validate
|
||
|
from app.classes.models.crafty_permissions import Enum_Permissions_Crafty
|
||
|
from app.classes.models.roles import helper_roles
|
||
|
from app.classes.web.base_api_handler import BaseApiHandler
|
||
|
|
||
|
|
||
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
||
|
|
||
|
|
||
|
class ApiUsersUserIndexHandler(BaseApiHandler):
    """API handler for a single user, addressed by id or the ``"@me"`` alias.

    Every method authenticates the caller first and returns without writing
    a body of its own when ``authenticate_user()`` yields nothing. Acting on
    a user other than oneself requires the ``User_Config`` Crafty permission;
    a missing permission is answered with HTTP 400 / ``NOT_AUTHORIZED``.
    """

    def get(self, user_id: str):
        """Return the requested user as JSON, without secret fields.

        Args:
            user_id: Id of the user to read, or ``"@me"`` for the caller.
        """
        auth_data = self.authenticate_user()
        if not auth_data:
            return
        (
            _,
            exec_user_crafty_permissions,
            _,
            _,
            user,
        ) = auth_data

        if user_id == "@me":
            user_id = user["user_id"]

        # Compare as strings (as patch() does): the id taken from the URL is
        # a str and would otherwise never equal a non-str stored id.
        if str(user["user_id"]) == str(user_id):
            res_user = user
        elif Enum_Permissions_Crafty.User_Config not in exec_user_crafty_permissions:
            return self.finish_json(
                400,
                {
                    "status": "error",
                    "error": "NOT_AUTHORIZED",
                },
            )
        else:
            # has User_Config permission and isn't viewing self
            res_user = self.controller.users.get_user(user_id)

        # Work on a shallow copy so the dict handed out by authenticate_user()
        # / get_user() is not stripped in place.
        res_user = dict(res_user)

        # Remove password and valid_tokens_from from the response
        # as those should never be sent out to the client.
        res_user.pop("password", None)
        res_user.pop("valid_tokens_from", None)
        res_user["roles"] = [
            helper_roles.get_role(role) for role in res_user.get("roles", set())
        ]

        self.finish_json(
            200,
            {"status": "ok", "data": res_user},
        )

    def delete(self, user_id: str):
        """Delete the requested user.

        Self-deletion is allowed when the ``allow_self_delete`` setting is
        truthy; otherwise the caller needs the ``User_Config`` permission.

        Args:
            user_id: Id of the user to delete, or ``"@me"`` for the caller.
        """
        auth_data = self.authenticate_user()
        if not auth_data:
            return
        (
            _,
            exec_user_crafty_permissions,
            _,
            _,
            user,
        ) = auth_data

        # Resolve the alias up front so the literal "@me" can never reach
        # remove_user() through the permission-based branch below.
        if user_id == "@me":
            user_id = user["user_id"]

        is_self = str(user["user_id"]) == str(user_id)

        if is_self and self.helper.get_setting("allow_self_delete", False):
            self.controller.users.remove_user(user["user_id"])
        elif Enum_Permissions_Crafty.User_Config not in exec_user_crafty_permissions:
            return self.finish_json(
                400,
                {
                    "status": "error",
                    "error": "NOT_AUTHORIZED",
                },
            )
        else:
            # has User_Config permission
            self.controller.users.remove_user(user_id)

        self.finish_json(
            200,
            {"status": "ok"},
        )

    def patch(self, user_id: str):
        """Update fields of the requested user from a JSON request body.

        The body must be a JSON object containing at least one of the
        editable properties and nothing outside the user schema. Callers may
        not change their own ``superuser``, ``permissions`` or ``roles``;
        a ``superuser`` change sent by a non-superuser is silently dropped.

        Args:
            user_id: Id of the user to modify, or ``"@me"`` for the caller.
        """
        auth_data = self.authenticate_user()
        if not auth_data:
            return
        (
            _,
            exec_user_crafty_permissions,
            _,
            superuser,
            user,
        ) = auth_data

        editable_fields = [
            "username",
            "password",
            "email",
            "enabled",
            "lang",
            "superuser",
            "permissions",
            "roles",
            "hints",
        ]
        user_patch_schema = {
            "type": "object",
            "properties": {
                **self.controller.users.user_jsonschema_props,
            },
            # Require at least one property
            "anyOf": [{"required": [name]} for name in editable_fields],
            "additionalProperties": False,
        }

        try:
            data = json.loads(self.request.body)
        except json.decoder.JSONDecodeError as e:
            return self.finish_json(
                400, {"status": "error", "error": "INVALID_JSON", "error_data": str(e)}
            )

        try:
            validate(data, user_patch_schema)
        except ValidationError as e:
            return self.finish_json(
                400,
                {
                    "status": "error",
                    "error": "INVALID_JSON_SCHEMA",
                    "error_data": str(e),
                },
            )

        if user_id == "@me":
            user_id = user["user_id"]

        is_self = str(user["user_id"]) == str(user_id)
        has_user_config = (
            Enum_Permissions_Crafty.User_Config in exec_user_crafty_permissions
        )

        if not has_user_config and not is_self:
            # If doesn't have perm can't edit other users
            return self.finish_json(
                400,
                {
                    "status": "error",
                    "error": "NOT_AUTHORIZED",
                },
            )

        if data.get("username", None) is not None:
            if data["username"].lower() in ["system", ""]:
                return self.finish_json(
                    400, {"status": "error", "error": "INVALID_USERNAME"}
                )
            if self.controller.users.get_id_by_name(data["username"]) is not None:
                return self.finish_json(
                    400, {"status": "error", "error": "USER_EXISTS"}
                )

        if data.get("superuser", None) is not None:
            if is_self:
                # Checks if user is trying to change super user status of self.
                # We don't want that.
                return self.finish_json(
                    400, {"status": "error", "error": "INVALID_SUPERUSER_MODIFY"}
                )
            if not superuser:
                # The user is not superuser so they can't change the superuser status
                data.pop("superuser")

        if data.get("permissions", None) is not None:
            if is_self:
                # Checks if user is trying to change permissions of self.
                # We don't want that.
                return self.finish_json(
                    400, {"status": "error", "error": "INVALID_PERMISSIONS_MODIFY"}
                )
            if not has_user_config:
                # Checks if user is trying to change permissions of someone
                # else without User Config permission. We don't want that.
                return self.finish_json(
                    400, {"status": "error", "error": "INVALID_PERMISSIONS_MODIFY"}
                )

        if data.get("roles", None) is not None:
            if is_self:
                # Checks if user is trying to change roles of self.
                # We don't want that.
                return self.finish_json(
                    400, {"status": "error", "error": "INVALID_ROLES_MODIFY"}
                )
            if not has_user_config:
                # Checks if user is trying to change roles of someone
                # else without User Config permission. We don't want that.
                return self.finish_json(
                    400, {"status": "error", "error": "INVALID_ROLES_MODIFY"}
                )

        # Persist the validated changes on the *user*. The previous code
        # looked up a server object by user id, set the payload as attributes
        # on this handler instance and then saved the untouched server, so
        # the user was never updated.
        # NOTE(review): assumes the users controller exposes
        # update_user(user_id, user_data) - confirm its signature and the
        # payload shape it expects (e.g. container type of "roles").
        self.controller.users.update_user(user_id, data)

        return self.finish_json(200, {"status": "ok"})
|