从0开始搭建一套完整的ELK分布式日志管理系统

  |   0 评论   |   0 浏览

file

准备工作

虚拟机搭建

参考 基于VirtualBox搭建Linux(CentOS 7)虚拟机环境(学习必备技能)https://blog.lupf.cn/articles/2020/04/04/1586001434581.html

Elasticsearch安装

参考 Elasticsearch 6.6.0 集群搭建https://blog.lupf.cn/articles/2020/04/22/1587535463629.html

kafka安装

参考 并发(七)之分布式异步更新:Zookeeper+Kafka实现数据分布式异步更新 : https://blog.lupf.cn/articles/2020/04/17/1587096190497.html

JDK安装

以上文章中都有包含JDK的安装

ELK架构图

file

ELK流程图

file

  • 日志生产
    服务通过日志框架输出的日志,Nginx产生的日志;也可以是任何形式输出的日志文件。
  • 日志抓取(filebeat)
    通过配置,监控抓取符合规则的日志文件,并将抓取到的每条数据发送给kafka
  • kafka
    主要起到削峰填谷,ELK高可用的关键作用;当流量过大,kafka可以起到很好的缓冲作用,降低下游的压力;当下游的logstash或ES故障,也可以很好保证数据的完整性
  • logstash
    消费kafka的数据,并将数据持久化到ES或者其他持久化框架
  • Elasticsearch
    持久化数据,并提供分布式检索功能
  • Kibana
    展示ES中的数据
  • xpack watch
    监控异常
  • 通知模块
    将监控到的告警通知给相关责任人

项目日志规范

为什么要规范?只有规范之后的日志,在后续的抓取、整理同步至ES以及查看都会带来很多便利 , 请参考SpringBoot日志的链路追踪 : https://lupf.cn/articles/2020/06/04/1591273110824.html ; 建议优先阅读一下这篇文章 , 后续关于日志的拦截及解析都是基于这里的日志规则进行的; 具体格式如下:

// 完整格式
[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%X{requestId}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{localIp}] [%X{clientIp}] [%X{applicationName}] [%X{requestUri}] [%F,%L,%C,%M] [%m] ## '%ex'%n

// 以下是详细说明
// 其中%X打头的都是自定义的日志  需要通过DMC设置
[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}]  //当前的时间
[%X{requestId}]  // 本次请求的唯一ID
[%level{length=5}]  // 日志级别
[%thread-%tid]  // 线程id
[%logger]  // loger对应的class
[%X{hostName}]  // 服务部署的主机名
[%X{localIp}]  // 服务部署的主机ip
[%X{clientIp}]  //客户端请求的ip
[%X{applicationName}]  //应用名称
[%X{requestUri}]  // 调用的地址
[%F,%L,%C,%M]  // 当前日志所处的类的信息 
[%m]  // 打印的消息
## '%ex'  // 异常信息  使用单引号包裹起来是够了方便后续的logstash的
%n  // 换行

