LogLineFollower: Seguir las líneas de un archivo de registro creciente
Lea y procese las líneas de un archivo de registro creciente y ejecute las acciones que desee.
Estaba buscando una manera de procesar líneas de un archivo de registro mientras crecía. Encontré algunos fragmentos y paquetes en internet pero no exactamente lo que quería. Por eso decidí escribir el mío propio.
Después de haber codificado una primera versión, busqué de nuevo y encontré varios paquetes más. Pero, mirando la descripción, el código y los problemas, decidí quedarme con mi propio código. No puede ser tan difícil crear algo así ... Pero como siempre, hay más de lo que se piensa inicialmente.
A continuación, voy a entrar en algunos más details y terminar con el código que muestra la primera versión de LogLineFollower. Oh, lo siento, sólo Linux , ni idea de si funciona con Windows o macOS.
String vs bytes
Algunos paquetes abren el archivo de registro en modo normal, mientras que yo decidí abrir el archivo de registro en modo binario. ¿Por qué decidí hacer esto? La razón es que en el pasado me enfrenté varias veces con archivos de registro que contenían caracteres inesperados, una palabra mejor es ilegal, que pueden hacer que los procesadores de archivos de registro se bloqueen. Sí, incluso herramientas como grep pueden fallar. En un caso, syslog se corrompió varias veces, cuando un proceso que escribía en syslog se estrelló.
Utilizar bytes significa utilizar 'read(chunk_size)' en lugar de 'readline()'. Y en lugar de utilizar '\n' como separador de líneas, ahora utilizamos b'\n'.
La idea de usar bytes es evitar que este proceso se cuelgue, para manejar los errores (de decodificación) de alguna manera y tratar de continuar la operación normal, todo sin nuestra interacción.
Detección de cambios en los archivos
Los archivos de registro pueden cambiar, normalmente por la rotación de los archivos de registro. El archivo actual es renombrado y se utiliza un nuevo archivo. Esto se puede detectar en Linux comprobando el inode. Si el inode del archivo de registro cambia, entonces el archivo ha cambiado.
Pero hay otras formas en las que los archivos de registro pueden rotar. Una es copiando el contenido del archivo de registro a otro archivo y luego truncatando el archivo de registro. O copiando un nuevo archivo vacío en el archivo de registro actual. En estos casos, el inode no cambia y lo único que podemos hacer es comprobar si el archivo de registro se ha hecho más pequeño en lugar de más grande.
Para empeorar las cosas, el nombre del archivo de registro puede haber cambiado después de la rotación. Por ejemplo de 'access.log' a 'access.log.1'.En este caso necesitamos alguna lógica inteligente para encontrar el nuevo archivo de registro.
Poder continuar donde lo dejamos
No vivimos en un mundo perfecto y nuestro procesador de archivos de registro puede fallar. O bien, queremos detenerlo, hacer algo de mantenimiento y ejecutarlo de nuevo pero saltándonos las líneas que ya hemos procesado.
Esto significa que debemos recordar cuál fue la última línea que procesamos. Al almacenar el desplazamiento actual del archivo de registro en un archivo separado después de procesar una línea, creamos una forma de continuar después de una interrupción: En un reinicio recuperamos el desplazamiento guardado y lo utilizamos para establecer la siguiente posición en el archivo de registro.
Una forma diferente de continuar después de una interrupción puede ser guardar una copia de la última línea procesada, y luego usar esta copia al recorrer el archivo.
El código: LogLineFollower
La primera versión del código de abajo tiene las siguientes limitaciones:
- No hay continuación o lo que sea en la rotación del archivo de registro.
- Cuando se interrumpe y se reinicia, (algunas) últimas líneas procesadas pueden aparecer de nuevo o saltarse, ver código.
La(s) última(s) línea(s) puede(n) ser importante(s), pero eso lo decide usted. Esto sucede porque he decidido, por razones de rendimiento, que debemos ser capaces de procesar una lista de líneas en lugar de una sola línea a la vez.
Si las mismas líneas aparecen de nuevo después de un reinicio, entonces la operación en su función 'process_line()' debe ser idempotente,
lo que significa que la misma operación puede ser aplicada múltiples veces sin cambiar el resultado.
Sin cambios, el código siguiente comienza en la primera línea de /var/log/syslog. Cuando llega al final de syslog, espera nuevas líneas.
Para probarlo: Copia-pega y ejecuta. Y cambiar. Y ejecuta.
"""
follow a log file, e.g. syslog, postfix.log
note(s):
- linux only
- in process_lines() you do what you want
- lines assumed to be ending with '\n'
- we only decode bytes after (!) we have a complete line
- file change (rotate) is detected on inode change
"""
import logging
import os
import sys
import time
# console logger
def get_logger():
log_level = logging.DEBUG
#log_level = logging.INFO
log_format = logging.Formatter('%(asctime)s [%(filename)-30s%(funcName)20s():%(lineno)03s] %(levelname)-8.8s %(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(log_level)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(log_level)
console_handler.setFormatter(log_format)
logger.addHandler(console_handler)
return logger
class DummyLogger:
def __getattr__(self, name):
return lambda *x: None
class LogLineFollower:
"""
wait for new lines in the file to follow and calls process_line() or your callback cb_line() where you can do your work.
arguments:
logger (optional) the logger.
follow_file the file to follow, e.g. /var/log/syslog.
cb_process_line (optional) callback. your line processing function.
cb_process_lines (optional) callback. your multiple lines processing function.
offset_file (optional) file holding the information to continue on a restart.
the offset file is always present. remove this file to make a fresh start.
if not specified its path is follow_file with the extension replaced by '.offset'.
offset_type (optional) where to start in follow_file, can be one of:
- 'begin': start at beginning of file
- 'continue': start where we left off using data from offset_file
- 'end': (default) start at end of file
important: if offset_file is present, its values will be used to continue
regardless of the value of offset_type! see also offset_file.
chunk_size (optional) maximum number of bytes we are trying to read at a time from follow_file.
default: 4096
followed_lines_file (optional) receives all lines read.
can be used for testing and/or checking.
"""
def __init__(
self,
logger=None,
follow_file=None,
cb_process_line=None,
cb_process_lines=None,
offset_file=None,
offset_type=None,
chunk_size=None,
followed_lines_file=None,
):
self.logger = logger or DummyLogger()
self.follow_file = follow_file
self.cb_process_line = cb_process_line
self.cb_process_lines = cb_process_lines
self.offset_file = offset_file or os.path.splitext(self.follow_file)[0] + '.offset'
self.offset_type = offset_type or 'end'
self.chunk_size = chunk_size or 4096
self.followed_lines_file = followed_lines_file
def append_lines_to_followed_lines_file(self, blines):
if self.followed_lines_file is None:
return
#self.logger.debug('appending blines = {}'.format(blines))
# decode to utf-8 before appending
lines = []
for bline in blines:
# 'try-except' this if decode has errors
lines.append(bline.decode('utf-8'))
with open(self.followed_lines_file, 'a') as fo:
fo.write('\n'.join(lines) + '\n')
def process_lines(self, blines):
# decode to utf-8 before printing
lines = []
for bline in blines:
# 'try-except' this if decode has errors
line = bline.decode('utf-8')
lines.append(line)
self.process_line(line)
if self.cb_process_lines is not None:
self.cb_process_lines(lines)
def process_line(self, line):
if self.cb_process_line is not None:
self.cb_process_line(line)
else:
self.logger.info('line = {}'.format(line))
def write_offset_file(self, pos, incomplete_line_chunk):
# write a comma-separated bytes string: b'inode,pos,incomplete_line_chunk'
self.logger.debug('inode = {}, pos = {}, incomplete_line_chunk = {}'.format(self.inode, pos, incomplete_line_chunk))
bs = self.inode.to_bytes(8, 'big') + b','
bs += pos.to_bytes(8, 'big') + b','
bs += incomplete_line_chunk or b''
self.logger.debug('bs = {}'.format(bs))
with open(self.offset_file, 'wb') as fo:
fo.write(bs)
def read_offset_file(self):
bs = None
try:
with open(self.offset_file, 'rb') as fo:
bs = fo.read()
if bs is not None:
bs_parts = bs.split(b',', 2)
bs_parts_len = len(bs_parts)
if bs_parts_len == 3:
inode = int.from_bytes(bs_parts[0], 'big')
pos = int.from_bytes(bs_parts[1], 'big')
incomplete_line_chunk = bs_parts[2]
if incomplete_line_chunk == b'':
incomplete_line_chunk = None
self.logger.debug('inode = {}, pos = {}, incomplete_line_chunk = {}'.format(inode, pos, incomplete_line_chunk))
return inode, pos, incomplete_line_chunk
except Exception as e:
pass
return None, None, None
def set_file(self):
try:
inode = os.stat(self.follow_file).st_ino
self.logger.debug('inode = {}'.format(inode))
self.inode = inode
return True
except Exception as e:
self.logger.debug('exception = {}, e.args = {}'.format(type(e).__name, e.args))
return False
def file_changed(self):
try:
inode = os.stat(self.follow_file).st_ino
self.logger.debug('inode = {}'.format(inode))
if inode != self.inode:
return True
return False
except Exception as e:
self.logger.debug('get_inode: exception = {}, e.args = {}'.format(type(e).__name, e.args))
return True
def run(self):
incomplete_line_chunk = None
wait_count = 0
self.set_file()
with open(self.follow_file, 'rb') as fo:
# continue where we left off, remove offset_file to start fresh
start_pos_set = False
inode, pos, incomplete_line_chunk = self.read_offset_file()
if inode is not None:
# check saved inode against current inode
# ...
# seek to pos in offset_file
fo.seek(pos, 0)
start_pos_set = True
if not start_pos_set:
if self.offset_type == 'end':
# start at end
fo.seek(0, 2)
else:
# start at beginning
fo.seek(0, 0)
while True:
if self.file_changed():
self.logger.debug('file not present or changed')
break
# read new chunk
chunk = fo.read(self.chunk_size)
chunk_size_read = len(chunk)
self.logger.debug('chunk_size_read = {}'.format(chunk_size_read))
if chunk_size_read == 0:
self.logger.debug('waiting for new data ... {}'.format(wait_count))
time.sleep(1)
wait_count += 1
continue
wait_count = 0
# prepend incomplete_line_chunk if any
if incomplete_line_chunk is not None:
chunk = incomplete_line_chunk + chunk
incomplete_line_chunk = None
# split chunk on new lines
blines = chunk.split(b'\n')
# get completed lines, and incomplete_line
if blines[-1] == b'':
# last line is empty means last line -1 is complete
del blines[-1]
else:
# last line is not empty means last line is not complete
incomplete_line_chunk = blines[-1]
del blines[-1]
# do something with complete lines (bytes)
if len(blines) > 0:
self.append_lines_to_followed_lines_file(blines)
self.process_lines(blines)
# show incomplete_line_chunk
if incomplete_line_chunk is not None:
self.logger.debug('incomplete_line_chunk = {}'.format(incomplete_line_chunk))
# here we write the file offset pos AFTER 'do something with complete lines'
# if 'do something with complete lines' is interrupted,
# (some of) these lines will be read (and processed) AGAIN the next time.
#
# the alternative is to write the offset file BEFORE 'do something with complete lines'
# if 'do something with complete lines' is interrupted,
# (some of) these lines will be SKIPPED the next time
#
fo.seek(0, 1)
pos = fo.tell()
self.logger.debug('pos = {}'.format(pos))
self.write_offset_file(pos, incomplete_line_chunk)
#self.read_offset_file()
def process_line(line):
print('processing line = {}'.format(line))
if __name__ == '__main__':
f = '/var/log/syslog'
llf = LogLineFollower(
logger=get_logger(),
follow_file=f,
offset_file='file_follower.offset',
cb_process_line=process_line,
offset_type='begin',
#offset_type='end',
chunk_size=4096,
followed_lines_file='followed_lines.log',
)
llf.run()
Resumen
Como en muchos casos, empiezas con unas pocas líneas de código, y estás en el camino. Pronto buscarás la manera de probar tu código. Por eso he añadido el archivo opcional 'líneas_seguidas'. Puedes comparar los datos de este archivo con los datos del 'archivo_seguido'.
¿He ahorrado tiempo escribiendo esto yo mismo, o debería haber utilizado uno de los paquetes de Pypi.org ? Desgraciadamente, la mayoría de los paquetes de Pypi.org que tienen una funcionalidad comparable, no cumplen algunos de mis requisitos, tienen problemas o no están realmente mantenidos. Pero fue instructivo estudiarlos. Supongo que no tenía muchas opciones en este caso.
Enlaces / créditos
logtailtool
https://github.com/minyong-jeong/logtailtool
py-logwatcher
https://github.com/tonytan4ever/py-logwatcher
pygtail
https://github.com/bgreenlee/pygtail
tail-uwsgi-log
https://pypi.org/project/tail-uwsgi-log
Leer más
Log file
Recientes
- Cómo ocultar las claves primarias de la base de datos UUID de su aplicación web
- Don't Repeat Yourself (DRY) con Jinja2
- SQLAlchemy, PostgreSQL, número máximo de filas por user
- Mostrar los valores en filtros dinámicos SQLAlchemy
- Transferencia de datos segura con cifrado de Public Key y pyNaCl
- rqlite: una alternativa de alta disponibilidad y dist distribuida SQLite
Más vistos
- Usando Python's pyOpenSSL para verificar los certificados SSL descargados de un host
- Usando UUIDs en lugar de Integer Autoincrement Primary Keys con SQLAlchemy y MariaDb
- Conectarse a un servicio en un host Docker desde un contenedor Docker
- Usando PyInstaller y Cython para crear un ejecutable de Python
- SQLAlchemy: Uso de Cascade Deletes para eliminar objetos relacionados
- Flask RESTful API validación de parámetros de solicitud con esquemas Marshmallow