#!/usr/bin/env python3
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import logging
|
|
import yaml
|
|
import re
|
|
from typing import Dict, List, Optional, Tuple
|
|
from pathlib import Path
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import FileSystemEventHandler
|
|
from logbull import LogBullLogger
|
|
|
|
# Configure this service's own (local) logging; LogBull shipping is separate.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
|
|
|
|
class Config:
    """Load the service's YAML configuration and expose its sections."""

    def __init__(self, config_path: str = "config.yaml"):
        self.config_path = config_path
        self.config = self._load_config()

    def _load_config(self) -> Dict:
        """Read and parse the YAML config file.

        Returns the parsed mapping, or an empty dict for an empty file:
        yaml.safe_load() yields None on empty input, which would make every
        later ``self.config.get(...)`` raise AttributeError.

        Raises:
            FileNotFoundError: config file missing (logged, then re-raised).
            yaml.YAMLError: config file is not valid YAML (logged, re-raised).
        """
        try:
            with open(self.config_path, 'r') as f:
                config = yaml.safe_load(f)
            # Guard against an empty config file (safe_load -> None).
            return config or {}
        except FileNotFoundError:
            logger.error(f"Config file {self.config_path} not found")
            raise
        except yaml.YAMLError as e:
            logger.error(f"Error parsing config file: {e}")
            raise

    def get_logbull_config(self) -> Dict:
        """Return the 'logbull' section, or {} when absent."""
        return self.config.get('logbull', {})

    def get_log_files(self) -> List[str]:
        """Return the list of log file paths to monitor, or [] when absent."""
        return self.config.get('log_files', [])

    def get_service_config(self) -> Dict:
        """Return the 'service' section, or {} when absent."""
        return self.config.get('service', {})
|
|
|
|
class LogParser:
    """Parse nginx and naxsi log formats"""

    # Combined-format access line: ip, timestamp, request triple, status,
    # size, referer, user agent.
    NGINX_ACCESS_REGEX = re.compile(
        r'(?P<ip>\S+) - - \[(?P<timestamp>[^\]]+)\] "(?P<method>\S+) (?P<path>\S+) (?P<protocol>\S+)" (?P<status>\d+) (?P<size>\d+) "(?P<referer>[^"]*)" "(?P<user_agent>[^"]*)"'
    )

    # Standard nginx error line with optional upstream segment.
    NGINX_ERROR_REGEX = re.compile(
        r'(?P<timestamp>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[(?P<level>\w+)\] (?P<pid>\d+)#\d+: \*(?P<connection_id>\d+)(?: .*)?, client: (?P<ip>\S+), server: (?P<server>\S+), request: "(?P<request>[^"]*)", (?:upstream: "(?P<upstream>[^"]*)", )?host: "(?P<host>\S+)"'
    )

    # Naxsi WAF entries embed their payload between braces before the
    # usual client/server/request/host tail.
    NAXSI_REGEX = re.compile(
        r'\{(?P<naxsi_data>.*?)\}, client: (?P<ip>\S+), server: (?P<server>\S+), request: "(?P<request>[^"]*)", host: "(?P<host>\S+)"'
    )

    @classmethod
    def parse_access_log(cls, line: str) -> Optional[Dict]:
        """Parse one access-log line; return a field dict or None on no match."""
        m = cls.NGINX_ACCESS_REGEX.match(line)
        if m is None:
            return None
        record = {'type': 'access'}
        record.update(m.groupdict())
        return record

    @classmethod
    def parse_error_log(cls, line: str) -> Optional[Dict]:
        """Parse an error-log line (naxsi first, then plain nginx error)."""
        # Naxsi blocks also live in the error log, so check them first.
        m = cls.NAXSI_REGEX.search(line)
        if m is not None:
            entry = {'type': 'error', 'subtype': 'naxsi'}
            for key in ('ip', 'server', 'request', 'host', 'naxsi_data'):
                entry[key] = m.group(key)
            return entry

        m = cls.NGINX_ERROR_REGEX.match(line)
        if m is None:
            return None
        entry = {'type': 'error'}
        for key in ('timestamp', 'level', 'ip', 'server', 'request', 'host', 'upstream'):
            entry[key] = m.group(key)
        return entry

    @classmethod
    def parse_log_line(cls, line: str) -> Optional[Dict]:
        """Try to parse any type of log line"""
        stripped = line.strip()
        if not stripped:
            return None
        # Access format is tried first, then the error/naxsi formats.
        return cls.parse_access_log(stripped) or cls.parse_error_log(stripped)
