import asyncio
import logging
from contextlib import asynccontextmanager
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Optional
import json
import os
import httpx
import pytz
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from icalendar import Calendar, Event
from jinja2 import Template
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Global variables
CALENDAR_URL = "https://outlook.live.com/owa/calendar/ef9138c2-c803-4689-a53e-fe7d0cb90124/d12c4ed3-dfa2-461f-bcd8-9442bea1903b/cid-CD3289D19EBD3DA4/calendar.ics"
CACHE_FILE = Path("calendar_cache.json")
calendar_data = []
last_fetch_time = None
scheduler = AsyncIOScheduler()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage application lifespan events."""
# Startup
logger.info("Starting up...")
# Fetch calendar immediately
await fetch_calendar()
# Schedule daily updates at 2 AM
scheduler.add_job(
fetch_calendar,
'cron',
hour=2,
minute=0,
id='daily_calendar_fetch'
)
# Also fetch every 4 hours for more frequent updates
scheduler.add_job(
fetch_calendar,
'interval',
hours=4,
id='periodic_calendar_fetch'
)
scheduler.start()
logger.info("Scheduler started")
yield
# Shutdown
logger.info("Shutting down...")
scheduler.shutdown()
logger.info("Scheduler stopped")
# Initialize FastAPI app with lifespan
app = FastAPI(title="Calendar Viewer", version="1.0.0", lifespan=lifespan)
# Mount static files directory if it exists
static_dir = Path("static")
if not static_dir.exists():
static_dir.mkdir(exist_ok=True)
logger.info("Created static directory")
# Ensure logo file exists in static directory
logo_source = Path("Vektor-Logo.svg")
logo_dest = static_dir / "logo.svg"
if logo_source.exists() and not logo_dest.exists():
import shutil
shutil.copy2(logo_source, logo_dest)
logger.info("Copied logo to static directory")
app.mount("/static", StaticFiles(directory="static"), name="static")
# HTML template for the calendar view
HTML_TEMPLATE = """
Turmli Bar - Calendar Events
{% if error %}
{% elif not events %}
No Upcoming Events
There are no events scheduled for the next 30 days.
{% else %}
{% for date, day_events in events_by_date.items() %}
{% for event in day_events %}
{% if event.all_day %}
All Day
{% else %}
{{ event.time }}
{% endif %}
{{ event.title }}
{% if event.location %}
📍 {{ event.location }}
{% endif %}
{% endfor %}
{% endfor %}
{% endif %}
"""
async def fetch_calendar():
"""Fetch and parse the ICS calendar file."""
global calendar_data, last_fetch_time
try:
logger.info(f"Fetching calendar from {CALENDAR_URL}")
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(CALENDAR_URL)
response.raise_for_status()
# Parse the ICS file
cal = Calendar.from_ical(response.content)
events = []
# Get current time and timezone
tz = pytz.timezone('Europe/Berlin') # Adjust timezone as needed
now = datetime.now(tz)
cutoff = now + timedelta(days=30) # Show events for next 30 days
for component in cal.walk():
if component.name == "VEVENT":
event_data = {}
# Get event title
event_data['title'] = str(component.get('SUMMARY', 'Untitled Event'))
# Get event location
location = component.get('LOCATION')
event_data['location'] = str(location) if location else None
# Get event time
dtstart = component.get('DTSTART')
dtend = component.get('DTEND')
if dtstart:
# Handle both datetime and date objects
if hasattr(dtstart.dt, 'date'):
# It's a datetime
start_dt = dtstart.dt
if not start_dt.tzinfo:
start_dt = tz.localize(start_dt)
event_data['start'] = start_dt
event_data['all_day'] = False
else:
# It's a date (all-day event)
event_data['start'] = tz.localize(datetime.combine(dtstart.dt, datetime.min.time()))
event_data['all_day'] = True
# Only include future events within the cutoff
if event_data['start'] >= now and event_data['start'] <= cutoff:
if dtend:
if hasattr(dtend.dt, 'date'):
end_dt = dtend.dt
if not end_dt.tzinfo:
end_dt = tz.localize(end_dt)
event_data['end'] = end_dt
else:
event_data['end'] = tz.localize(datetime.combine(dtend.dt, datetime.min.time()))
events.append(event_data)
# Sort events by start time
events.sort(key=lambda x: x['start'])
calendar_data = events
last_fetch_time = datetime.now()
# Cache the data
cache_data = {
'events': [
{
'title': e['title'],
'location': e['location'],
'start': e['start'].isoformat(),
'end': e.get('end').isoformat() if e.get('end') else None,
'all_day': e.get('all_day', False)
}
for e in events
],
'last_fetch': last_fetch_time.isoformat()
}
with open(CACHE_FILE, 'w') as f:
json.dump(cache_data, f)
logger.info(f"Successfully fetched {len(events)} events")
except Exception as e:
logger.error(f"Error fetching calendar: {e}")
# Try to load from cache
if CACHE_FILE.exists():
try:
with open(CACHE_FILE, 'r') as f:
cache_data = json.load(f)
tz = pytz.timezone('Europe/Berlin')
calendar_data = []
for e in cache_data['events']:
event = {
'title': e['title'],
'location': e['location'],
'start': datetime.fromisoformat(e['start']),
'all_day': e.get('all_day', False)
}
if e.get('end'):
event['end'] = datetime.fromisoformat(e['end'])
calendar_data.append(event)
last_fetch_time = datetime.fromisoformat(cache_data['last_fetch'])
logger.info("Loaded events from cache")
except Exception as cache_error:
logger.error(f"Error loading cache: {cache_error}")
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
"""Display the calendar events."""
try:
# Group events by date
events_by_date = {}
for event in calendar_data:
date_key = event['start'].strftime('%A, %B %d, %Y')
if date_key not in events_by_date:
events_by_date[date_key] = []
# Format event for display
display_event = {
'title': event['title'],
'location': event['location'],
'all_day': event.get('all_day', False)
}
if not event.get('all_day'):
if event.get('end'):
display_event['time'] = f"{event['start'].strftime('%I:%M %p')} - {event['end'].strftime('%I:%M %p')}"
else:
display_event['time'] = event['start'].strftime('%I:%M %p')
events_by_date[date_key].append(display_event)
template = Template(HTML_TEMPLATE)
html_content = template.render(
events=calendar_data,
events_by_date=events_by_date,
last_updated=last_fetch_time.strftime('%B %d, %Y at %I:%M %p') if last_fetch_time else None,
error=None
)
return HTMLResponse(content=html_content)
except Exception as e:
logger.error(f"Error rendering page: {e}")
template = Template(HTML_TEMPLATE)
html_content = template.render(
events=[],
events_by_date={},
last_updated=None,
error=str(e)
)
return HTMLResponse(content=html_content)
@app.get("/api/events")
async def get_events():
"""API endpoint to get calendar events as JSON."""
return {
"events": [
{
"title": e['title'],
"location": e['location'],
"start": e['start'].isoformat(),
"end": e.get('end').isoformat() if e.get('end') else None,
"all_day": e.get('all_day', False)
}
for e in calendar_data
],
"last_updated": last_fetch_time.isoformat() if last_fetch_time else None
}
@app.post("/api/refresh")
async def refresh_calendar():
"""Manually refresh the calendar data."""
try:
# Store the previous count for comparison
previous_count = len(calendar_data)
# Fetch the latest calendar data
await fetch_calendar()
# Get the new count
new_count = len(calendar_data)
# Determine if data changed
data_changed = new_count != previous_count
return {
"status": "success",
"message": "Calendar refreshed successfully",
"events_count": new_count,
"previous_count": previous_count,
"data_changed": data_changed,
"last_updated": last_fetch_time.isoformat() if last_fetch_time else None
}
except Exception as e:
logger.error(f"Error during manual refresh: {e}")
# Try to return cached data info if available
return {
"status": "error",
"message": f"Failed to refresh calendar: {str(e)}",
"events_count": len(calendar_data),
"cached_data_available": len(calendar_data) > 0,
"last_successful_update": last_fetch_time.isoformat() if last_fetch_time else None
}
@app.get("/logo")
async def get_logo():
"""Serve the logo SVG file with proper content type."""
from fastapi.responses import FileResponse
logo_path = Path("static/logo.svg")
if not logo_path.exists():
logo_path = Path("Vektor-Logo.svg")
if logo_path.exists():
return FileResponse(logo_path, media_type="image/svg+xml")
return {"error": "Logo not found"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)