Files
proxy-to-logbull/main.py
2026-03-03 21:32:21 +01:00

320 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import sys
import time
import logging
import yaml
import re
from typing import Dict, List, Optional, Tuple
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from logbull import LogBullLogger
# Configure logging
# Root-logger setup for this script's own diagnostics; the level may be
# overridden later from the service config (log_level key) in main().
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger for local diagnostics — distinct from the LogBull
# logger that ships the parsed nginx logs upstream.
logger = logging.getLogger(__name__)
class Config:
    """Load and expose the YAML configuration for the log-forwarding service.

    The file may contain three top-level keys, all optional:
    ``logbull`` (endpoint settings), ``log_files`` (paths to tail) and
    ``service`` (poll/batch tuning). Missing sections resolve to empty
    defaults via the accessor methods.
    """

    def __init__(self, config_path: str = "config.yaml"):
        self.config_path = config_path
        self.config = self._load_config()

    def _load_config(self) -> Dict:
        """Parse the YAML config file into a dict.

        An empty (or comment-only) file makes ``yaml.safe_load`` return
        ``None``, which would break the ``.get()`` accessors below, so it
        is normalized to ``{}``.

        Raises:
            FileNotFoundError: the config file does not exist.
            yaml.YAMLError: the config file is not valid YAML.
        """
        try:
            with open(self.config_path, 'r') as f:
                config = yaml.safe_load(f)
            # Normalize the empty-file case so callers always get a dict.
            return config if config is not None else {}
        except FileNotFoundError:
            logger.error(f"Config file {self.config_path} not found")
            raise
        except yaml.YAMLError as e:
            logger.error(f"Error parsing config file: {e}")
            raise

    def get_logbull_config(self) -> Dict:
        """Return the 'logbull' section, or {} if absent."""
        return self.config.get('logbull', {})

    def get_log_files(self) -> List[str]:
        """Return the list of log file paths to monitor, or [] if absent."""
        return self.config.get('log_files', [])

    def get_service_config(self) -> Dict:
        """Return the 'service' section, or {} if absent."""
        return self.config.get('service', {})
class LogParser:
    """Parse nginx access/error and naxsi log lines into flat dicts.

    Each ``parse_*`` method returns ``None`` when the line does not match
    its format, so callers can cheaply try formats in sequence
    (see :meth:`parse_log_line`). All captured values are returned as the
    raw strings from the log line (status/size are not converted to int).
    """

    # nginx "combined" access format:
    #   $remote_addr - $remote_user [$time_local] "$request" $status
    #   $body_bytes_sent "$http_referer" "$http_user_agent"
    # $remote_user may be a real username (not only "-") and the size
    # field can be "-" in some configurations, so both are matched loosely
    # instead of hard-coding "- -" / digits-only.
    NGINX_ACCESS_REGEX = re.compile(
        r'(?P<ip>\S+) - (?P<user>\S+) \[(?P<timestamp>[^\]]+)\] "(?P<method>\S+) (?P<path>\S+) (?P<protocol>\S+)" (?P<status>\d+) (?P<size>\d+|-) "(?P<referer>[^"]*)" "(?P<user_agent>[^"]*)"'
    )
    # nginx error-log format; the "upstream:" segment is optional.
    NGINX_ERROR_REGEX = re.compile(
        r'(?P<timestamp>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[(?P<level>\w+)\] (?P<pid>\d+)#\d+: \*(?P<connection_id>\d+)(?: .*)?, client: (?P<ip>\S+), server: (?P<server>\S+), request: "(?P<request>[^"]*)", (?:upstream: "(?P<upstream>[^"]*)", )?host: "(?P<host>\S+)"'
    )
    # naxsi block lines carry their payload in braces; searched anywhere
    # in the line since naxsi entries are embedded in nginx error output.
    NAXSI_REGEX = re.compile(
        r'\{(?P<naxsi_data>.*?)\}, client: (?P<ip>\S+), server: (?P<server>\S+), request: "(?P<request>[^"]*)", host: "(?P<host>\S+)"'
    )

    @classmethod
    def parse_access_log(cls, line: str) -> Optional[Dict]:
        """Parse a combined-format access log line, or return None."""
        match = cls.NGINX_ACCESS_REGEX.match(line)
        if not match:
            return None
        return {
            'type': 'access',
            'ip': match.group('ip'),
            'user': match.group('user'),
            'timestamp': match.group('timestamp'),
            'method': match.group('method'),
            'path': match.group('path'),
            'protocol': match.group('protocol'),
            'status': match.group('status'),
            'size': match.group('size'),
            'referer': match.group('referer'),
            'user_agent': match.group('user_agent')
        }

    @classmethod
    def parse_error_log(cls, line: str) -> Optional[Dict]:
        """Parse an error-log line (naxsi block or plain nginx error).

        naxsi is tried first because its lines would also partially match
        the generic nginx error pattern.
        """
        # First try naxsi format
        naxsi_match = cls.NAXSI_REGEX.search(line)
        if naxsi_match:
            naxsi_data = naxsi_match.group('naxsi_data')
            return {
                'type': 'error',
                'subtype': 'naxsi',
                'ip': naxsi_match.group('ip'),
                'server': naxsi_match.group('server'),
                'request': naxsi_match.group('request'),
                'host': naxsi_match.group('host'),
                'naxsi_data': naxsi_data
            }
        # Try regular nginx error format
        error_match = cls.NGINX_ERROR_REGEX.match(line)
        if error_match:
            return {
                'type': 'error',
                'timestamp': error_match.group('timestamp'),
                'level': error_match.group('level'),
                'ip': error_match.group('ip'),
                'server': error_match.group('server'),
                'request': error_match.group('request'),
                'host': error_match.group('host'),
                # None when the optional "upstream:" segment is absent.
                'upstream': error_match.group('upstream')
            }
        return None

    @classmethod
    def parse_log_line(cls, line: str) -> Optional[Dict]:
        """Try to parse any type of log line"""
        line = line.strip()
        if not line:
            return None
        # Try access log format first
        parsed = cls.parse_access_log(line)
        if parsed:
            return parsed
        # Try error log format
        parsed = cls.parse_error_log(line)
        if parsed:
            return parsed
        return None
