from enum import Enum import logging from peewee import ( ForeignKeyField, CharField, CompositeKey, JOIN, ) from app.classes.models.base_model import BaseModel from app.classes.models.servers import Servers from app.classes.models.roles import Roles from app.classes.models.users import UserRoles, HelperUsers, ApiKeys, Users from app.classes.shared.permission_helper import PermissionHelper logger = logging.getLogger(__name__) # ********************************************************************************** # Role Servers Class # ********************************************************************************** class RoleServers(BaseModel): role_id = ForeignKeyField(Roles, backref="role_server") server_id = ForeignKeyField(Servers, backref="role_server") permissions = CharField(default="00000000") class Meta: table_name = "role_servers" primary_key = CompositeKey("role_id", "server_id") # ********************************************************************************** # Servers Permissions Class # ********************************************************************************** class EnumPermissionsServer(Enum): COMMANDS = 0 TERMINAL = 1 LOGS = 2 SCHEDULE = 3 BACKUP = 4 FILES = 5 CONFIG = 6 PLAYERS = 7 class PermissionsServers: @staticmethod def get_or_create(role_id, server, permissions_mask): return RoleServers.get_or_create( role_id=role_id, server_id=server, permissions=permissions_mask ) @staticmethod def get_permissions_list(): permissions_list = [] for member in EnumPermissionsServer.__members__.items(): permissions_list.append(member[1]) return permissions_list @staticmethod def get_permissions(permissions_mask): permissions_list = [] for member in EnumPermissionsServer.__members__.items(): if PermissionsServers.has_permission(permissions_mask, member[1]): permissions_list.append(member[1]) return permissions_list @staticmethod def has_permission(permission_mask, permission_tested: EnumPermissionsServer): return permission_mask[permission_tested.value] == "1" @staticmethod def set_permission( permission_mask, permission_tested: EnumPermissionsServer, value ): list_perms = list(permission_mask) list_perms[permission_tested.value] = str(value) permission_mask = "".join(list_perms) return permission_mask @staticmethod def get_permission(permission_mask, permission_tested: EnumPermissionsServer): return permission_mask[permission_tested.value] @staticmethod def get_token_permissions(permissions_mask, api_permissions_mask): permissions_list = [] for member in EnumPermissionsServer.__members__.items(): if PermissionHelper.both_have_perm( permissions_mask, api_permissions_mask, member[1] ): permissions_list.append(member[1]) return permissions_list # ********************************************************************************** # Role_Servers Methods # ********************************************************************************** @staticmethod def get_role_servers_from_role_id(roleid): return RoleServers.select().where(RoleServers.role_id == roleid) @staticmethod def get_servers_from_role(role_id): return ( RoleServers.select() .join(Servers, JOIN.INNER) .where(RoleServers.role_id == role_id) ) @staticmethod def get_roles_from_server(server_id): return ( RoleServers.select() .join(Roles, JOIN.INNER) .where(RoleServers.server_id == server_id) ) @staticmethod def add_role_server(server_id, role_id, rs_permissions="00000000"): servers = RoleServers.insert( { RoleServers.server_id: server_id, RoleServers.role_id: role_id, RoleServers.permissions: rs_permissions, } ).execute() return servers @staticmethod def get_permissions_mask(role_id, server_id): permissions_mask = "" role_server = ( RoleServers.select() .where(RoleServers.role_id == role_id) .where(RoleServers.server_id == server_id) .get() ) permissions_mask = role_server.permissions return permissions_mask @staticmethod def get_server_roles(server_id): role_list = [] roles = RoleServers.select().where(RoleServers.server_id == server_id).execute() for role in roles: role_list.append(role.role_id) return role_list @staticmethod def get_role_permissions_list(role_id): permissions_mask = "00000000" role_server = RoleServers.get_or_none(RoleServers.role_id == role_id) if role_server is not None: permissions_mask = role_server.permissions permissions_list = PermissionsServers.get_permissions(permissions_mask) return permissions_list @staticmethod def update_role_permission(role_id, server_id, permissions_mask): role_server = ( RoleServers.select() .where(RoleServers.role_id == role_id) .where(RoleServers.server_id == server_id) .get() ) role_server.permissions = permissions_mask RoleServers.save(role_server) @staticmethod def delete_roles_permissions(role_id, removed_servers=None): if removed_servers is None: removed_servers = {} return ( RoleServers.delete() .where(RoleServers.role_id == role_id) .where(RoleServers.server_id.in_(removed_servers)) .execute() ) @staticmethod def remove_roles_of_server(server_id): return RoleServers.delete().where(RoleServers.server_id == server_id).execute() @staticmethod def get_user_id_permissions_mask(user_id, server_id: str): user = HelperUsers.get_user_model(user_id) return PermissionsServers.get_user_permissions_mask(user, server_id) @staticmethod def get_user_permissions_mask(user: Users, server_id: str): if user.superuser: permissions_mask = "1" * len(PermissionsServers.get_permissions_list()) else: roles_list = HelperUsers.get_user_roles_id(user.user_id) role_server = ( RoleServers.select() .where(RoleServers.role_id.in_(roles_list)) .where(RoleServers.server_id == server_id) .execute() ) try: permissions_mask = role_server[0].permissions except IndexError: permissions_mask = "0" * len(PermissionsServers.get_permissions_list()) return permissions_mask @staticmethod def get_server_user_list(server_id): final_users = [] server_roles = RoleServers.select().where(RoleServers.server_id == server_id) super_users = Users.select().where( Users.superuser == True # pylint: disable=singleton-comparison ) for role in server_roles: users = UserRoles.select().where(UserRoles.role_id == role.role_id) for user in users: if user.user_id.user_id not in final_users: final_users.append(user.user_id.user_id) for suser in super_users: if suser.user_id not in final_users: final_users.append(suser.user_id) return final_users @staticmethod def get_user_id_permissions_list(user_id, server_id: str): user = HelperUsers.get_user_model(user_id) return PermissionsServers.get_user_permissions_list(user, server_id) @staticmethod def get_user_permissions_list(user: Users, server_id: str): if user.superuser: permissions_list = PermissionsServers.get_permissions_list() else: permissions_mask = PermissionsServers.get_user_permissions_mask( user, server_id ) permissions_list = PermissionsServers.get_permissions(permissions_mask) return permissions_list @staticmethod def get_api_key_id_permissions_list(key_id, server_id: str): key = ApiKeys.get(ApiKeys.token_id == key_id) return PermissionsServers.get_api_key_permissions_list(key, server_id) @staticmethod def get_api_key_permissions_list(key: ApiKeys, server_id: str): user = HelperUsers.get_user(key.user_id) if user["superuser"] and key.superuser: return PermissionsServers.get_permissions_list() else: roles_list = HelperUsers.get_user_roles_id(user["user_id"]) role_server = ( RoleServers.select() .where(RoleServers.role_id.in_(roles_list)) .where(RoleServers.server_id == server_id) .execute() ) try: user_permissions_mask = role_server[0].permissions except: if user["superuser"]: user_permissions_mask = "11111111" else: user_permissions_mask = "00000000" key_permissions_mask = key.server_permissions permissions_mask = PermissionHelper.combine_masks( user_permissions_mask, key_permissions_mask ) permissions_list = PermissionsServers.get_permissions(permissions_mask) return permissions_list