"""Calendar viewer web app.

Periodically downloads a published ICS calendar, keeps the upcoming events in
memory (with a JSON file as a fallback cache) and serves them as an HTML page
and as JSON.
"""

import asyncio
import json
import logging
import os
import shutil
from contextlib import asynccontextmanager
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Optional, Tuple

import httpx
import pytz
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI, Request
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from icalendar import Calendar, Event
from jinja2 import Template

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global variables
CALENDAR_URL = "https://outlook.live.com/owa/calendar/ef9138c2-c803-4689-a53e-fe7d0cb90124/d12c4ed3-dfa2-461f-bcd8-9442bea1903b/cid-CD3289D19EBD3DA4/calendar.ics"
CACHE_FILE = Path("calendar_cache.json")
# Zone in which events are filtered, grouped and displayed. Adjust as needed.
DISPLAY_TZ = pytz.timezone('Europe/Berlin')
# How far ahead (in days) events are collected.
LOOKAHEAD_DAYS = 30

calendar_data = []
last_fetch_time = None
scheduler = AsyncIOScheduler()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application lifespan events.

    On startup the calendar is fetched once and two scheduler jobs are
    registered (daily at 02:00 and every 4 hours). On shutdown the scheduler
    is stopped, even if the application exits with an error.
    """
    logger.info("Starting up...")

    # Fetch calendar immediately so the first request has data.
    await fetch_calendar()

    # Schedule daily updates at 2 AM
    scheduler.add_job(
        fetch_calendar,
        'cron',
        hour=2,
        minute=0,
        id='daily_calendar_fetch',
    )

    # Also fetch every 4 hours for more frequent updates
    scheduler.add_job(
        fetch_calendar,
        'interval',
        hours=4,
        id='periodic_calendar_fetch',
    )

    scheduler.start()
    logger.info("Scheduler started")

    try:
        yield
    finally:
        logger.info("Shutting down...")
        scheduler.shutdown()
        logger.info("Scheduler stopped")


# Initialize FastAPI app with lifespan
app = FastAPI(title="Calendar Viewer", version="1.0.0", lifespan=lifespan)

# Mount static files directory, creating it first if it does not exist
static_dir = Path("static")
if not static_dir.exists():
    static_dir.mkdir(exist_ok=True)
    logger.info("Created static directory")

# Ensure logo file exists in static directory
logo_source = Path("Vektor-Logo.svg")
logo_dest = static_dir / "logo.svg"
if logo_source.exists() and not logo_dest.exists():
    shutil.copy2(logo_source, logo_dest)
    logger.info("Copied logo to static directory")

app.mount("/static", StaticFiles(directory="static"), name="static")

# HTML template for the calendar view
HTML_TEMPLATE = """ Turmli Bar - Calendar Events

Turmli Bar Calendar

{% if last_updated %} Last updated: {{ last_updated }} {% endif %}
Auto-refresh in 5:00
{% if error %}

Error

{{ error }}

{% elif not events %}

No Upcoming Events

There are no events scheduled for the next 30 days.

