This commit is contained in:
2026-03-02 23:21:47 +01:00
parent 589f560137
commit 6cfea15b9e
6 changed files with 537 additions and 7 deletions

254
main.py Normal file → Executable file
View File

@@ -1,6 +1,254 @@
def main():
    """Entry point: print a greeting to stdout."""
    greeting = "Hello from proxy-to-logbull!"
    print(greeting)
#!/usr/bin/env python3
import os
import time
import logging
import yaml
import re
from typing import Dict, List, Optional, Tuple
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from logbull import LogBullLogger
# Configure logging for this module: INFO by default (main() adjusts the
# level from the service config), with a timestamped
# "name - level - message" format on stderr.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger used throughout this file for operational messages
# (distinct from the LogBull logger, which ships parsed nginx entries).
logger = logging.getLogger(__name__)
class Config:
    """Load the YAML service configuration and expose its sections."""

    def __init__(self, config_path: str = "config.yaml"):
        self.config_path = config_path
        self.config = self._load_config()

    def _load_config(self) -> Dict:
        """Parse the YAML config file; log and re-raise any failure."""
        try:
            with open(self.config_path, 'r') as fh:
                return yaml.safe_load(fh)
        except FileNotFoundError:
            logger.error(f"Config file {self.config_path} not found")
            raise
        except yaml.YAMLError as e:
            logger.error(f"Error parsing config file: {e}")
            raise

    def get_logbull_config(self) -> Dict:
        """Return the 'logbull' section (empty dict if absent)."""
        return self.config.get('logbull', {})

    def get_log_files(self) -> List[str]:
        """Return the list of log file paths to monitor (empty if absent)."""
        return self.config.get('log_files', [])

    def get_service_config(self) -> Dict:
        """Return the 'service' section (empty dict if absent)."""
        return self.config.get('service', {})
class LogParser:
    """Parse nginx access/error and naxsi log lines into flat dicts.

    Each parser is a classmethod that returns a dict of string fields
    (tagged with a 'type' key), or None when the line does not match.
    """

    # nginx "combined" access-log format.  The ident and remote-user
    # fields are matched as \S+ rather than the literal "- -" so lines
    # for authenticated requests still parse, and the size field accepts
    # "-" (nginx logs "-" for responses without a body, e.g. 204/304).
    NGINX_ACCESS_REGEX = re.compile(
        r'(?P<ip>\S+) \S+ \S+ \[(?P<timestamp>[^\]]+)\] "(?P<method>\S+) (?P<path>\S+) (?P<protocol>\S+)" (?P<status>\d+) (?P<size>\d+|-) "(?P<referer>[^"]*)" "(?P<user_agent>[^"]*)"'
    )
    # Standard nginx error-log line with client/server/request context.
    NGINX_ERROR_REGEX = re.compile(
        r'(?P<timestamp>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[(?P<level>\w+)\] (?P<pid>\d+)#\d+: \*(?P<connection_id>\d+)(?: .*)?, client: (?P<ip>\S+), server: (?P<server>\S+), request: "(?P<request>[^"]*)", (?:upstream: "(?P<upstream>[^"]*)", )?host: "(?P<host>\S+)"'
    )
    # NAXSI WAF block events: a {...} payload followed by request context.
    NAXSI_REGEX = re.compile(
        r'\{(?P<naxsi_data>.*?)\}, client: (?P<ip>\S+), server: (?P<server>\S+), request: "(?P<request>[^"]*)", host: "(?P<host>\S+)"'
    )

    @classmethod
    def parse_access_log(cls, line: str) -> Optional[Dict]:
        """Parse an nginx access-log line.

        Returns a dict with type/ip/timestamp/method/path/protocol/
        status/size/referer/user_agent (all strings; size may be '-'),
        or None if the line does not match the access format.
        """
        match = cls.NGINX_ACCESS_REGEX.match(line)
        if not match:
            return None
        return {
            'type': 'access',
            'ip': match.group('ip'),
            'timestamp': match.group('timestamp'),
            'method': match.group('method'),
            'path': match.group('path'),
            'protocol': match.group('protocol'),
            'status': match.group('status'),
            'size': match.group('size'),
            'referer': match.group('referer'),
            'user_agent': match.group('user_agent')
        }

    @classmethod
    def parse_error_log(cls, line: str) -> Optional[Dict]:
        """Parse an nginx error-log line, preferring the naxsi format.

        Returns a dict tagged type='error' (with subtype='naxsi' for WAF
        blocks), or None if neither format matches.
        """
        # Naxsi blocks are embedded in error-log lines, so try them first;
        # the generic error regex would otherwise swallow them.
        naxsi_match = cls.NAXSI_REGEX.search(line)
        if naxsi_match:
            return {
                'type': 'error',
                'subtype': 'naxsi',
                'ip': naxsi_match.group('ip'),
                'server': naxsi_match.group('server'),
                'request': naxsi_match.group('request'),
                'host': naxsi_match.group('host'),
                'naxsi_data': naxsi_match.group('naxsi_data')
            }

        # Fall back to the regular nginx error format.
        error_match = cls.NGINX_ERROR_REGEX.match(line)
        if error_match:
            return {
                'type': 'error',
                'timestamp': error_match.group('timestamp'),
                'level': error_match.group('level'),
                'ip': error_match.group('ip'),
                'server': error_match.group('server'),
                'request': error_match.group('request'),
                'host': error_match.group('host'),
                # 'upstream' is None when the optional group is absent.
                'upstream': error_match.group('upstream')
            }
        return None

    @classmethod
    def parse_log_line(cls, line: str) -> Optional[Dict]:
        """Try every known format on *line*; None for blank/unrecognized."""
        line = line.strip()
        if not line:
            return None

        # Access format is by far the most common, so try it first.
        parsed = cls.parse_access_log(line)
        if parsed:
            return parsed

        parsed = cls.parse_error_log(line)
        if parsed:
            return parsed

        return None