class LogMonitor:
    """Tail the configured log files, parse each new line, and forward the
    parsed records to LogBull.

    File offsets are tracked per path in ``file_positions`` so only new
    content is read on each poll.
    """

    def __init__(self, config: Config):
        self.config = config
        self.log_files = config.get_log_files()
        self.file_handlers = {}
        # Byte offset of the next unread position in each watched file.
        self.file_positions = {}

        # Initialize LogBull logger
        logbull_config = config.get_logbull_config()
        logger.info(f"LogBull config: host={logbull_config.get('host', 'http://localhost:4005')}, project_id={logbull_config.get('project_id')}")
        self.logbull_logger = LogBullLogger(
            host=logbull_config.get('host', 'http://localhost:4005'),
            project_id=logbull_config.get('project_id')
        )
        logger.info("LogBull logger initialized successfully")
        self.flush_interval = logbull_config.get('flush_interval', 5)
        self.last_flush = time.time()

        # Service settings
        service_config = config.get_service_config()
        self.poll_interval = service_config.get('poll_interval', 1)
        self.max_lines_per_batch = service_config.get('max_lines_per_batch', 100)
        logger.info(f"Service config: poll_interval={self.poll_interval}s, max_lines_per_batch={self.max_lines_per_batch}")
        logger.info(f"Monitoring {len(self.log_files)} log files: {self.log_files}")

        # Initialize file positions
        for log_file in self.log_files:
            self.file_positions[log_file] = 0
            if os.path.exists(log_file):
                logger.info(f"Found log file: {log_file}")
            else:
                logger.warning(f"Log file does not exist yet: {log_file}")

    def _read_new_lines(self, file_path: str) -> List[str]:
        """Read up to ``max_lines_per_batch`` new lines since the last offset.

        BUG FIX: previously this read *all* new lines and advanced the
        offset past them, while process_logs() only handled the first
        ``max_lines_per_batch`` — silently dropping the rest. The batch
        limit is now enforced here, and the offset advances only past the
        lines actually returned, so the remainder is picked up next poll.

        Also detects rotation/truncation: if the saved offset is beyond
        the current end of file, reading restarts from the beginning.
        """
        try:
            with open(file_path, 'r') as f:
                current_position = self.file_positions[file_path]
                # Rotation/truncation check: saved offset past EOF means
                # the file was replaced or shrank.
                f.seek(0, os.SEEK_END)
                if current_position > f.tell():
                    logger.warning(f"File {file_path} shrank; restarting from beginning")
                    current_position = 0
                f.seek(current_position)
                lines = []
                for _ in range(self.max_lines_per_batch):
                    line = f.readline()
                    if not line:
                        break
                    lines.append(line)
                new_position = f.tell()
                self.file_positions[file_path] = new_position
                if lines:
                    logger.debug(f"Read {len(lines)} new lines from {file_path} (position {current_position} -> {new_position})")
                return lines
        except (FileNotFoundError, IOError) as e:
            logger.warning(f"Error reading {file_path}: {e}")
            return []

    def process_logs(self):
        """Process all log files and send to LogBull"""
        total_processed = 0
        total_sent = 0
        for log_file in self.log_files:
            if not os.path.exists(log_file):
                logger.debug(f"Log file not found: {log_file}")
                continue
            lines = self._read_new_lines(log_file)
            if not lines:
                continue
            logger.info(f"Processing {len(lines)} new lines from {log_file}")
            parsed_count = 0
            unparsed_count = 0
            # Batch size is already capped inside _read_new_lines, so every
            # returned line must be processed here (no slicing, no loss).
            for line in lines:
                total_processed += 1
                parsed_log = LogParser.parse_log_line(line)
                if parsed_log:
                    parsed_count += 1
                    logger.debug(f"Parsed log line: type={parsed_log.get('type')}, subtype={parsed_log.get('subtype', 'N/A')}")
                    self._send_to_logbull(parsed_log, log_file)
                    total_sent += 1
                else:
                    unparsed_count += 1
                    logger.debug(f"Failed to parse line: {line.strip()[:100]}")
            if parsed_count > 0:
                logger.info(f"From {log_file}: parsed {parsed_count}, unparsed {unparsed_count}")
        # Periodically flush
        if time.time() - self.last_flush > self.flush_interval:
            logger.info(f"Flushing logs to LogBull (total sent since last flush: {total_sent})")
            self.logbull_logger.flush()
            self.last_flush = time.time()
            logger.info("Flush completed")

    def _send_to_logbull(self, log_data: Dict, source_file: str):
        """Send parsed log data to LogBull.

        Routing: access lines go out at info level, naxsi blocks at
        warning, other nginx errors at error level. Failures are logged
        locally and swallowed so one bad record never stops the loop.
        """
        try:
            # Add source file information
            log_data['source_file'] = source_file
            # Send based on log type
            if log_data['type'] == 'access':
                logger.debug(f"Sending ACCESS log to LogBull: {log_data.get('method')} {log_data.get('path')} - {log_data.get('status')}")
                self.logbull_logger.info("NGINX Access Log", fields=log_data)
            elif log_data['type'] == 'error':
                if log_data.get('subtype') == 'naxsi':
                    logger.debug(f"Sending NAXSI log to LogBull: {log_data.get('request')}")
                    self.logbull_logger.warning("NAXSI Block", fields=log_data)
                else:
                    logger.debug(f"Sending ERROR log to LogBull: {log_data.get('request')}")
                    self.logbull_logger.error("NGINX Error Log", fields=log_data)
            logger.debug(f"LogBull message queued successfully (type={log_data['type']})")
        except Exception as e:
            # Best-effort delivery: log and continue with the next record.
            logger.error(f"Error sending to LogBull: {e}", exc_info=True)

    def run(self):
        """Main monitoring loop: poll every ``poll_interval`` seconds until
        interrupted, flushing any queued records on shutdown."""
        logger.info("Starting log monitoring service")
        logger.info(f"LogBull endpoint: {self.config.get_logbull_config().get('host', 'http://localhost:4005')}")
        logger.info(f"Project ID: {self.config.get_logbull_config().get('project_id')}")
        iteration = 0
        try:
            while True:
                iteration += 1
                logger.debug(f"Processing iteration {iteration}")
                self.process_logs()
                time.sleep(self.poll_interval)
        except KeyboardInterrupt:
            logger.info("Shutting down gracefully...")
            logger.info("Performing final flush...")
            self.logbull_logger.flush()
            logger.info("Service stopped")
        except Exception as e:
            logger.error(f"Unexpected error in monitoring loop: {e}", exc_info=True)
            raise