filebeat安装

  • 下载

    // 迅雷下载
    https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.6.0-linux-x86_64.tar.gz
    
  • 上传至服务器解压

    cd /usr/local/src
    tar -zxvf filebeat-6.6.0-linux-x86_64.tar.gz
    
    mv filebeat-6.6.0-linux-x86_64 /usr/local/filebeat-6.6.0
    cd /usr/local/filebeat-6.6.0
    
  • filebeat.yml配置

    #============== Filebeat prospectors ===========
    filebeat.prospectors:
    - type: log
      paths:
        - /usr/local/src/logs/log4j2-demo/all.log
    #    - /usr/local/src/logs/*/*.log
      document_type: "all-log"
      fields:
        logbiz: log4j2-demo            # 项目名称
        log_topic: all-log-log4j2-demo     # kafka的topic
        evn: dev
      multiline:
        pattern: '^\['   #指定多行匹配的表达式 以[开头的标识一行新的数据
        negate: true     # 是否匹配到 
        match: after     # 没有匹配上正则合并到上一行末尾
        max_lines: 1000  # 最大未匹配上的行数
        timeout: 2s      # 指定时间没有新的日志 就不等待后面的日志输入
    
    - type: log          #定义多个输入
      paths:
        - /usr/local/src/logs/log4j2-demo/error.log
    #    - /usr/local/src/logs/*/*.log
      document_type: "err-log"
      fields:
        logbiz: log4j2-demo
        log_topic: err-log-log4j2-demo     # kafka的topic
        evn: dev
      multiline:
        pattern: '^\['   #指定多行匹配的表达式 以[开头的标识一行新的数据
        negate: true     # 是否匹配到 
        match: after     # 没有匹配上正则合并到上一行末尾
        max_lines: 1000  # 最大未匹配上的行数
        timeout: 2s      # 指定时间没有新的日志 就不等待后面的日志输入
    
    ## 定义输出的方式
    output.kafka:
      enabled: true    # 启动
      hosts: ["192.168.1.160:9092"]    #kafka的ip和port
      topic: '%{[fields.log_topic]}'   #指定输出到的topicname 也就是上面定义的变量
      partition.hash:                  # 分区规则 hash   
        reachable_only: true
      compression: gzip                # 数据压缩
      max_message_bytes: 1000000       # 最大的消息字节数
      required_acks: 1                 # kafka ack的方式 0:丢出去就好了  1:有一个响应就好了  -1:所有节点响应才算成功
    logging.to_files: true             #
    
  • 测试配置

    cd /usr/local/filebeat-6.6.0
    ./filebeat -c filebeat.yml -configtest
    

    file

  • kafka创建topic

    cd /usr/local/kafka/
    bin/kafka-topics.sh --zookeeper cache1000:2181,cache1001:2181,cache1002:2181 --topic all-log-log4j2-demo --replication-factor 1 --partitions 1 --create
    bin/kafka-topics.sh --zookeeper cache1000:2181,cache1001:2181,cache1002:2181 --topic err-log-log4j2-demo --replication-factor 1 --partitions 1 --create
    // 查看topic的详情
    bin/kafka-topics.sh --zookeeper cache1000:2181,cache1001:2181,cache1002:2181 --topic  all-log-log4j2-demo --describe
    

    file

  • 启动filebeat

    cd /usr/local/filebeat-6.6.0
    ./filebeat 1>/dev/null 2>&1 &
    
    ps -ef | grep filebeat
    

    file

  • 产生日志数据并确定kafka是否拿到了数据

    cd /tmp/kafka-logs/err-log-log4j2-demo-0
    ll
    cd /tmp/kafka-logs/all-log-log4j2-demo-0
    ll
    // 看到文件不为空,说明filebeat生产的数据kafka已经收到
    

    file

