EKS logs to CloudWatch stream as string


Problem description

I'm having this issue: I have an EKS cluster which sends logs to CloudWatch, and then Firehose streams the logs to an S3 bucket.

My goal is to get these logs from S3 and forward them to Elasticsearch in bulk. I wrote a Python Lambda function, and it works perfectly when the logs are JSON. My problem is that some logs are plain strings or only "kind of" JSON.

For example:

kube-authenticator:

time="2019-09-13T09:30:50Z" level=error msg="Watch channel closed."

kube-apiserver:

E0912 10:19:10.649757 1 watcher.go:208] watch chan error: etcdserver: mvcc: required revision has been compacted

I'm wondering whether I should wrap these messages and convert them to JSON, or whether there is a way to change the log format to JSON. I thought about writing a regex, but I don't have enough regex experience.

Answer

As mentioned in the comments, I ended up writing two functions that handle the logs and convert them to JSON.

The first handles the kube-apiserver, kube-controller-manager and kube-scheduler log groups:

import re
from datetime import datetime

def insert_dash(string, index):
    # Helper referenced by the original answer; reconstructed from its usage:
    # inserts a dash at the given position, e.g. '0912' -> '09-12'
    return string[:index] + '-' + string[index:]

def convert_text_logs_to_json_and_add_logGroup(message, logGroup):
    # 'E0912 ...' -> '0912': the klog prefix carries month and day but no year
    month_and_day = message.split(' ')[0][1:]
    month_and_day = insert_dash(month_and_day, 2)
    log_time_regex = r"\s+((?:\d{2})?:\d{1,2}:\d{1,2}.\d{1,})"
    log_time = re.findall(log_time_regex, message)[0]
    current_year = datetime.now().year
    full_log_datetime = "%s-%sT%sZ" % (current_year, month_and_day, log_time)
    # re.split with a capturing group returns [prefix, time, rest]
    log_content = re.split(log_time_regex, message)[2]
    message = '{"timestamp": "%s", "message": "%s", "logGroup": "%s"}' % (
        full_log_datetime, log_content.replace('"', ''), logGroup)
    return message
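For example, run against the kube-apiserver sample line from the question, the parsing steps work out like this (a standalone sketch of the same regex and date handling; the year is taken from the current date, since the klog prefix omits it):

```python
import re
from datetime import datetime

# The sample kube-apiserver line from the question
message = ("E0912 10:19:10.649757       1 watcher.go:208] "
           "watch chan error: etcdserver: mvcc: required revision has been compacted")

# 'E0912' -> '0912' -> '09-12': the klog prefix carries month and day, no year
month_and_day = message.split(' ')[0][1:]
month_and_day = month_and_day[:2] + '-' + month_and_day[2:]

# Same time regex as the function above
log_time = re.findall(r"\s+((?:\d{2})?:\d{1,2}:\d{1,2}.\d{1,})", message)[0]

timestamp = "%s-%sT%sZ" % (datetime.now().year, month_and_day, log_time)
print(timestamp)  # e.g. "2024-09-12T10:19:10.649757Z", depending on the current year
```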

The second function handles the authenticator log group:

import re
import json

def chunkwise(array, size=2):
    # Group a flat list into consecutive tuples: [a, b, c, d] -> [(a, b), (c, d)]
    # (Python 3: the built-in zip replaces Python 2's itertools.izip)
    it = iter(array)
    return zip(*[it] * size)

def wrap_text_to_json_and_add_logGroup(message, logGroup):
    # Tokens are either quoted strings or bare words, alternating key, value, ...
    regex = r"\".*?\"|\w+"
    matches = re.findall(regex, message)
    json_message = {}
    for key, value in chunkwise(matches):
        if key == 'time':
            key = 'timestamp'
        json_message[key] = value.replace('"', '')
    json_message['logGroup'] = logGroup
    return json.dumps(json_message)
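Applied to the kube-authenticator sample line, the key/value pairing works like this (a standalone sketch of the same tokenizing regex; the logGroup value is a made-up placeholder):

```python
import json
import re

# The sample kube-authenticator line from the question
message = 'time="2019-09-13T09:30:50Z" level=error msg="Watch channel closed."'

# Tokens are quoted strings or bare words, alternating key, value, key, value, ...
matches = re.findall(r"\".*?\"|\w+", message)
pairs = zip(matches[0::2], matches[1::2])

# Rename 'time' to 'timestamp' and drop the surrounding quotes from values
record = {('timestamp' if k == 'time' else k): v.strip('"') for k, v in pairs}
record['logGroup'] = '/aws/eks/authenticator'  # hypothetical log group name
print(json.dumps(record))
```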

I hope these functions are useful to anyone who needs to insert logs from CloudWatch into Elasticsearch.
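For completeness: the "forward to Elasticsearch in bulk" step from the question would POST these JSON lines to Elasticsearch's `_bulk` API, which expects newline-delimited JSON: an action line before each document, and a trailing newline at the end of the body. A minimal sketch of assembling that payload (the index name is hypothetical):

```python
import json

# A JSON log line as produced by the functions above (sample values)
doc = ('{"timestamp": "2019-09-12T10:19:10.649757Z", '
       '"message": "watch chan error", "logGroup": "kube-apiserver"}')

# Each document is preceded by an action line; the whole body must end with "\n"
action = json.dumps({"index": {"_index": "eks-logs"}})  # hypothetical index name
bulk_body = "%s\n%s\n" % (action, doc)
print(bulk_body)
```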

