#!/usr/bin/env python3
"""Tail nginx access/error (and naxsi) logs and forward parsed entries to LogBull.

Reads file offsets incrementally, parses each new line with compiled regexes,
and ships structured records via LogBullLogger. Configuration comes from a
YAML file (default: config.yaml).
"""
import os
import sys
import time
import logging
import re
from typing import Dict, List, Optional

import yaml
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from logbull import LogBullLogger

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class Config:
    """Load and expose the YAML configuration for the service."""

    def __init__(self, config_path: str = "config.yaml"):
        self.config_path = config_path
        self.config = self._load_config()

    def _load_config(self) -> Dict:
        """Parse the YAML config file.

        Raises:
            FileNotFoundError: if the config file does not exist.
            yaml.YAMLError: if the file is not valid YAML.
        """
        try:
            with open(self.config_path, 'r') as f:
                config = yaml.safe_load(f)
            return config
        except FileNotFoundError:
            logger.error(f"Config file {self.config_path} not found")
            raise
        except yaml.YAMLError as e:
            logger.error(f"Error parsing config file: {e}")
            raise

    def get_logbull_config(self) -> Dict:
        """Return the 'logbull' section ({} if absent)."""
        return self.config.get('logbull', {})

    def get_log_files(self) -> List[str]:
        """Return the list of log file paths to monitor ([] if absent)."""
        return self.config.get('log_files', [])

    def get_service_config(self) -> Dict:
        """Return the 'service' section ({} if absent)."""
        return self.config.get('service', {})


class LogParser:
    """Parse nginx and naxsi log formats."""

    # FIX: the named groups were emitted without their names — '(?P\S+)'
    # is a re.error at compile time. Names are restored from the
    # match.group('...') calls below.
    NGINX_ACCESS_REGEX = re.compile(
        r'(?P<ip>\S+) - - \[(?P<timestamp>[^\]]+)\] '
        r'"(?P<method>\S+) (?P<path>\S+) (?P<protocol>\S+)" '
        r'(?P<status>\d+) (?P<size>\d+) '
        r'"(?P<referer>[^"]*)" "(?P<user_agent>[^"]*)"'
    )

    NGINX_ERROR_REGEX = re.compile(
        r'(?P<timestamp>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) '
        r'\[(?P<level>\w+)\] (?P<pid>\d+)#\d+: \*(?P<cid>\d+)(?: .*)?, '
        r'client: (?P<ip>\S+), server: (?P<server>\S+), '
        r'request: "(?P<request>[^"]*)", '
        r'(?:upstream: "(?P<upstream>[^"]*)", )?host: "(?P<host>\S+)"'
    )

    NAXSI_REGEX = re.compile(
        r'\{(?P<naxsi_data>.*?)\}, client: (?P<ip>\S+), '
        r'server: (?P<server>\S+), request: "(?P<request>[^"]*)", '
        r'host: "(?P<host>\S+)"'
    )

    @classmethod
    def parse_access_log(cls, line: str) -> Optional[Dict]:
        """Parse a combined-format nginx access-log line; None if no match."""
        match = cls.NGINX_ACCESS_REGEX.match(line)
        if not match:
            return None
        return {
            'type': 'access',
            'ip': match.group('ip'),
            'timestamp': match.group('timestamp'),
            'method': match.group('method'),
            'path': match.group('path'),
            'protocol': match.group('protocol'),
            'status': match.group('status'),
            'size': match.group('size'),
            'referer': match.group('referer'),
            'user_agent': match.group('user_agent')
        }

    @classmethod
    def parse_error_log(cls, line: str) -> Optional[Dict]:
        """Parse an nginx error-log line (naxsi format tried first); None if no match."""
        # First try naxsi format — naxsi blocks are embedded in the error log
        naxsi_match = cls.NAXSI_REGEX.search(line)
        if naxsi_match:
            naxsi_data = naxsi_match.group('naxsi_data')
            return {
                'type': 'error',
                'subtype': 'naxsi',
                'ip': naxsi_match.group('ip'),
                'server': naxsi_match.group('server'),
                'request': naxsi_match.group('request'),
                'host': naxsi_match.group('host'),
                'naxsi_data': naxsi_data
            }

        # Try regular nginx error format
        error_match = cls.NGINX_ERROR_REGEX.match(line)
        if error_match:
            return {
                'type': 'error',
                'timestamp': error_match.group('timestamp'),
                'level': error_match.group('level'),
                'ip': error_match.group('ip'),
                'server': error_match.group('server'),
                'request': error_match.group('request'),
                'host': error_match.group('host'),
                'upstream': error_match.group('upstream')
            }
        return None

    @classmethod
    def parse_log_line(cls, line: str) -> Optional[Dict]:
        """Try to parse any type of log line; None for blank/unrecognized lines."""
        line = line.strip()
        if not line:
            return None

        # Try access log format first
        parsed = cls.parse_access_log(line)
        if parsed:
            return parsed

        # Try error log format
        parsed = cls.parse_error_log(line)
        if parsed:
            return parsed

        return None