logstash

  • 下载

    // 迅雷下载
    https://artifacts.elastic.co/downloads/logstash/logstash-6.6.0.tar.gz
    
    // 上传至任意主机 /usr/local/src 目录下
    
  • 解压

    cd /usr/local/src
    tar -zxvf logstash-6.6.0.tar.gz
    mv logstash-6.6.0 /usr/local/logstash-6.6.0
    
  • 添加配置
    grok内置属性https://lupf.cn/articles/2020/08/17/1597661487154.html

    mkdir /usr/local/logstash-6.6.0/script
    vim logstash-script.conf
    
    # 添加以下配置
    # 输入从kafka
    input {
      kafka {
        topics_pattern => "all-log-.*"         # 也可以模糊匹配 如:all-log.*
        bootstrap_servers => "192.168.1.160:9092"   # kafka的ip 端口
        codec => json                               # 数据格式
        consumer_threads => 1                       # 对应partition的数量
        decorate_events => true
        #auto_offset_rest => "latest"               # 默认值就是这个
        group_id => "all-logs-group"                # kafka的消费组
      }
    
      kafka {
        topics_pattern => "err-log-.*"               # 也可以模糊匹配 如:err-log-*   这样就可以匹配到 err-log-product err-log-user
        bootstrap_servers => "192.168.1.160:9092"   # kafka的ip 端口
        codec => json                               # 数据格式
        consumer_threads => 1                       # 对应partition的数量
        decorate_events => true
        #auto_offset_rest => "latest"               # 默认值就是这个
        group_id => "err-logs-group"                # kafka的消费组
      }
    }
    
    filter {
      ruby {
        code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
      }
    
      ## 这里匹配的是filebeat中的fields.log_topic字段
      if "all-log" in [fields][log_topic] {
        grok {
          ## 表达式
          ## 这里的表达式的定义需要和真实的lo4j2中定义的规则保持一致;否则将无法匹配上
          match => ["message","\[%{NOTSPACE:currentDataTime}\] \[%{NOTSPACE:requestId}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:logger}\] \[%{DATA:hostName}\] \[%{DATA:localIp}\] \[%{DATA:clientIp}\] \[%{DATA:applicationName}\] \[%{DATA:requestUrl}\] \[%{DATA:location}\] \[%{DATA:loginfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
        }
      }
    
      if "err-log" in [fields][log_topic] {
        grok {
          ## 表达式
          match => ["message","\[%{NOTSPACE:currentDataTime}\] \[%{NOTSPACE:requestId}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:logger}\] \[%{DATA:hostName}\] \[%{DATA:localIp}\] \[%{DATA:clientIp}\] \[%{DATA:applicationName}\] \[%{DATA:requestUrl}\] \[%{DATA:location}\] \[%{DATA:loginfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
        }
      }
    }
    
    ## 输出方式
    ## 到控制台 当测试无误之后,可以将此输出关闭
    output {
      stdout { codec => rubydebug }
    }
    
    output {
      if "all-log" in [fields][log_topic] {
        elasticsearch {
          hosts => ["192.168.1.170:9200"]        # es的ip 端口
          # 用户名 密码
          # user => "admin"
          # password => "admin"
    
          # 索引名称(index)
          # 索引格式:all-log-应用名称-按天分组
          # [fields][logbiz]为filebeat中定义的变量
          index => "all-log-%{[fields][logbiz]}-%{index_time}"
          # 是否嗅探集群
          # 通过嗅探机制解析es集群的负载均衡发送日志
          sniffing => true 
    
          # logstash默认自带一个mapping模版,进行模版覆盖
          template_overwrite => true
        }
      }
    
      if "err-log" in [fields][log_topic] {
        elasticsearch {
          hosts => ["192.168.1.170:9200"]        # es的ip 端口
          # 用户名 密码
          # user => "admin"
          # password => "admin"
    
          # 索引名称(index)
          # 索引格式:all-log-应用名称-按天分组
          # [fields][logbiz]为filebeat中定义的变量
          index => "err-log-%{[fields][logbiz]}-%{index_time}"
          # 是否嗅探集群
          # 通过嗅探机制解析es集群的负载均衡发送日志
          sniffing => true
    
          # logstash默认自带一个mapping模版,进行模版覆盖
          template_overwrite => true
        }
      }
    }
    
  • 启动

    cd /usr/local/logstash-6.6.0
    // 前台启动 方便观察数据及日志
    ./bin/logstash -f ./script/logstash-script.conf &
    
    // 后台启动
    nohup ./bin/logstash -f ./script/logstash-script.conf 1>/dev/null 2>&1 &
    // jps查看是否有Logstash的进程
    jps
    
  • logstash的输出

    // 以下是产生的日志数据
    [2020-04-22T21:35:28.164+08:00] [4562f064-a59f-420d-8673-f9810ef1361c] [INFO] [http-nio-8080-exec-3-21] [com.lupf.log4j2demo.controller.Log4j2Controller] [elasticsearch0000] [192.168.1.170] [192.168.1.82] [log4j2-demo] [/info] [Log4j2Controller.java,14,com.lupf.log4j2demo.controller.Log4j2Controller,info] [我是info日志!!!] ## ''
    // logstash 接收到kafka的消息并输出的对象
    {
                    "log" => {
            "file" => {
                "path" => "/usr/local/src/logs/log4j2-demo/all.log"
            }
        },
               "@version" => "1",
        "currentDataTime" => "2020-04-23T13:51:58.396+08:00",
                  "input" => {
            "type" => "log"
        },
                   "host" => {
            "name" => "elasticsearch0000"
        },
                   "beat" => {
                "name" => "elasticsearch0000",
            "hostname" => "elasticsearch0000",
             "version" => "6.6.0"
        },
                 "source" => "/usr/local/src/logs/log4j2-demo/all.log",
             "index_time" => "2020.04.23",
             "requestUrl" => "/info",
                 "offset" => 69870,
                "message" => "[2020-04-23T13:51:58.396+08:00] [54452aa5-8a5c-4518-8e43-be347c55ff09] [INFO] [http-nio-8080-exec-7-25] [com.lupf.log4j2demo.controller.Log4j2Controller] [elasticsearch0000] [192.168.1.170] [192.168.1.82] [log4j2-demo] [/info] [Log4j2Controller.java,14,com.lupf.log4j2demo.controller.Log4j2Controller,info] [我是info日志!!!] ## ''",
        "applicationName" => "log4j2-demo",
                 "logger" => "com.lupf.log4j2demo.controller.Log4j2Controller",
             "prospector" => {
            "type" => "log"
        },
                "loginfo" => "我是info日志!!!",
              "thread-id" => "http-nio-8080-exec-7-25",
             "@timestamp" => 2020-04-23T05:51:59.763Z,
               "clientIp" => "192.168.1.82",
                  "level" => "INFO",
               "location" => "Log4j2Controller.java,14,com.lupf.log4j2demo.controller.Log4j2Controller,info",
               "hostName" => "elasticsearch0000",
                 "fields" => {
            "log_topic" => "all-log-log4j2-demo",
               "logbiz" => "log4j2-demo",
                  "evn" => "dev"
        },
              "requestId" => "54452aa5-8a5c-4518-8e43-be347c55ff09",
                "localIp" => "192.168.1.170"
    }
    
  • 查看kafka的消费情况

    cd /usr/local/kafka/
    bin/kafka-consumer-groups.sh --bootstrap-server cache1000:9092,cache1001:9092,cache1002:9092 --describe --group err-logs-group
    bin/kafka-consumer-groups.sh --bootstrap-server cache1000:9092,cache1001:9092,cache1002:9092 --describe --group all-logs-group
    

    file

logstash输入、过滤器,输出测试

当如果上面的 logstash配置导致输入、过滤器、输出的不对,就可以通过以下的方式进行配置测试,快速定位问题或者配置匹配调整;

  • 定义一个手动输入的配置文件

    cd /usr/local/logstash-6.6.0
    vim script/test.conf
    
    // 加入以下配置
    # 手动输入
    input {
      stdin {
      }
    }
    
    # 过滤器
    filter{
      grok{
        # 测试匹配IP
        match => {"message" => "%{IPV4:ip}"}
      }
    }
    
    # 输出
    output {
      stdout {
      }
    }
    
  • 通过以上的配置文件启动logstash

    cd /usr/local/logstash-6.6.0
    ./bin/logstash -f ./script/test.conf
    
    // 然后直接输入想要测试的文本,回车就可以看见结果;如:
    192.168.1.123 aabbcc 11223344
    

    file

Kibana日志信息查看

  • 进入kibana

    // kibana监听的是5601端口
    http://elasticsearch:5601
    
  • 优先查看索引是否是正常的

    GET /_cat/indices?v
    

    file

  • 配置索引管理
    file
    file
    file
    file

  • 日志查看
    file

到此!一个从0搭建的ELK技术栈即完成!!!



标题:从0开始搭建一套完整的ELK分布式日志管理系统
作者:码霸霸
地址:https://lupf.cn/articles/2020/04/23/1587622994136.html