LogLineFollower: Следите за строками растущего файла журнала
Считывайте и обрабатывайте строки из растущего файла журнала и выполняйте необходимые действия.
Я искал способ обработки строк лог-файла в процессе его роста. В интернете я нашел несколько фрагментов и пакетов, но не совсем то, что мне было нужно. Поэтому я решил написать свой собственный.
После того как я написал первую версию, я поискал снова и нашел еще несколько пакетов. Но, посмотрев на описание, код и проблемы, я решил остановиться на собственном коде. Не может быть так сложно создать что-то подобное... Но, как всегда, в этом деле есть нечто большее, чем кажется на первый взгляд.
Ниже я рассмотрю еще несколько detail и закончу кодом, показывающим первую версию LogLineFollower. О, извините, только Linux , не знаю, работает ли это на Windows или macOS.
Строка против bytes
Некоторые пакеты открывают файл журнала в обычном режиме, в то время как я решил открыть файл журнала в двоичном режиме. Почему я решил поступить именно так? Причина в том, что в прошлом я несколько раз сталкивался с лог-файлами, содержащими неожиданные, лучше сказать, недопустимые символы, которые могут привести к сбою процессоров лог-файлов. Да, даже такие инструменты, как grep, могут дать сбой. В одном случае syslog был поврежден несколько раз, когда процесс, записывающий в syslog , потерпел крах.
Использование bytes означает использование 'read(chunk_size)' вместо 'readline()'. И вместо использования '\n' в качестве разделителя строк мы теперь используем b'\n'.
Вся идея использования bytes заключается в том, чтобы предотвратить падение этого процесса, обработать ошибки (декодирования) определенным образом и попытаться продолжить нормальную работу, все без нашего вмешательства.
Обнаружение изменений файлов
Файлы журналов могут изменяться, как правило, в результате ротации файлов журналов. Текущий файл переименовывается и используется новый файл. Это можно обнаружить на Linux , проверив inode. Если inode файла журнала изменяется, значит, файл изменился.
Но есть и другие способы, с помощью которых файлы журналов могут вращаться. Один из них - это копирование содержимого файла журнала в другой файл, а затем truncatирование файла журнала. Или путем копирования нового пустого файла в текущий файл журнала. В этих случаях inode не меняется, и единственное, что мы можем сделать, это проверить, стал ли файл журнала меньше, а не больше.
Что еще хуже, имя файла журнала могло измениться после поворота. Например, с 'access.log' на 'access.log.1'. В этом случае нам нужна умная логика, чтобы найти новый файл журнала.
Возможность продолжить работу с того места, где остановились
Мы живем не в идеальном мире, и наш процессор журнальных файлов может сломаться. Или же мы хотим остановить его, провести некоторое обслуживание и запустить снова, но пропустить строки, которые мы уже обработали.
Это означает, что мы должны помнить, какая последняя строка была обработана. Сохраняя текущее смещение файла журнала в отдельном файле после обработки строки, мы создаем возможность продолжить работу после прерывания: При перезапуске мы получаем сохраненное смещение и используем его для установки следующей позиции в файле журнала.
Другим способом продолжения работы после прерывания может быть сохранение копии последней обработанной строки, а затем использование этой копии при проходе по файлу.
Код: LogLineFollower
Первая версия приведенного ниже кода имеет следующие ограничения:
- Нет продолжения или чего-либо еще при вращении файла журнала.
- При прерывании и перезапуске, (некоторые) обработанные последние строки могут появиться снова или быть пропущены, см. код.
Последний элемент(ы) может быть важным, но это вам решать. Это происходит потому, что я решил, что по соображениям производительности мы должны иметь возможность обрабатывать список строк, а не одну строку за раз.
Если после перезапуска снова появятся те же строки, то операция в вашей функции 'process_line()' должна быть идемпотентной,
означает, что одна и та же операция может быть применена несколько раз без изменения результата.
Без изменений приведенный ниже код начинается с первой строки /var/log/syslog. Когда он достигает конца syslog, он ожидает новых строк.
Чтобы попробовать: Скопируйте-вставьте и запустите. И измените. И запустить.
"""
follow a log file, e.g. syslog, postfix.log
note(s):
- linux only
- in process_lines() you do what you want
- lines assumed to be ending with '\n'
- we only decode bytes after (!) we have a complete line
- file change (rotate) is detected on inode change
"""
import logging
import os
import sys
import time
# console logger
def get_logger():
log_level = logging.DEBUG
#log_level = logging.INFO
log_format = logging.Formatter('%(asctime)s [%(filename)-30s%(funcName)20s():%(lineno)03s] %(levelname)-8.8s %(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(log_level)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(log_level)
console_handler.setFormatter(log_format)
logger.addHandler(console_handler)
return logger
class DummyLogger:
def __getattr__(self, name):
return lambda *x: None
class LogLineFollower:
"""
wait for new lines in the file to follow and calls process_line() or your callback cb_line() where you can do your work.
arguments:
logger (optional) the logger.
follow_file the file to follow, e.g. /var/log/syslog.
cb_process_line (optional) callback. your line processing function.
cb_process_lines (optional) callback. your multiple lines processing function.
offset_file (optional) file holding the information to continue on a restart.
the offset file is always present. remove this file to make a fresh start.
if not specified its path is follow_file with the extension replaced by '.offset'.
offset_type (optional) where to start in follow_file, can be one of:
- 'begin': start at beginning of file
- 'continue': start where we left off using data from offset_file
- 'end': (default) start at end of file
important: if offset_file is present, its values will be used to continue
regardless of the value of offset_type! see also offset_file.
chunk_size (optional) maximum number of bytes we are trying to read at a time from follow_file.
default: 4096
followed_lines_file (optional) receives all lines read.
can be used for testing and/or checking.
"""
def __init__(
self,
logger=None,
follow_file=None,
cb_process_line=None,
cb_process_lines=None,
offset_file=None,
offset_type=None,
chunk_size=None,
followed_lines_file=None,
):
self.logger = logger or DummyLogger()
self.follow_file = follow_file
self.cb_process_line = cb_process_line
self.cb_process_lines = cb_process_lines
self.offset_file = offset_file or os.path.splitext(self.follow_file)[0] + '.offset'
self.offset_type = offset_type or 'end'
self.chunk_size = chunk_size or 4096
self.followed_lines_file = followed_lines_file
def append_lines_to_followed_lines_file(self, blines):
if self.followed_lines_file is None:
return
#self.logger.debug('appending blines = {}'.format(blines))
# decode to utf-8 before appending
lines = []
for bline in blines:
# 'try-except' this if decode has errors
lines.append(bline.decode('utf-8'))
with open(self.followed_lines_file, 'a') as fo:
fo.write('\n'.join(lines) + '\n')
def process_lines(self, blines):
# decode to utf-8 before printing
lines = []
for bline in blines:
# 'try-except' this if decode has errors
line = bline.decode('utf-8')
lines.append(line)
self.process_line(line)
if self.cb_process_lines is not None:
self.cb_process_lines(lines)
def process_line(self, line):
if self.cb_process_line is not None:
self.cb_process_line(line)
else:
self.logger.info('line = {}'.format(line))
def write_offset_file(self, pos, incomplete_line_chunk):
# write a comma-separated bytes string: b'inode,pos,incomplete_line_chunk'
self.logger.debug('inode = {}, pos = {}, incomplete_line_chunk = {}'.format(self.inode, pos, incomplete_line_chunk))
bs = self.inode.to_bytes(8, 'big') + b','
bs += pos.to_bytes(8, 'big') + b','
bs += incomplete_line_chunk or b''
self.logger.debug('bs = {}'.format(bs))
with open(self.offset_file, 'wb') as fo:
fo.write(bs)
def read_offset_file(self):
bs = None
try:
with open(self.offset_file, 'rb') as fo:
bs = fo.read()
if bs is not None:
bs_parts = bs.split(b',', 2)
bs_parts_len = len(bs_parts)
if bs_parts_len == 3:
inode = int.from_bytes(bs_parts[0], 'big')
pos = int.from_bytes(bs_parts[1], 'big')
incomplete_line_chunk = bs_parts[2]
if incomplete_line_chunk == b'':
incomplete_line_chunk = None
self.logger.debug('inode = {}, pos = {}, incomplete_line_chunk = {}'.format(inode, pos, incomplete_line_chunk))
return inode, pos, incomplete_line_chunk
except Exception as e:
pass
return None, None, None
def set_file(self):
try:
inode = os.stat(self.follow_file).st_ino
self.logger.debug('inode = {}'.format(inode))
self.inode = inode
return True
except Exception as e:
self.logger.debug('exception = {}, e.args = {}'.format(type(e).__name, e.args))
return False
def file_changed(self):
try:
inode = os.stat(self.follow_file).st_ino
self.logger.debug('inode = {}'.format(inode))
if inode != self.inode:
return True
return False
except Exception as e:
self.logger.debug('get_inode: exception = {}, e.args = {}'.format(type(e).__name, e.args))
return True
def run(self):
incomplete_line_chunk = None
wait_count = 0
self.set_file()
with open(self.follow_file, 'rb') as fo:
# continue where we left off, remove offset_file to start fresh
start_pos_set = False
inode, pos, incomplete_line_chunk = self.read_offset_file()
if inode is not None:
# check saved inode against current inode
# ...
# seek to pos in offset_file
fo.seek(pos, 0)
start_pos_set = True
if not start_pos_set:
if self.offset_type == 'end':
# start at end
fo.seek(0, 2)
else:
# start at beginning
fo.seek(0, 0)
while True:
if self.file_changed():
self.logger.debug('file not present or changed')
break
# read new chunk
chunk = fo.read(self.chunk_size)
chunk_size_read = len(chunk)
self.logger.debug('chunk_size_read = {}'.format(chunk_size_read))
if chunk_size_read == 0:
self.logger.debug('waiting for new data ... {}'.format(wait_count))
time.sleep(1)
wait_count += 1
continue
wait_count = 0
# prepend incomplete_line_chunk if any
if incomplete_line_chunk is not None:
chunk = incomplete_line_chunk + chunk
incomplete_line_chunk = None
# split chunk on new lines
blines = chunk.split(b'\n')
# get completed lines, and incomplete_line
if blines[-1] == b'':
# last line is empty means last line -1 is complete
del blines[-1]
else:
# last line is not empty means last line is not complete
incomplete_line_chunk = blines[-1]
del blines[-1]
# do something with complete lines (bytes)
if len(blines) > 0:
self.append_lines_to_followed_lines_file(blines)
self.process_lines(blines)
# show incomplete_line_chunk
if incomplete_line_chunk is not None:
self.logger.debug('incomplete_line_chunk = {}'.format(incomplete_line_chunk))
# here we write the file offset pos AFTER 'do something with complete lines'
# if 'do something with complete lines' is interrupted,
# (some of) these lines will be read (and processed) AGAIN the next time.
#
# the alternative is to write the offset file BEFORE 'do something with complete lines'
# if 'do something with complete lines' is interrupted,
# (some of) these lines will be SKIPPED the next time
#
fo.seek(0, 1)
pos = fo.tell()
self.logger.debug('pos = {}'.format(pos))
self.write_offset_file(pos, incomplete_line_chunk)
#self.read_offset_file()
def process_line(line):
print('processing line = {}'.format(line))
if __name__ == '__main__':
f = '/var/log/syslog'
llf = LogLineFollower(
logger=get_logger(),
follow_file=f,
offset_file='file_follower.offset',
cb_process_line=process_line,
offset_type='begin',
#offset_type='end',
chunk_size=4096,
followed_lines_file='followed_lines.log',
)
llf.run()
Резюме
Как и во многих других случаях, вы начинаете с нескольких строк кода, и вы в пути. Вскоре вы будете искать способы протестировать свой код. Именно поэтому я добавил дополнительный файл 'followed_lines_file'. Вы можете сравнить данные этого файла с данными 'follow_file'.
Сэкономил ли я время, написав это сам, или мне следовало использовать один из пакетов Pypi.org ? К сожалению, большинство пакетов на Pypi.org , имеющих сопоставимую функциональность, не удовлетворяют некоторым из моих требований, имеют проблемы или не поддерживаются. Но изучать их было познавательно. Думаю, в этом случае у меня не было особого выбора.
Ссылки / кредиты
logtailtool
https://github.com/minyong-jeong/logtailtool
py-logwatcher
https://github.com/tonytan4ever/py-logwatcher
pygtail
https://github.com/bgreenlee/pygtail
tail-uwsgi-log
https://pypi.org/project/tail-uwsgi-log
Подробнее
Log file
Недавний
- Скрытие первичных ключей базы данных UUID вашего веб-приложения
- Don't Repeat Yourself (DRY) с Jinja2
- SQLAlchemy, PostgreSQL, максимальное количество строк для user
- Показать значения в динамических фильтрах SQLAlchemy
- Безопасная передача данных с помощью шифрования Public Key и pyNaCl
- rqlite: альтернатива dist с высокой степенью готовности и SQLite
Большинство просмотренных
- Используя Python pyOpenSSL для проверки SSL-сертификатов, загруженных с хоста
- Использование UUID вместо Integer Autoincrement Primary Keys с SQLAlchemy и MariaDb
- Подключение к службе на хосте Docker из контейнера Docker
- Использование PyInstaller и Cython для создания исполняемого файла Python
- SQLAlchemy: Использование Cascade Deletes для удаления связанных объектов
- Flask Удовлетворительный запрос API проверка параметров запроса с помощью схем Маршмэллоу