class LogMonitor:
    """Poll configured log files and forward parsed entries to LogBull."""

    def __init__(self, config: Config):
        self.config = config
        self.log_files = config.get_log_files()
        self.file_handlers = {}
        self.file_positions = {}

        # Initialize LogBull logger
        logbull_config = config.get_logbull_config()
        self.logbull_logger = LogBullLogger(
            host=logbull_config.get('host', 'http://localhost:4005'),
            project_id=logbull_config.get('project_id')
        )
        self.flush_interval = logbull_config.get('flush_interval', 5)
        self.last_flush = time.time()

        # Service settings
        service_config = config.get_service_config()
        self.poll_interval = service_config.get('poll_interval', 1)
        self.max_lines_per_batch = service_config.get('max_lines_per_batch', 100)

        # Initialize file positions (start from the beginning of each file)
        for log_file in self.log_files:
            self.file_positions[log_file] = 0

    def _read_new_lines(self, file_path: str) -> List[str]:
        """Read new lines from file since last position.

        Returns [] on read errors. Handles log rotation/truncation: if the
        file is now smaller than the saved offset, the offset is reset to 0
        so the (new) file is read from the start instead of silently
        yielding nothing forever.
        """
        try:
            # FIX: detect rotation/truncation before seeking past EOF
            if os.path.getsize(file_path) < self.file_positions[file_path]:
                logger.info(f"Detected rotation/truncation of {file_path}; "
                            f"resetting offset")
                self.file_positions[file_path] = 0

            with open(file_path, 'r') as f:
                f.seek(self.file_positions[file_path])
                lines = f.readlines()
                self.file_positions[file_path] = f.tell()
            return lines
        except (FileNotFoundError, IOError) as e:
            logger.warning(f"Error reading {file_path}: {e}")
            return []

    def process_logs(self):
        """Process all log files and send parsed entries to LogBull."""
        for log_file in self.log_files:
            if not os.path.exists(log_file):
                logger.warning(f"Log file not found: {log_file}")
                continue

            lines = self._read_new_lines(log_file)
            if not lines:
                continue

            logger.info(f"Processing {len(lines)} new lines from {log_file}")

            # NOTE(review): lines beyond max_lines_per_batch are dropped,
            # not deferred — the file offset has already advanced past them.
            # Kept as-is (may be an intentional rate limit); confirm.
            for line in lines[:self.max_lines_per_batch]:
                parsed_log = LogParser.parse_log_line(line)
                if parsed_log:
                    self._send_to_logbull(parsed_log, log_file)

            # Periodically flush buffered records to the LogBull server
            if time.time() - self.last_flush > self.flush_interval:
                self.logbull_logger.flush()
                self.last_flush = time.time()

    def _send_to_logbull(self, log_data: Dict, source_file: str):
        """Send one parsed log record to LogBull, tagged with its source file.

        Severity mapping: access → info, naxsi block → warning,
        other nginx errors → error. Send failures are logged, not raised.
        """
        try:
            # Add source file information
            log_data['source_file'] = source_file

            # Send based on log type
            if log_data['type'] == 'access':
                self.logbull_logger.info("NGINX Access Log", fields=log_data)
            elif log_data['type'] == 'error':
                if log_data.get('subtype') == 'naxsi':
                    self.logbull_logger.warning("NAXSI Block", fields=log_data)
                else:
                    self.logbull_logger.error("NGINX Error Log", fields=log_data)
        except Exception as e:
            logger.error(f"Error sending to LogBull: {e}")

    def run(self):
        """Main monitoring loop: poll, ship, sleep; flush on Ctrl-C."""
        logger.info("Starting log monitoring service")
        try:
            while True:
                self.process_logs()
                time.sleep(self.poll_interval)
        except KeyboardInterrupt:
            logger.info("Shutting down gracefully...")
            self.logbull_logger.flush()
            logger.info("Service stopped")


class LogFileHandler(FileSystemEventHandler):
    """Handle file system events for monitored log files.

    NOTE(review): defined but never wired to a watchdog Observer in this
    file — presumably attached by an external caller; verify.
    """

    def __init__(self, monitor: LogMonitor):
        self.monitor = monitor

    def on_modified(self, event):
        """Process logs immediately when a monitored file changes."""
        if not event.is_directory and event.src_path in self.monitor.log_files:
            logger.debug(f"Detected change in {event.src_path}")
            # Process immediately when file changes
            self.monitor.process_logs()


def main():
    """Entry point: resolve config path from argv, then run the monitor.

    Accepts either `--config <path>` or a bare `*.yaml`/`*.yml` positional
    argument; defaults to config.yaml.
    """
    try:
        # Parse command line arguments
        config_path = "config.yaml"
        if len(sys.argv) > 1:
            if sys.argv[1] == '--config' and len(sys.argv) > 2:
                config_path = sys.argv[2]
            elif sys.argv[1].endswith('.yaml') or sys.argv[1].endswith('.yml'):
                config_path = sys.argv[1]

        # Load configuration
        config = Config(config_path)

        # Set log level from config
        service_config = config.get_service_config()
        log_level = service_config.get('log_level', 'INFO').upper()
        logger.setLevel(getattr(logging, log_level, logging.INFO))

        # Create and run monitor
        monitor = LogMonitor(config)
        monitor.run()
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        raise


if __name__ == "__main__":
    main()