目 录CONTENT

文章目录

ELK多路径日志采集

JamKing
2025-11-21 / 0 评论 / 0 点赞 / 11 阅读 / 0 字 / 正在检测是否收录...

1、elk+filebeat+kafka业务场景

1.1 filebeat配置

filebeat.inputs:
- type: filestream
  id: backend-logs                       #filestream需要唯一id
  enabled: true                          #启用filestream
  paths: 
    - /data/logs/*/*.log                 #使用通配符采集logs下所有子路径的日志
  processors:
    #子目录路径核心配置
    - dissect:                           #定义路径的模板
        tokenizer: "/data/logs/%{[service_name]}/%{[file_name]}"   #解析logs子目录的服务名
        # 解析采集日志路径
        field: "[log][file][path]"       #Filebeat 自动生成的日志文件路径
        target_prefix: ""                #将解析出的字段添加到事件的根级别
    - add_fields:
        target: ''
        fields:
          logs_source: backend           #新增值为backend的logs_source字段,区别推送不同topic

- type: filestream
  id: nginx-logs                         #filestream需要唯一id
  enabled: true
  paths:
    - /data/nginx/logs/*.log             #nginx采集日志路径
  processors:
    - add_fields:
        target: ''
        fields:
          logs_source: nginx             #新增值为backend的logs_source字段,区别推送不同topic

output.kafka:
  hosts: ["localhost:9092"]              #kafka实际地址
  topic: '%{[logs_source]}-logs-topic'   #logs_source变量topic

1.2 kafka配置

#因logs_source有两个变量值,分别是nginx和backend,因此创建nginx-logs-topic和backend-logs-topic两个topic

./bin/kafka-topics.sh --create --topic nginx-logs-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

./bin/kafka-topics.sh --create --topic backend-logs-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

1.3 logstash配置

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    ##多队列用,分隔,topic名用""引起来
    topics => ["nginx-logs-topic","backend-logs-topic"]
    ####十分关键的字段,从kafka消费的数据json化,否则会读取一些字段失败,如logs_source等
    codec => json 
  }
}
filter {
    #判断Logs_source字段,用于不同的grok表达式,这里是nginx
    if [logs_source] == "nginx" {
        grok {
            match => { "message" => '%{GREEDYDATA}' }
       }
    }
    #判断Logs_source字段,用于不同的grok表达式,这里是backend
    else if [logs_source] == "backend" {
        grok {
            match => {
                "message" => '%{TIMESTAMP_ISO8601:log_date}\s*\|\s*(?<thread>.*?)\s*\|\s*%{LOGLEVEL:log_level}\s*\|\s*(?<test_id>.*?)\s*\|\s*(?<logger>.*?)\s*\|\s*(?<log_type>.*?)\s*\|\s*(?<host_ip>.*?)\s*\|\s*(?<remote_ip>.*?)\s*\|\s*(?<system>.*?)\s*\|\s*(?<service>.*?)\s*\|\s*(?<request_url>.*?)\s*\|\s*(?<trace_id>.*?)\s*\|\|\s*(?<id>.*?)\s*\|\s*(?<phone_number>.*?)\s*\|\s*(?<device_type>.*?)\s*\|\s*(?<request_status>.*?)\s*\|\s*(?<duration>.*?)\s*\|\s*(?<span_id>.*?)\s*\|\s*%{GREEDYDATA:msg}'
            }
        }
    }
    #移除不必要的字段
    mutate {
        remove_field => ["host"]
    }
}

output {
  elasticsearch {
    hosts => ["http://172.16.10.132:9200"]
    #logs_source变量,会根据消费kafka的字段生成对应的索引,如nginx-logs-2025.11.22和backend-logs-2025.11.22
    index => "%{logs_source}-logs-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "xxxxxx"
  }
}

1.4 重启filebeat和logstash

/data/filebeat/filebeat -e &
/data/logstash/bin/logstash -f /data/logstash/config/logstash.conf

1.5 检查生成不同日志索引

索引

image-sYbp.png

索引模式

参考本文创建索引模式

1.6 预览配置后的日志

正常预览不同服务、路径日志

image-FgXF.png

image-QTTS.png

image-NRZC.png

2、传统elk业务场景

2.1 logstash配置

input {
  #采集nginx日志
  file {
    path => "/data/nginx/logs/*.log"
    start_position => "beginning"
    #建议开启sincedb,默认的sincedb路径可能删除索引无法恢复索引,手动配置可解决这个问题,也保证了日志采集的连续性和不重不漏,这里配置.nginx
    sincedb_path=> "/data/logstash/data/plugins/inputs/file/.nginx"
    add_field => { 
      #新增logs_source字段,用于生成nginx对应的索引
      "logs_source" => "nginx" 
    }
  }

  #采集backend日志
  file {
    path => "/data/logs/*/*.log"
    start_position => "beginning"
    #建议开启sincedb,默认的sincedb路径可能删除索引无法恢复索引,手动配置可解决这个问题,也保证了日志采集的连续性和不重不漏,这里配置.backend
    sincedb_path=> "/data/logstash/data/plugins/inputs/file/.backend"
    add_field => {
      #新增logs_source字段,用于生成backend对应的索引
      "logs_source" => "backend"
   }
 }
}

filter {
  #判断Nginx日志来源
  if [logs_source] == "nginx" {
    grok {
         #贪婪匹配原则,仅演示,按日志需求配置正则
         match => { "message" => '%{GREEDYDATA}' }
       }
    }
  #判断后端日志来源
  else if [logs_source] == "backend" {
    #多后端业务日志核心配置,用于识别不同子路径的服务名和文件名
    dissect {
      mapping => {
          #匹配好路径,映射到logstash的默认字段path
          "path" => "/data/logs/%{service_name}/%{file_name}"
        }
      }
    grok {
         #贪婪匹配原则,仅演示,按日志需求配置正则
         match => { "message" => '%{GREEDYDATA}' }
    }
  }
}

#输出数据到es
output {
  elasticsearch {
    hosts => ["http://172.16.10.132:9200"]
    #logs_source变量,会根据消费kafka的字段生成对应的索引,如nginx-logs-2025.11.22和backend-logs-2025.11.22
    index => "%{logs_source}-logs-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "xxxxxx"
  }
}

2.2 重启logstash

/data/logstash/bin/logstash -f /data/logstash/config/logstash.conf

2.3 检查索引

image-wGhF.png

2.4 预览配置后的日志

因仅演示,未作grok表达式,字段较集中

image-QPSC.png

image-zxTi.png

0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区