|
|
|
|
class LogMonitor:
    """Tail the configured log files and ship parsed entries to LogBull.

    Files are polled; new lines are parsed with LogParser, forwarded through
    a LogBullLogger, and the LogBull queue is flushed periodically.
    """

    def __init__(self, config: "Config"):
        self.config = config
        self.log_files = config.get_log_files()
        self.file_handlers = {}
        # Byte offset of the next unread byte in each monitored file.
        self.file_positions = {}

        # Initialize LogBull logger
        logbull_config = config.get_logbull_config()
        logger.info(f"LogBull config: host={logbull_config.get('host', 'http://localhost:4005')}, project_id={logbull_config.get('project_id')}")

        self.logbull_logger = LogBullLogger(
            host=logbull_config.get('host', 'http://localhost:4005'),
            project_id=logbull_config.get('project_id')
        )
        logger.info("LogBull logger initialized successfully")

        # Seconds between flushes of the LogBull send queue.
        self.flush_interval = logbull_config.get('flush_interval', 5)
        self.last_flush = time.time()

        # Service settings
        service_config = config.get_service_config()
        self.poll_interval = service_config.get('poll_interval', 1)
        self.max_lines_per_batch = service_config.get('max_lines_per_batch', 100)

        logger.info(f"Service config: poll_interval={self.poll_interval}s, max_lines_per_batch={self.max_lines_per_batch}")
        logger.info(f"Monitoring {len(self.log_files)} log files: {self.log_files}")

        # Initialize file positions
        for log_file in self.log_files:
            self.file_positions[log_file] = 0
            if os.path.exists(log_file):
                logger.info(f"Found log file: {log_file}")
            else:
                logger.warning(f"Log file does not exist yet: {log_file}")

    def _read_new_lines(self, file_path: str) -> List[str]:
        """Read new lines from *file_path* since the last stored position.

        At most ``max_lines_per_batch`` lines are consumed per call, and the
        stored position advances only past what was actually read, so any
        remaining lines are picked up on the next poll.  (Previously the
        position was advanced past ALL new lines while only the first batch
        was processed, silently dropping the rest.)

        Log rotation/truncation: if the file is now smaller than the stored
        offset, reading restarts from offset 0 of the new file.
        """
        try:
            current_position = self.file_positions.get(file_path, 0)
            if os.path.getsize(file_path) < current_position:
                # File was truncated or rotated; start over on the new file.
                current_position = 0
            with open(file_path, 'r') as f:
                f.seek(current_position)
                lines = []
                # readline() (rather than readlines()) keeps f.tell()
                # accurate and lets us stop at the batch limit without
                # discarding unread data.
                while len(lines) < self.max_lines_per_batch:
                    line = f.readline()
                    if not line:
                        break
                    lines.append(line)
                new_position = f.tell()
            self.file_positions[file_path] = new_position

            if lines:
                logger.debug(f"Read {len(lines)} new lines from {file_path} (position {current_position} -> {new_position})")

            return lines
        except (FileNotFoundError, IOError) as e:
            # Best-effort: a transient read failure just skips this poll.
            logger.warning(f"Error reading {file_path}: {e}")
            return []

    def process_logs(self):
        """Process all log files and send newly parsed entries to LogBull."""
        total_sent = 0

        for log_file in self.log_files:
            if not os.path.exists(log_file):
                logger.debug(f"Log file not found: {log_file}")
                continue

            lines = self._read_new_lines(log_file)
            if not lines:
                continue

            logger.info(f"Processing {len(lines)} new lines from {log_file}")

            parsed_count = 0
            unparsed_count = 0

            # _read_new_lines already caps the batch size; every line it
            # returned must be handled here or it would be lost for good.
            for line in lines:
                parsed_log = LogParser.parse_log_line(line)
                if parsed_log:
                    parsed_count += 1
                    logger.debug(f"Parsed log line: type={parsed_log.get('type')}, subtype={parsed_log.get('subtype', 'N/A')}")
                    self._send_to_logbull(parsed_log, log_file)
                    total_sent += 1
                else:
                    unparsed_count += 1
                    logger.debug(f"Failed to parse line: {line.strip()[:100]}")

            if parsed_count > 0:
                logger.info(f"From {log_file}: parsed {parsed_count}, unparsed {unparsed_count}")

        # Periodically flush queued messages to the LogBull backend.
        if time.time() - self.last_flush > self.flush_interval:
            logger.info(f"Flushing logs to LogBull (total sent since last flush: {total_sent})")
            self.logbull_logger.flush()
            self.last_flush = time.time()
            logger.info("Flush completed")

    def _send_to_logbull(self, log_data: Dict, source_file: str):
        """Queue one parsed record on LogBull at a level matching its type.

        access -> info, naxsi -> warning, other errors -> error.  Errors
        while queueing are logged and swallowed so one bad record cannot
        stop the monitoring loop.
        """
        try:
            # Add source file information
            log_data['source_file'] = source_file

            # Log the full data being sent
            logger.debug(f"Preparing to send log to LogBull:")
            logger.debug(f" - Type: {log_data.get('type')}")
            logger.debug(f" - Source: {source_file}")
            logger.debug(f" - Full data: {log_data}")

            # Send based on log type
            if log_data['type'] == 'access':
                logger.debug(f"Sending ACCESS log to LogBull: {log_data.get('method')} {log_data.get('path')} - {log_data.get('status')}")
                self.logbull_logger.info("NGINX Access Log", fields=log_data)
            elif log_data['type'] == 'error':
                if log_data.get('subtype') == 'naxsi':
                    logger.debug(f"Sending NAXSI log to LogBull: {log_data.get('request')}")
                    self.logbull_logger.warning("NAXSI Block", fields=log_data)
                else:
                    logger.debug(f"Sending ERROR log to LogBull: {log_data.get('request')}")
                    self.logbull_logger.error("NGINX Error Log", fields=log_data)

            logger.debug(f"LogBull message queued successfully (type={log_data['type']})")

            # Diagnostic peek at the queue depth.  NOTE(review): this reads
            # a private attribute of the LogBull sender and may break on
            # library upgrades -- confirm against the logbull package API.
            if hasattr(self.logbull_logger, 'sender') and self.logbull_logger.sender:
                queue_size = self.logbull_logger.sender._log_queue.qsize()
                logger.debug(f"LogBull queue size: {queue_size}")
        except Exception as e:
            logger.error(f"Error sending to LogBull: {e}", exc_info=True)

    def run(self):
        """Main monitoring loop: poll, process, sleep; final flush on Ctrl-C."""
        logger.info("Starting log monitoring service")
        logger.info(f"LogBull endpoint: {self.config.get_logbull_config().get('host', 'http://localhost:4005')}")
        logger.info(f"Project ID: {self.config.get_logbull_config().get('project_id')}")

        iteration = 0
        try:
            while True:
                iteration += 1
                logger.debug(f"Processing iteration {iteration}")
                self.process_logs()
                time.sleep(self.poll_interval)
        except KeyboardInterrupt:
            # Graceful shutdown: make sure queued logs reach the backend.
            logger.info("Shutting down gracefully...")
            logger.info("Performing final flush...")
            self.logbull_logger.flush()
            logger.info("Service stopped")
        except Exception as e:
            logger.error(f"Unexpected error in monitoring loop: {e}", exc_info=True)
            raise
