discord-jellyseerr/jellyseerr_api.py

322 lines
14 KiB
Python

import aiohttp
import asyncio
import config
import urllib.parse
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Union
logger = logging.getLogger('jellyseerr_api')
class JellyseerrAPI:
def __init__(self, base_url: str, email: str = None, password: str = None):
self.base_url = base_url.rstrip('/')
self.email = email
self.password = password
self.headers = {
'Content-Type': 'application/json'
}
self.session = None
self.cookie_jar = None
self.auth_cookie = None
self.last_login = None
async def __aenter__(self):
if not self.cookie_jar:
self.cookie_jar = aiohttp.CookieJar()
self.session = aiohttp.ClientSession(cookie_jar=self.cookie_jar)
await self.login()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def login(self) -> bool:
"""Log in to Jellyseerr using local authentication"""
# Skip login if we have a valid cookie and it's not expired
cookie_expiry_days = config.AUTH_COOKIE_EXPIRY
if (self.auth_cookie and self.last_login and
datetime.now() < self.last_login + timedelta(days=cookie_expiry_days)):
logger.debug("Using existing auth cookie")
return True
if not self.session:
if not self.cookie_jar:
self.cookie_jar = aiohttp.CookieJar()
self.session = aiohttp.ClientSession(cookie_jar=self.cookie_jar)
try:
login_data = {
'email': self.email,
'password': self.password
}
logger.info(f"Logging in to Jellyseerr with email: {self.email}")
logger.debug(f"Login URL: {self.base_url}/api/v1/auth/local")
# Print request info for debugging
logger.debug(f"Request headers: {self.session.headers}")
logger.debug(f"Login payload (without password): {{'email': '{self.email}'}}")
async with self.session.post(
f"{self.base_url}/api/v1/auth/local",
json=login_data
) as response:
response_text = await response.text()
logger.debug(f"Login response status: {response.status}")
logger.debug(f"Login response headers: {response.headers}")
logger.debug(f"Login response body: {response_text}")
if response.status == 200:
# Login successful, cookies are automatically stored in the cookie jar
self.last_login = datetime.now()
logger.info("Successfully logged in to Jellyseerr")
return True
else:
# Try to parse response as JSON, but handle case where it's not JSON
try:
error_data = json.loads(response_text)
error_message = error_data.get('message', 'Unknown error')
except json.JSONDecodeError:
error_message = f"Non-JSON response (Status {response.status}): {response_text}"
logger.error(f"Login failed: {error_message}")
if response.status == 401:
raise Exception(f"Authentication failed: Invalid email or password")
elif response.status == 403:
raise Exception(f"Authentication failed: Access denied. Account may be disabled or lacks permissions")
elif response.status == 500:
raise Exception(f"Authentication failed: Server error. Check Jellyseerr logs")
else:
raise Exception(f"Authentication failed ({response.status}): {error_message}")
except aiohttp.ClientConnectorError as e:
logger.error(f"Connection error: {str(e)}")
raise Exception(f"Failed to connect to Jellyseerr at {self.base_url}: {str(e)}")
except aiohttp.ClientError as e:
logger.error(f"Client error: {str(e)}")
raise Exception(f"HTTP client error: {str(e)}")
except Exception as e:
logger.error(f"Login error: {str(e)}", exc_info=True)
raise Exception(f"Failed to authenticate with Jellyseerr: {str(e)}")
async def _request(self, method: str, endpoint: str, params: Dict = None, data: Dict = None) -> Any:
# Ensure we're logged in
if not self.last_login:
try:
await self.login()
except Exception as e:
logger.warning(f"Authentication failed, attempting request without authentication: {str(e)}")
# Continue anyway - some endpoints might work without auth
# Create session if it doesn't exist
if self.session is None:
if not self.cookie_jar:
self.cookie_jar = aiohttp.CookieJar()
self.session = aiohttp.ClientSession(cookie_jar=self.cookie_jar)
# Debug info
logger.debug(f"Making request: {method} {self.base_url}/api/v1{endpoint}")
if params:
logger.debug(f"Request params: {params}")
if data:
logger.debug(f"Request data: {data}")
url = f"{self.base_url}/api/v1{endpoint}"
# URL encode parameters
if params:
# Create a new dict with URL-encoded values
encoded_params = {}
for key, value in params.items():
if isinstance(value, str):
encoded_params[key] = urllib.parse.quote(value)
else:
encoded_params[key] = value
params = encoded_params
try:
# Attempt the request
async with self.session.request(method, url, headers=self.headers, params=params, json=data) as response:
if response.status == 204: # No content
return None
try:
response_data = await response.json()
# If unauthorized, try to login again and retry
if response.status == 401:
logger.warning("Received unauthorized response, attempting to re-login")
try:
await self.login()
# Retry the request after re-login
async with self.session.request(method, url, headers=self.headers, params=params, json=data) as retry_response:
logger.debug(f"Retry response status: {retry_response.status}")
if retry_response.status == 204:
return None
try:
retry_data = await retry_response.json()
if not retry_response.ok:
error_message = retry_data.get('message', 'Unknown error')
raise Exception(f"API Error ({retry_response.status}): {error_message}")
return retry_data
except aiohttp.ContentTypeError:
# Handle non-JSON responses
retry_text = await retry_response.text()
logger.error(f"Non-JSON response on retry: {retry_text}")
raise Exception(f"API returned non-JSON response: {retry_text[:100]}...")
except Exception as login_err:
logger.error(f"Re-login failed: {str(login_err)}")
raise Exception(f"Authentication error: {str(login_err)}")
if not response.ok:
error_message = response_data.get('message', 'Unknown error')
raise Exception(f"API Error ({response.status}): {error_message}")
return response_data
except aiohttp.ClientResponseError:
raise Exception(f"API Error ({response.status}): Failed to parse response")
except aiohttp.ClientError as e:
logger.error(f"Connection error: {str(e)}")
raise Exception(f"Connection error: {str(e)}")
# Search endpoints
async def search(self, query: str, page: int = 1, language: str = 'en') -> Dict:
"""Search for movies, TV shows, or people"""
params = {
'query': query,
'page': page,
'language': language
}
return await self._request('GET', '/search', params=params)
# Media information endpoints
async def get_movie_details(self, movie_id: int, language: str = 'en') -> Dict:
"""Get detailed information about a movie"""
params = {'language': language}
return await self._request('GET', f'/movie/{movie_id}', params=params)
async def get_tv_details(self, tv_id: int, language: str = 'en') -> Dict:
"""Get detailed information about a TV show"""
params = {'language': language}
return await self._request('GET', f'/tv/{tv_id}', params=params)
async def get_season_details(self, tv_id: int, season_id: int, language: str = 'en') -> Dict:
"""Get detailed information about a TV season"""
params = {'language': language}
return await self._request('GET', f'/tv/{tv_id}/season/{season_id}', params=params)
# Recommendation endpoints
async def get_movie_recommendations(self, movie_id: int, page: int = 1, language: str = 'en') -> Dict:
"""Get movie recommendations based on a movie"""
params = {
'page': page,
'language': language
}
return await self._request('GET', f'/movie/{movie_id}/recommendations', params=params)
async def get_tv_recommendations(self, tv_id: int, page: int = 1, language: str = 'en') -> Dict:
"""Get TV show recommendations based on a TV show"""
params = {
'page': page,
'language': language
}
return await self._request('GET', f'/tv/{tv_id}/recommendations', params=params)
# Request endpoints
async def get_requests(self, filter_status: str = 'all', page: int = 1, page_size: int = 10) -> Dict:
"""Get all requests with pagination"""
params = {
'filter': filter_status,
'take': page_size,
'skip': (page - 1) * page_size
}
return await self._request('GET', '/request', params=params)
async def create_request(self, media_type: str, media_id: int,
seasons: Union[List[int], str] = None,
is_4k: bool = False) -> Dict:
"""Create a new media request"""
data = {
'mediaType': media_type,
'mediaId': media_id,
'is4k': is_4k
}
if media_type == 'tv' and seasons:
data['seasons'] = seasons
return await self._request('POST', '/request', data=data)
async def get_request(self, request_id: int) -> Dict:
"""Get details of a specific request"""
return await self._request('GET', f'/request/{request_id}')
# Methods for approving and declining requests have been removed
# These actions should be performed in the Jellyseerr web interface
# Discover endpoints
async def discover_movies(self, page: int = 1, language: str = 'en',
genre: Optional[str] = None, sort_by: str = 'popularity.desc') -> Dict:
"""Discover movies with various filters"""
params = {
'page': page,
'language': language,
'sortBy': sort_by
}
if genre:
params['genre'] = genre
return await self._request('GET', '/discover/movies', params=params)
async def discover_tv(self, page: int = 1, language: str = 'en',
genre: Optional[str] = None, sort_by: str = 'popularity.desc') -> Dict:
"""Discover TV shows with various filters"""
params = {
'page': page,
'language': language,
'sortBy': sort_by
}
if genre:
params['genre'] = genre
return await self._request('GET', '/discover/tv', params=params)
async def discover_trending(self, page: int = 1, language: str = 'en') -> Dict:
"""Get trending movies and TV shows"""
params = {
'page': page,
'language': language
}
return await self._request('GET', '/discover/trending', params=params)
# Genre endpoints
async def get_movie_genres(self, language: str = 'en') -> List:
"""Get a list of movie genres"""
params = {'language': language}
return await self._request('GET', '/genres/movie', params=params)
async def get_tv_genres(self, language: str = 'en') -> List:
"""Get a list of TV show genres"""
params = {'language': language}
return await self._request('GET', '/genres/tv', params=params)
async def get_media(self, filter_status: str = 'all', page: int = 1, page_size: int = 10) -> Dict:
"""Get all media with pagination"""
params = {
'filter': filter_status,
'take': page_size,
'skip': (page - 1) * page_size
}
return await self._request('GET', '/media', params=params)
# Create an instance for import (but don't initialize cookie_jar yet)
jellyseerr_api = JellyseerrAPI(config.JELLYSEERR_URL, config.JELLYSEERR_EMAIL, config.JELLYSEERR_PASSWORD)