{% else %} {% for date, day_events in events_by_date.items() %}
{{ date }}
{% for event in day_events %}
{% if event.all_day %}
All Day
{% else %}
{{ event.time }}
{% endif %}
{{ event.title }}
{% if event.location %}
📍 {{ event.location }}
{% endif %}
{% endfor %}
{% endfor %} {% endif %}
"""

# Compiled once. Autoescaping is enabled because event titles and locations
# come from an external ICS feed and must not be able to inject markup.
PAGE_TEMPLATE = Template(HTML_TEMPLATE, autoescape=True)


def _local_midnight(day) -> datetime:
    """Return midnight of *day* as an aware datetime in DISPLAY_TZ."""
    return DISPLAY_TZ.localize(datetime.combine(day, datetime.min.time()))


def _to_local(value) -> Tuple[datetime, bool]:
    """Normalize an ICS date/datetime to an aware datetime in DISPLAY_TZ.

    Returns ``(moment, is_date_only)``. Naive datetimes are interpreted as
    DISPLAY_TZ wall-clock time; aware datetimes are converted; plain dates
    (all-day values) become local midnight.
    """
    if isinstance(value, datetime):
        if not value.tzinfo:
            return DISPLAY_TZ.localize(value), False
        return value.astimezone(DISPLAY_TZ), False
    return _local_midnight(value), True


def _parse_events(ics_content, now: datetime) -> List[dict]:
    """Parse ICS content into a start-sorted list of upcoming event dicts.

    Only VEVENTs with a DTSTART are considered. Timed events are kept when
    they start between *now* and *now* + LOOKAHEAD_DAYS; all-day events are
    kept from the start of the current local day, so an all-day event
    happening today is not dropped merely because midnight has passed.
    """
    cal = Calendar.from_ical(ics_content)
    cutoff = now + timedelta(days=LOOKAHEAD_DAYS)
    today_start = _local_midnight(now.date())

    events = []
    for component in cal.walk():
        if component.name != "VEVENT":
            continue

        dtstart = component.get('DTSTART')
        if not dtstart:
            continue

        start, all_day = _to_local(dtstart.dt)
        window_start = today_start if all_day else now
        if not (window_start <= start <= cutoff):
            continue

        location = component.get('LOCATION')
        event_data = {
            'title': str(component.get('SUMMARY', 'Untitled Event')),
            'location': str(location) if location else None,
            'start': start,
            'all_day': all_day,
        }

        dtend = component.get('DTEND')
        if dtend:
            event_data['end'], _ = _to_local(dtend.dt)

        events.append(event_data)

    events.sort(key=lambda item: item['start'])
    return events


def _serialize_event(event: dict) -> dict:
    """Convert an in-memory event dict into its JSON-safe representation."""
    end = event.get('end')
    return {
        'title': event['title'],
        'location': event['location'],
        'start': event['start'].isoformat(),
        'end': end.isoformat() if end else None,
        'all_day': event.get('all_day', False),
    }


def _deserialize_event(raw: dict) -> dict:
    """Rebuild an in-memory event dict from its cached JSON representation."""
    event = {
        'title': raw['title'],
        'location': raw['location'],
        'start': datetime.fromisoformat(raw['start']),
        'all_day': raw.get('all_day', False),
    }
    if raw.get('end'):
        event['end'] = datetime.fromisoformat(raw['end'])
    return event


def _write_cache(events: List[dict], fetched_at: datetime) -> None:
    """Persist events to CACHE_FILE; failures are logged, never raised.

    A cache write problem must not discard freshly fetched data, so it is
    handled here instead of propagating to the fetch error path.
    """
    try:
        payload = json.dumps({
            'events': [_serialize_event(e) for e in events],
            'last_fetch': fetched_at.isoformat(),
        })
        CACHE_FILE.write_text(payload, encoding='utf-8')
    except Exception as cache_error:
        logger.exception(f"Error writing cache: {cache_error}")


def _load_cache() -> Optional[Tuple[List[dict], datetime]]:
    """Load ``(events, last_fetch)`` from CACHE_FILE, or None if unavailable."""
    if not CACHE_FILE.exists():
        return None
    try:
        cache_data = json.loads(CACHE_FILE.read_text(encoding='utf-8'))
        events = [_deserialize_event(e) for e in cache_data['events']]
        fetched_at = datetime.fromisoformat(cache_data['last_fetch'])
    except Exception as cache_error:
        logger.exception(f"Error loading cache: {cache_error}")
        return None
    return events, fetched_at


async def fetch_calendar() -> bool:
    """Fetch and parse the ICS calendar file.

    On success the module-level ``calendar_data`` and ``last_fetch_time`` are
    replaced and the cache file is rewritten. On any download or parse error
    the error is logged and, if a readable cache exists, the globals are
    restored from it. This function never raises for such errors.

    Returns:
        True if fresh data was fetched, False if the fetch failed.
    """
    global calendar_data, last_fetch_time

    try:
        logger.info(f"Fetching calendar from {CALENDAR_URL}")
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(CALENDAR_URL)
            response.raise_for_status()
        events = _parse_events(response.content, datetime.now(DISPLAY_TZ))
    except Exception as e:
        logger.exception(f"Error fetching calendar: {e}")
        cached = _load_cache()
        if cached is not None:
            calendar_data, last_fetch_time = cached
            logger.info("Loaded events from cache")
        return False

    calendar_data = events
    # Aware timestamp, consistent with the aware event datetimes.
    last_fetch_time = datetime.now(DISPLAY_TZ)
    _write_cache(events, last_fetch_time)
    logger.info(f"Successfully fetched {len(events)} events")
    return True


def _group_for_display(events: List[dict]) -> dict:
    """Group events by formatted start date, preserving input order."""
    grouped = {}
    for event in events:
        date_key = event['start'].strftime('%A, %B %d, %Y')
        display_event = {
            'title': event['title'],
            'location': event['location'],
            'all_day': event.get('all_day', False),
        }
        if not event.get('all_day'):
            time_label = event['start'].strftime('%I:%M %p')
            if event.get('end'):
                time_label = f"{time_label} - {event['end'].strftime('%I:%M %p')}"
            display_event['time'] = time_label
        grouped.setdefault(date_key, []).append(display_event)
    return grouped


@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
    """Display the calendar events.

    Renders the in-memory events grouped by day. If rendering fails, the
    same template is rendered with the error message instead.
    """
    try:
        html_content = PAGE_TEMPLATE.render(
            events=calendar_data,
            events_by_date=_group_for_display(calendar_data),
            last_updated=(
                last_fetch_time.strftime('%B %d, %Y at %I:%M %p')
                if last_fetch_time else None
            ),
            error=None,
        )
    except Exception as e:
        logger.exception(f"Error rendering page: {e}")
        html_content = PAGE_TEMPLATE.render(
            events=[],
            events_by_date={},
            last_updated=None,
            error=str(e),
        )
    return HTMLResponse(content=html_content)


@app.get("/api/events")
async def get_events():
    """API endpoint to get calendar events as JSON."""
    return {
        "events": [_serialize_event(e) for e in calendar_data],
        "last_updated": last_fetch_time.isoformat() if last_fetch_time else None,
    }


def _refresh_error(message: str) -> dict:
    """Build the error payload returned by the manual refresh endpoint."""
    return {
        "status": "error",
        "message": message,
        "events_count": len(calendar_data),
        "cached_data_available": len(calendar_data) > 0,
        "last_successful_update": last_fetch_time.isoformat() if last_fetch_time else None,
    }


@app.post("/api/refresh")
async def refresh_calendar():
    """Manually refresh the calendar data.

    Reports ``"status": "error"`` when the fetch did not produce fresh data
    (``fetch_calendar`` handles its own exceptions and signals failure via
    its return value). ``data_changed`` reflects whether the serialized
    events differ from before the refresh, not just their count.
    """
    previous_events = [_serialize_event(e) for e in calendar_data]
    previous_count = len(previous_events)

    try:
        refreshed = await fetch_calendar()
    except Exception as e:
        logger.exception(f"Error during manual refresh: {e}")
        return _refresh_error(f"Failed to refresh calendar: {str(e)}")

    if not refreshed:
        return _refresh_error(
            "Failed to refresh calendar: upstream fetch failed, see server logs"
        )

    current_events = [_serialize_event(e) for e in calendar_data]
    return {
        "status": "success",
        "message": "Calendar refreshed successfully",
        "events_count": len(current_events),
        "previous_count": previous_count,
        "data_changed": current_events != previous_events,
        "last_updated": last_fetch_time.isoformat() if last_fetch_time else None,
    }


@app.get("/logo")
async def get_logo():
    """Serve the logo SVG file with proper content type.

    Looks for ``static/logo.svg`` first, then ``Vektor-Logo.svg``. Responds
    with HTTP 404 and an error body when neither file exists.
    """
    for candidate in (Path("static/logo.svg"), Path("Vektor-Logo.svg")):
        if candidate.exists():
            return FileResponse(candidate, media_type="image/svg+xml")
    return JSONResponse(status_code=404, content={"error": "Logo not found"})


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)