|
|
|
|
class LogFileHandler(FileSystemEventHandler):
    """Handle file system events for log files"""

    def __init__(self, monitor: LogMonitor):
        self.monitor = monitor

    def on_modified(self, event):
        """Trigger an immediate processing pass when a watched file changes."""
        if event.is_directory:
            return
        if event.src_path not in self.monitor.log_files:
            return
        logger.debug(f"Detected change in {event.src_path}")
        # Process immediately rather than waiting for the next poll tick.
        self.monitor.process_logs()
|
|
|
|
def main():
    """Entry point: resolve the config path, build the monitor, and run it."""
    try:
        # Accepted invocations:
        #   script.py                      -> config.yaml
        #   script.py --config <path>      -> <path>
        #   script.py <path>.yaml|.yml     -> <path>
        config_path = "config.yaml"
        args = sys.argv[1:]
        if args:
            if args[0] == '--config' and len(args) > 1:
                config_path = args[1]
            elif args[0].endswith(('.yaml', '.yml')):
                config_path = args[0]

        logger.info(f"Loading configuration from: {config_path}")

        config = Config(config_path)

        # Apply the configured log level to this service's own logger.
        service_config = config.get_service_config()
        log_level = service_config.get('log_level', 'INFO').upper()
        logger.setLevel(getattr(logging, log_level, logging.INFO))
        logger.info(f"Log level set to: {log_level}")

        logger.info("Configuration loaded successfully")
        logger.info(f"LogBull host: {config.get_logbull_config().get('host')}")
        logger.info(f"LogBull project_id: {config.get_logbull_config().get('project_id')}")
        logger.info(f"Log files to monitor: {config.get_log_files()}")

        logger.info("Initializing log monitor...")
        monitor = LogMonitor(config)
        monitor.run()

    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        raise
|
|
|
|
# Script entry point (guarded so the module can also be imported for tests).
if __name__ == "__main__":
    main()