crafty-4/app/classes/controllers/roles_controller.py

181 lines
6.5 KiB
Python
Raw Normal View History

2021-09-09 00:01:10 +02:00
import logging
2022-05-10 02:08:49 +03:00
import typing as t
2021-09-09 00:01:10 +02:00
from app.classes.models.roles import HelperRoles
2022-05-10 02:08:49 +03:00
from app.classes.models.server_permissions import PermissionsServers, RoleServers
from app.classes.shared.helpers import Helpers
2021-09-09 00:01:10 +02:00
# Module-level logger, named after this module; RolesController uses it for debug output.
logger = logging.getLogger(__name__)
2021-09-29 21:46:11 +02:00
class RolesController:
    """Controller-level entry points for roles and their per-server permissions.

    Most methods are static and either forward to ``HelperRoles`` or
    orchestrate ``PermissionsServers`` calls. Only ``remove_role`` uses the
    helper instances handed to the constructor.
    """

    def __init__(self, users_helper, roles_helper):
        """Keep the helper objects that ``remove_role`` delegates to."""
        self.users_helper = users_helper
        self.roles_helper = roles_helper

    @staticmethod
    def get_all_roles():
        """Forward to ``HelperRoles.get_all_roles``."""
        return HelperRoles.get_all_roles()

    @staticmethod
    def get_all_role_ids():
        """Forward to ``HelperRoles.get_all_role_ids``."""
        return HelperRoles.get_all_role_ids()

    @staticmethod
    def get_roleid_by_name(role_name):
        """Forward to ``HelperRoles.get_roleid_by_name`` for ``role_name``."""
        return HelperRoles.get_roleid_by_name(role_name)

    @staticmethod
    def get_role(role_id):
        """Forward to ``HelperRoles.get_role`` for ``role_id``."""
        return HelperRoles.get_role(role_id)

    @staticmethod
    def update_role(role_id: str, role_data=None, permissions_mask: str = "00000000"):
        """Update a role's fields and apply one permissions mask to its servers.

        Args:
            role_id: ID of the role to modify.
            role_data: Mapping of role fields to their new values. The
                ``"role_id"`` key is ignored, ``"servers"`` is the complete
                desired collection of server IDs, and any other key is
                written only when it differs from the stored value.
            permissions_mask: Mask passed to ``PermissionsServers`` for every
                newly added server and for every server the role already had.

        Raises:
            KeyError: If the role lookup yields an empty dict (no ``"servers"``
                entry), or ``role_data`` contains a key the stored role lacks.
        """
        if role_data is None:
            role_data = {}
        base_data = RolesController.get_role_with_servers(role_id)

        up_data = {}
        added_servers = set()
        removed_servers = set()
        for key in role_data:
            if key == "role_id":
                continue
            if key == "servers":
                added_servers = set(role_data["servers"]).difference(
                    set(base_data["servers"])
                )
                removed_servers = set(base_data["servers"]).difference(
                    set(role_data["servers"])
                )
            elif base_data[key] != role_data[key]:
                up_data[key] = role_data[key]
        up_data["last_update"] = Helpers.get_time_as_string()
        logger.debug(
            f"role: {role_data} +server:{added_servers} -server{removed_servers}"
        )

        for server in added_servers:
            PermissionsServers.get_or_create(role_id, server, permissions_mask)
        # TODO: This is horribly inefficient and we should be using bulk queries
        # but im going for functionality at this point
        for server in base_data["servers"]:
            PermissionsServers.update_role_permission(role_id, server, permissions_mask)
        PermissionsServers.delete_roles_permissions(role_id, removed_servers)

        # NOTE: up_data always holds "last_update" at this point, so this
        # guard is always true; it is kept to match the existing flow.
        if up_data:
            HelperRoles.update_role(role_id, up_data)

    @staticmethod
    def add_role(role_name):
        """Forward to ``HelperRoles.add_role`` and return its result."""
        return HelperRoles.add_role(role_name)

    class RoleServerJsonType(t.TypedDict):
        """One server entry of a role: the server's ID and its permissions."""

        server_id: t.Union[str, int]
        permissions: str

    @staticmethod
    def get_server_ids_and_perms_from_role(
        role_id: t.Union[str, int],
    ) -> t.List[RoleServerJsonType]:
        """List the server ID and permissions of each ``RoleServers`` row of a role.

        Args:
            role_id: ID of the role whose rows are selected.

        Returns:
            One ``{"server_id": ..., "permissions": ...}`` dict per matching row.
        """
        # FIXME: somehow retrieve only the server ids, not the whole servers
        role_rows = RoleServers.select(
            RoleServers.server_id, RoleServers.permissions
        ).where(RoleServers.role_id == role_id)
        return [
            {
                "server_id": role_servers.server_id.server_id,
                "permissions": role_servers.permissions,
            }
            for role_servers in role_rows
        ]

    @staticmethod
    def add_role_advanced(
        name: str,
        servers: t.Iterable[RoleServerJsonType],
    ) -> int:
        """Add a role with a name and a list of servers

        Args:
            name (str): The new role's name
            servers (t.List[RoleServerJsonType]): The new role's servers

        Returns:
            int: The new role's ID
        """
        role_id: t.Final[int] = HelperRoles.add_role(name)
        for server in servers:
            PermissionsServers.get_or_create(
                role_id, server["server_id"], server["permissions"]
            )
        return role_id

    @staticmethod
    def _plan_server_changes(
        current_server_ids: t.Iterable[t.Union[str, int]],
        servers: t.Iterable[RoleServerJsonType],
    ) -> t.Tuple[
        t.Set[t.Union[str, int]],
        t.Set[t.Union[str, int]],
        t.Set[t.Union[str, int]],
        t.Dict[t.Union[str, int], str],
    ]:
        """Split requested servers into added / removed / kept relative to current.

        ``servers`` is consumed exactly once, so single-pass iterables such as
        generators are handled correctly.

        Args:
            current_server_ids: Server IDs currently attached to the role.
            servers: Requested server entries; for a repeated ``server_id``
                the last entry's permissions win.

        Returns:
            ``(added, removed, kept, permissions_by_server)`` where the first
            three are sets of server IDs and the last maps each requested
            server ID to its requested permissions.
        """
        permissions_by_server = {
            server["server_id"]: server["permissions"] for server in servers
        }
        requested = set(permissions_by_server)
        current = set(current_server_ids)
        return (
            requested - current,
            current - requested,
            requested & current,
            permissions_by_server,
        )

    @staticmethod
    def update_role_advanced(
        role_id: t.Union[str, int],
        role_name: t.Optional[str],
        servers: t.Optional[t.Iterable[RoleServerJsonType]],
    ) -> None:
        """Update a role with a name and a list of servers

        Args:
            role_id (t.Union[str, int]): The ID of the role to be modified
            role_name (t.Optional[str]): An optional new name for the role
            servers (t.Optional[t.Iterable[RoleServerJsonType]]): An optional
                list of servers for the role; any iterable works, including
                single-pass ones

        Raises:
            KeyError: If ``servers`` is given and the role lookup yields an
                empty dict (no ``"servers"`` entry).
        """
        logger.debug(f"updating role {role_id} with advanced options")
        if servers is not None:
            base_data = RolesController.get_role_with_servers(role_id)
            (
                added_servers,
                removed_servers,
                same_servers,
                server_permissions_map,
            ) = RolesController._plan_server_changes(base_data["servers"], servers)
            logger.debug(
                f"role: {role_id} +server:{added_servers} -server{removed_servers}"
            )

            for server_id in added_servers:
                PermissionsServers.get_or_create(
                    role_id, server_id, server_permissions_map[server_id]
                )
            if removed_servers:
                PermissionsServers.delete_roles_permissions(role_id, removed_servers)
            for server_id in same_servers:
                PermissionsServers.update_role_permission(
                    role_id, server_id, server_permissions_map[server_id]
                )

        if role_name is not None:
            up_data = {
                "role_name": role_name,
                "last_update": Helpers.get_time_as_string(),
            }
            # TODO: do the last_update on the db side
            HelperRoles.update_role(role_id, up_data)

    def remove_role(self, role_id):
        """Delete a role's server permissions, user links, then the role itself.

        Returns whatever ``self.roles_helper.remove_role`` returns.

        Raises:
            KeyError: If the role lookup yields an empty dict (no ``"servers"``
                entry).
        """
        role_data = RolesController.get_role_with_servers(role_id)
        PermissionsServers.delete_roles_permissions(role_id, role_data["servers"])
        self.users_helper.remove_roles_from_role_id(role_id)
        return self.roles_helper.remove_role(role_id)

    @staticmethod
    def role_id_exists(role_id):
        """Forward to ``HelperRoles.role_id_exists`` for ``role_id``."""
        return HelperRoles.role_id_exists(role_id)

    @staticmethod
    def get_role_with_servers(role_id):
        """Return the role with a ``"servers"`` entry added, or ``{}``.

        The ``"servers"`` value is the result of
        ``PermissionsServers.get_server_ids_from_role``. An empty dict is
        returned when ``HelperRoles.get_role`` yields a falsy value.
        """
        role = HelperRoles.get_role(role_id)
        if role:
            role["servers"] = PermissionsServers.get_server_ids_from_role(role_id)
            return role
        return {}