class LogFileHandler(FileSystemEventHandler):
    """Bridge watchdog filesystem events into the monitor's polling loop."""

    def __init__(self, monitor: LogMonitor):
        self.monitor = monitor

    def on_modified(self, event):
        """Run a processing pass as soon as a watched log file changes."""
        if event.is_directory:
            return
        if event.src_path not in self.monitor.log_files:
            return
        logger.debug(f"Detected change in {event.src_path}")
        self.monitor.process_logs()
def main():
    """Entry point: resolve the config path from argv, load the config,
    apply the configured log level, and run the monitor until stopped.

    Accepts either ``--config <path>`` or a bare ``*.yaml``/``*.yml``
    positional argument; defaults to ``config.yaml`` otherwise.
    """
    try:
        # Resolve the configuration file path from the command line.
        args = sys.argv[1:]
        config_path = "config.yaml"
        if args:
            if args[0] == '--config' and len(args) > 1:
                config_path = args[1]
            elif args[0].endswith(('.yaml', '.yml')):
                config_path = args[0]
        logger.info(f"Loading configuration from: {config_path}")

        config = Config(config_path)

        # Apply the log level requested in the service section (falls back
        # to INFO for unknown names).
        service_config = config.get_service_config()
        log_level = service_config.get('log_level', 'INFO').upper()
        logger.setLevel(getattr(logging, log_level, logging.INFO))
        logger.info(f"Log level set to: {log_level}")
        logger.info("Configuration loaded successfully")

        logbull_config = config.get_logbull_config()
        logger.info(f"LogBull host: {logbull_config.get('host')}")
        logger.info(f"LogBull project_id: {logbull_config.get('project_id')}")
        logger.info(f"Log files to monitor: {config.get_log_files()}")

        # Hand off to the monitor's blocking loop.
        logger.info("Initializing log monitor...")
        monitor = LogMonitor(config)
        monitor.run()
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        raise
# Script entry point: run only when executed directly, not on import.
if __name__ == "__main__":
    main()