mirror of
https://gitlab.com/crafty-controller/crafty-4.git
synced 2025-01-26 16:25:29 +01:00
296 lines
10 KiB
Python
296 lines
10 KiB
Python
import json
|
|
import logging
|
|
import typing as t
|
|
|
|
from jsonschema import ValidationError, validate
|
|
from app.classes.controllers.users_controller import UsersController
|
|
from app.classes.models.crafty_permissions import (
|
|
EnumPermissionsCrafty,
|
|
PermissionsCrafty,
|
|
)
|
|
from app.classes.models.roles import HelperRoles
|
|
from app.classes.models.users import HelperUsers
|
|
from app.classes.web.base_api_handler import BaseApiHandler
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ApiUsersUserIndexHandler(BaseApiHandler):
|
|
def get(self, user_id: str):
|
|
auth_data = self.authenticate_user()
|
|
if not auth_data:
|
|
return
|
|
(
|
|
_,
|
|
exec_user_crafty_permissions,
|
|
_,
|
|
_,
|
|
user,
|
|
) = auth_data
|
|
|
|
if user_id in ["@me", user["user_id"]]:
|
|
user_id = user["user_id"]
|
|
res_user = user
|
|
elif EnumPermissionsCrafty.USER_CONFIG not in exec_user_crafty_permissions:
|
|
return self.finish_json(
|
|
400,
|
|
{
|
|
"status": "error",
|
|
"error": "NOT_AUTHORIZED",
|
|
},
|
|
)
|
|
else:
|
|
# has User_Config permission and isn't viewing self
|
|
res_user = self.controller.users.get_user_by_id(user_id)
|
|
if not res_user:
|
|
return self.finish_json(
|
|
404,
|
|
{
|
|
"status": "error",
|
|
"error": "USER_NOT_FOUND",
|
|
},
|
|
)
|
|
|
|
# Remove password and valid_tokens_from from the response
|
|
# as those should never be sent out to the client.
|
|
res_user.pop("password", None)
|
|
res_user.pop("valid_tokens_from", None)
|
|
res_user["roles"] = list(
|
|
map(HelperRoles.get_role, res_user.get("roles", set()))
|
|
)
|
|
|
|
self.finish_json(
|
|
200,
|
|
{"status": "ok", "data": res_user},
|
|
)
|
|
|
|
def delete(self, user_id: str):
|
|
auth_data = self.authenticate_user()
|
|
if not auth_data:
|
|
return
|
|
(
|
|
_,
|
|
exec_user_crafty_permissions,
|
|
_,
|
|
_,
|
|
user,
|
|
) = auth_data
|
|
|
|
if (user_id in ["@me", user["user_id"]]) and self.helper.get_setting(
|
|
"allow_self_delete", False
|
|
):
|
|
user_id = user["user_id"]
|
|
self.controller.users.remove_user(user_id)
|
|
elif EnumPermissionsCrafty.USER_CONFIG not in exec_user_crafty_permissions:
|
|
return self.finish_json(
|
|
400,
|
|
{
|
|
"status": "error",
|
|
"error": "NOT_AUTHORIZED",
|
|
},
|
|
)
|
|
else:
|
|
# has User_Config permission
|
|
self.controller.users.remove_user(user_id)
|
|
|
|
self.controller.management.add_to_audit_log(
|
|
user["user_id"],
|
|
f"deleted the user {user_id}",
|
|
server_id=0,
|
|
source_ip=self.get_remote_ip(),
|
|
)
|
|
|
|
self.finish_json(
|
|
200,
|
|
{"status": "ok"},
|
|
)
|
|
|
|
def patch(self, user_id: str):
|
|
user_patch_schema = {
|
|
"type": "object",
|
|
"properties": {
|
|
**self.controller.users.user_jsonschema_props,
|
|
},
|
|
"additionalProperties": False,
|
|
"minProperties": 1,
|
|
}
|
|
auth_data = self.authenticate_user()
|
|
if not auth_data:
|
|
return
|
|
(
|
|
_,
|
|
exec_user_crafty_permissions,
|
|
_,
|
|
superuser,
|
|
user,
|
|
) = auth_data
|
|
|
|
try:
|
|
data = json.loads(self.request.body)
|
|
except json.decoder.JSONDecodeError as e:
|
|
return self.finish_json(
|
|
400, {"status": "error", "error": "INVALID_JSON", "error_data": str(e)}
|
|
)
|
|
|
|
try:
|
|
validate(data, user_patch_schema)
|
|
except ValidationError as e:
|
|
return self.finish_json(
|
|
400,
|
|
{
|
|
"status": "error",
|
|
"error": "INVALID_JSON_SCHEMA",
|
|
"error_data": str(e),
|
|
},
|
|
)
|
|
|
|
if user_id == "@me":
|
|
user_id = user["user_id"]
|
|
|
|
if (
|
|
EnumPermissionsCrafty.USER_CONFIG not in exec_user_crafty_permissions
|
|
and str(user["user_id"]) != str(user_id)
|
|
):
|
|
# If doesn't have perm can't edit other users
|
|
return self.finish_json(
|
|
400,
|
|
{
|
|
"status": "error",
|
|
"error": "NOT_AUTHORIZED",
|
|
},
|
|
)
|
|
|
|
if "username" in data:
|
|
if data["username"].lower() in ["system", ""]:
|
|
return self.finish_json(
|
|
400, {"status": "error", "error": "INVALID_USERNAME"}
|
|
)
|
|
if self.controller.users.get_id_by_name(
|
|
data["username"]
|
|
) is not None and self.controller.users.get_id_by_name(
|
|
data["username"]
|
|
) != int(
|
|
user_id
|
|
):
|
|
return self.finish_json(
|
|
400, {"status": "error", "error": "USER_EXISTS"}
|
|
)
|
|
|
|
if "superuser" in data:
|
|
if str(user["user_id"]) == str(user_id) and not superuser:
|
|
# Checks if user is trying to change super user status
|
|
# of self without superuser. We don't want that.
|
|
return self.finish_json(
|
|
400, {"status": "error", "error": "INVALID_SUPERUSER_MODIFY"}
|
|
)
|
|
if not superuser:
|
|
# The user is not superuser so they can't change the superuser status
|
|
data.pop("superuser")
|
|
|
|
if "permissions" in data:
|
|
if str(user["user_id"]) == str(user_id) and not superuser:
|
|
# Checks if user is trying to change permissions
|
|
# of self without superuser. We don't want that.
|
|
return self.finish_json(
|
|
400, {"status": "error", "error": "INVALID_PERMISSIONS_MODIFY"}
|
|
)
|
|
if EnumPermissionsCrafty.USER_CONFIG not in exec_user_crafty_permissions:
|
|
# Checks if user is trying to change permissions of someone
|
|
# else without User Config permission. We don't want that.
|
|
return self.finish_json(
|
|
400, {"status": "error", "error": "INVALID_PERMISSIONS_MODIFY"}
|
|
)
|
|
|
|
if "roles" in data:
|
|
if str(user["user_id"]) == str(user_id) and not superuser:
|
|
# Checks if user is trying to change roles of
|
|
# self without superuser. We don't want that.
|
|
return self.finish_json(
|
|
400, {"status": "error", "error": "INVALID_ROLES_MODIFY"}
|
|
)
|
|
if EnumPermissionsCrafty.USER_CONFIG not in exec_user_crafty_permissions:
|
|
# Checks if user is trying to change roles of someone
|
|
# else without User Config permission. We don't want that.
|
|
return self.finish_json(
|
|
400, {"status": "error", "error": "INVALID_ROLES_MODIFY"}
|
|
)
|
|
|
|
user_obj = HelperUsers.get_user_model(user_id)
|
|
if "password" in data and str(user["user_id"]) != str(user_id):
|
|
if str(user["user_id"]) != str(user_obj.manager):
|
|
# TODO: edit your own password
|
|
return self.finish_json(
|
|
400, {"status": "error", "error": "INVALID_PASSWORD_MODIFY"}
|
|
)
|
|
|
|
if "roles" in data:
|
|
roles: t.Set[str] = set(data.pop("roles"))
|
|
base_roles: t.Set[str] = set(user_obj.roles)
|
|
added_roles = roles.difference(base_roles)
|
|
removed_roles = base_roles.difference(roles)
|
|
logger.debug(
|
|
f"updating user {user_id}'s roles: "
|
|
f"+role:{added_roles} -role:{removed_roles}"
|
|
)
|
|
|
|
for role_id in added_roles:
|
|
HelperUsers.get_or_create(user_id, role_id)
|
|
|
|
if len(removed_roles) != 0:
|
|
self.controller.users.users_helper.delete_user_roles(
|
|
user_id, removed_roles
|
|
)
|
|
|
|
if "manager" in data and (
|
|
data["manager"] == self.controller.users.get_id_by_name("SYSTEM")
|
|
or data["manager"] == 0
|
|
):
|
|
data["manager"] = None
|
|
|
|
if "permissions" in data:
|
|
permissions: t.List[UsersController.ApiPermissionDict] = data.pop(
|
|
"permissions"
|
|
)
|
|
permissions_mask = "0" * len(EnumPermissionsCrafty)
|
|
limit_server_creation = 0
|
|
limit_user_creation = 0
|
|
limit_role_creation = 0
|
|
|
|
for permission in permissions:
|
|
permissions_mask = self.controller.crafty_perms.set_permission(
|
|
permissions_mask,
|
|
EnumPermissionsCrafty.__members__[permission["name"]],
|
|
"1" if permission["enabled"] else "0",
|
|
)
|
|
|
|
PermissionsCrafty.add_or_update_user(
|
|
user_id,
|
|
permissions_mask,
|
|
limit_server_creation,
|
|
limit_user_creation,
|
|
limit_role_creation,
|
|
)
|
|
|
|
# TODO: make this more efficient
|
|
if len(data) != 0:
|
|
for key in data:
|
|
# If we don't validate the input there could be security issues
|
|
value = data[key]
|
|
if key == "password":
|
|
value = self.helper.encode_pass(value)
|
|
setattr(user_obj, key, value)
|
|
user_obj.save()
|
|
|
|
self.controller.management.add_to_audit_log(
|
|
user["user_id"],
|
|
(
|
|
f"edited user {user_obj.username} (UID: {user_id})"
|
|
f"with roles {user_obj.roles}"
|
|
),
|
|
server_id=0,
|
|
source_ip=self.get_remote_ip(),
|
|
)
|
|
|
|
return self.finish_json(200, {"status": "ok"})
|