class LogMonitor:
    """Tail the configured log files and forward parsed entries to LogBull."""

    def __init__(self, config: "Config"):
        self.config = config
        self.log_files = config.get_log_files()
        self.file_handlers = {}
        # Byte offset of the next unread position, per file path.
        self.file_positions = {}

        # Initialize LogBull logger
        logbull_config = config.get_logbull_config()
        self.logbull_logger = LogBullLogger(
            host=logbull_config.get('host', 'http://localhost:4005'),
            project_id=logbull_config.get('project_id')
        )
        self.flush_interval = logbull_config.get('flush_interval', 5)
        self.last_flush = time.time()

        # Service settings
        service_config = config.get_service_config()
        self.poll_interval = service_config.get('poll_interval', 1)
        self.max_lines_per_batch = service_config.get('max_lines_per_batch', 100)

        # Start every file from the beginning on first read.
        for log_file in self.log_files:
            self.file_positions[log_file] = 0

    def _read_new_lines(self, file_path: str) -> List[str]:
        """Read lines appended to *file_path* since the last call.

        Handles truncation/rotation: if the file is now smaller than the
        stored offset, reading restarts from the beginning. (Previously a
        rotated file left the offset past EOF, so nothing was ever read
        again.)

        Returns [] on read errors, logging a warning.
        """
        try:
            if os.path.getsize(file_path) < self.file_positions.get(file_path, 0):
                # File shrank: it was truncated or rotated in place.
                self.file_positions[file_path] = 0
            with open(file_path, 'r') as f:
                f.seek(self.file_positions.get(file_path, 0))
                lines = f.readlines()
                self.file_positions[file_path] = f.tell()
                return lines
        except (FileNotFoundError, IOError) as e:
            logger.warning(f"Error reading {file_path}: {e}")
            return []

    def process_logs(self):
        """Read, parse, and forward new lines from every configured file."""
        for log_file in self.log_files:
            if not os.path.exists(log_file):
                logger.warning(f"Log file not found: {log_file}")
                continue

            lines = self._read_new_lines(log_file)
            if not lines:
                continue

            logger.info(f"Processing {len(lines)} new lines from {log_file}")

            # Process ALL lines, chunked by max_lines_per_batch. The file
            # offset has already advanced past every line read, so simply
            # truncating to one batch (as before) silently dropped the rest.
            batch = self.max_lines_per_batch
            for start in range(0, len(lines), batch):
                for line in lines[start:start + batch]:
                    parsed_log = LogParser.parse_log_line(line)
                    if parsed_log:
                        self._send_to_logbull(parsed_log, log_file)

            # Periodically flush buffered LogBull entries.
            if time.time() - self.last_flush > self.flush_interval:
                self.logbull_logger.flush()
                self.last_flush = time.time()

    def _send_to_logbull(self, log_data: Dict, source_file: str):
        """Ship one parsed entry to LogBull at a severity matching its type.

        Errors talking to LogBull are logged locally and swallowed so one
        bad entry cannot stall the monitoring loop.
        """
        try:
            # Add source file information
            log_data['source_file'] = source_file

            # Send based on log type
            if log_data['type'] == 'access':
                self.logbull_logger.info("NGINX Access Log", fields=log_data)
            elif log_data['type'] == 'error':
                if log_data.get('subtype') == 'naxsi':
                    self.logbull_logger.warning("NAXSI Block", fields=log_data)
                else:
                    self.logbull_logger.error("NGINX Error Log", fields=log_data)
        except Exception as e:
            logger.error(f"Error sending to LogBull: {e}")

    def run(self):
        """Poll the log files until interrupted, flushing on shutdown."""
        logger.info("Starting log monitoring service")
        try:
            while True:
                self.process_logs()
                time.sleep(self.poll_interval)
        except KeyboardInterrupt:
            logger.info("Shutting down gracefully...")
            # Final flush so buffered entries are not lost on exit.
            self.logbull_logger.flush()
            logger.info("Service stopped")
class LogFileHandler(FileSystemEventHandler):
    """Watchdog handler that triggers processing when a log file changes."""

    def __init__(self, monitor: LogMonitor):
        self.monitor = monitor

    def on_modified(self, event):
        """React to modification events on monitored (non-directory) paths."""
        if event.is_directory:
            return
        if event.src_path not in self.monitor.log_files:
            return
        logger.debug(f"Detected change in {event.src_path}")
        # Process immediately instead of waiting for the next poll tick.
        self.monitor.process_logs()
def main():
    """Load configuration and run the log monitor until interrupted.

    Fatal errors are logged and re-raised so the process exits non-zero.
    """
    try:
        # Load configuration
        config = Config()

        # Apply the configured log level to this module's logger.
        service_config = config.get_service_config()
        log_level = service_config.get('log_level', 'INFO').upper()
        logger.setLevel(getattr(logging, log_level, logging.INFO))

        # Create and run monitor
        monitor = LogMonitor(config)
        monitor.run()
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        raise


# Run only when executed as a script. (A stray unconditional main() call
# after this guard previously started the service on import and ran it
# twice when executed directly; it has been removed.)
if __name__ == "__main__":
    main()