基于ELK收集Nginx的日志

  |   0 评论   |   浏览

前言

前面又讲到,如何从0搭建ELK服务,如果还不太清楚的可以参考: https://lupf.cn/articles/2020/04/23/1587622994136.html

关于日志规范,可以参考 :https://lupf.cn/articles/2020/06/04/1591273110824.html

Nginx日志采集

第一步,规范Nginx日志

这里就直接讲配置了,就不涉及到软件的安装了,如果不知道可以参考上面的文章搭建

这里就直接讲配置了,就不涉及到软件的安装了,如果不知道可以参考上面的文章搭建
这里就直接讲配置了,就不涉及到软件的安装了,如果不知道可以参考上面的文章搭建

  • 配置

    log_format  main
     '[$time_iso8601] ' # 通用日志格式的本地时间
     '[$hostname] ' # 主机名
     '[$remote_addr] ' # 客户端地址
     '[$remote_port] ' # 客户端端口
     '[$connection] ' # 连接序列号
     '[$server_addr] ' # 接受请求的服务器的地址
     '[$server_port] ' # 接受请求的服务器的端口
     '[$request_id] '  # 请求的唯一ID
     '[$request_method] ' # 请求方式
     '[$request_uri] ' # 完整的原始请求URI
     '[$request_time] ' # 请求以毫秒为单位的处理时间,以毫秒为单位(1.3.9,1.2.6;从客户端读取第一个字节以来经过的时间
     '[$request_length] ' # 请求长度
     '[$request_body] ' # 请求主体
     '[-] ' #'[$bytes_received] ' #从客户端收到的字节数
     '[$content_type] ' # 内容类型
     '[$content_length] ' # 内容长度
     '[$remote_user] ' # 基本身份验证随附的用户名
     '[$request] ' # 完整的原始请求行
     '[$scheme] ' # 请求方案 http或https
     '[$msec] ' # 请求以秒为单位的时间戳
     '[-] ' #'[$protocol] ' # 与客户端通信的协议: TCP或UDP
     '[-] ' #'[$session_time] ' # 会话持续时间
     '[$upstream_addr] ' # 与上游的连接地址
     '[$upstream_response_time] ' # 从上游接受响应的耗时
     '[$upstream_connect_time] ' # 与上游建立连接的时间
     '[$upstream_bytes_sent] ' # 发送到上游服务器的字节数
     '[$upstream_bytes_received] ' # 发送到上游服务器的字节数
     '[$upstream_status] ' # 从上游获取的响应状态
     '[$status] ' # 响应状态
     '[$body_bytes_sent] ' # 发送给客户端的字节数,不计算响应头
     '[$bytes_sent] ' # 发送给客户端的字节数
     '[$http_user_agent] '
     '[$http_referer] '
     '[$http_host] '
     '[$http_x_forwarded_for] '
     '[$http_Authorization] '
     '[$cookie_uid] ';
    
    # 日志保存的位置
    access_log  logs/busi_log/access.log  main;
    

    日志输出的样例,该日志即根据上面的配置输出的日志

    [2020-09-23T00:11:00+08:00] [lvs-webserver1] [192.168.1.28] [56899] [6534884] [192.168.1.207] [443] [e3094fa308646dccdf170c95abcff40d] [GET] [/r/refs?service=git-upload-pack] [0.008] [253] [-] [-] [-] [-] [-] [GET /r/refs?service=git-upload-pack HTTP/1.1] [https] [1600791060.059] [-] [-] [192.168.1.202:10101] [0.008] [0.005] [335] [514] [401] [401] [328] [589] [git/2.26.0] [-] [git.tongkabao.com.cn] [-] [-] [-]
    [2020-09-23T00:11:00+08:00] [lvs-webserver1] [192.168.1.28] [56900] [6534887] [192.168.1.207] [443] [ecde0d3f923432367266b2aea7eb5562] [GET] [/r/refs?service=git-upload-pack] [0.004] [252] [-] [-] [-] [-] [-] [GET /r/refs?service=git-upload-pack HTTP/1.1] [https] [1600791060.066] [-] [-] [192.168.1.202:10101] [0.004] [0.000] [334] [513] [401] [401] [327] [588] [git/2.26.0] [-] [git.tongkabao.com.cn] [-] [-] [-]
    [2020-09-23T00:11:00+08:00] [lvs-webserver1] [192.168.1.28] [56905] [6534898] [192.168.1.207] [443] [ab7443972fd6eade17833a4e0070eec1] [GET] [/r/refs?service=git-upload-pack] [0.004] [288] [-] [-] [-] [-] [jenkins] [GET /r/refs?service=git-upload-pack HTTP/1.1] [https] [1600791060.077] [-] [-] [192.168.1.202:10101] [0.004] [0.000] [370] [514] [401] [401] [328] [589] [git/2.26.0] [-] [git.tongkabao.com.cn] [-] [Basic amVua2luczo=] [-]
    
第二步,FileBeat采集N给inx日志

在nginx的http节点下配置

FileBeat有提供Nginx的收集模块,但是我们也完全可以把Nginx的日志单纯当作一个普通的日志去采集

  • 配置 filebeat.yml

    #============== Filebeat prospectors ===========
    filebeat.prospectors:
    - type: log
      paths:
        - /usr/local/openresty/nginx/logs/busi_log/access.log
      document_type: "nginx-access-log"
      fileset:
        module: nginx
        name: access
      fields:
        module: nginx
        name: access
        log_topic: nginx-log
        service: kt  # 自定义的服务名称
        evn: dev # 自定义的环境 用户将不同环境的日志采集到不同的kafka topic中
    
      multiline:
        pattern: '^\['   #指定多行匹配的表达式 以[开头的标识一行新的数据
        negate: true     # 是否匹配到
        match: after     # 没有匹配上正则合并到上一行末尾
        max_lines: 1000  # 最大未匹配上的行数
        timeout: 2s      # 指定时间没有新的日志 就不等待后面的日志输入
    
    # 以下为error的采集 暂时关闭
    #- type: log
    #  paths:
    #    - /usr/local/openresty/nginx/logs/error.log
    #  document_type: "nginx-error-log"
    #  fileset:
    #    module: nginx
    #    name: error
    #  fields:
    #    module: nginx
    #    name: error
    #    log_topic: nginx-log
    #    evn: dev
    #
    #  multiline:
    #    pattern: '^\['   #指定多行匹配的表达式 以[开头的标识一行新的数据
    #    negate: true     # 是否匹配到
    #    match: after     # 没有匹配上正则合并到上一行末尾
    #    max_lines: 1000  # 最大未匹配上的行数
    #    timeout: 2s      # 指定时间没有新的日志 就不等待后面的日志输入
    
    
    ## 定义输出的方式
    output.kafka:
      enabled: true    # 启动
      hosts: ["kafka地址1:9092","kafka地址1:9093"]    #kafka的ip和port
      topic: '%{[fields.log_topic]}-%{[fields.evn]}'   #指定输出到的topicname 也就是上面定义的变量
      partition.hash:                  # 分区规则 hash
        reachable_only: true
      compression: gzip                # 数据压缩
      max_message_bytes: 1000000       # 最大的消息字节数
      required_acks: 1                 # kafka ack的方式 0:丢出去就好了  1:有一个响应就好了  -1:所有节点响应才算成功
    logging.to_files: true             #
    
    • 注意点1

      日志的地址一定要正确

    • 注意点2

      fields节点相当于自定义参数,是为了后续保存到不同的topic index做准备的

    • 注意点3

      kafka地址

    • 注意点4

      Kafka中需要提前创建好对应的topic

logStash配置
  • 配置

    # 添加以下配置
    # 输入从kafka
    input { 
      kafka {
        topics_pattern => "nginx-log-.*"               # 也可以模糊匹配 如:err-log-*   这样就可以匹配到 err-log-product err-log-user
        bootstrap_servers => "ip:9092"   # kafka的ip 端口
        codec => json                               # 数据格式
        consumer_threads => 1                       # 对应partition的数量
        decorate_events => true
        #auto_offset_rest => "latest"               # 默认值就是这个
        group_id => "nginx-logs-group"                # kafka的消费组
        client_id => "logstash-1-1"
      }
    }
    
    filter {
      ruby {
        code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
      }
    
      if "nginx-log" in [fields][log_topic] {
          grok {
            match => ["message","\[%{NOTSPACE:currentDataTime}\] \[%{DATA:hostname}\] \[%{NOTSPACE:remoteAddr}\] \[%{NOTSPACE:remote.port}\] \[%{NOTSPACE:connection}\] \[%{NOTSPACE:server.addr}\] \[%{NOTSPACE:server.port}\] \[%{DATA:request.id}\] \[%{DATA:request.method}\] \[%{DATA:request.uri}\] \[%{DATA:request.time}\] \[%{DATA:request.length}\] \[%{DATA:request.body}\] \[%{DATA:bytesReceived}\] \[%{DATA:content.type}\] \[%{DATA:content.length}\] \[%{DATA:remote.user}\] \[%{DATA:request.info}\] \[%{DATA:scheme}\] \[%{DATA:msec}\] \[%{DATA:protocol}\] \[%{DATA:sessionTime}\] \[%{DATA:upstream.addr}\] \[%{DATA:upstream.responseTime}\] \[%{DATA:upstream.connectTime}\] \[%{DATA:upstream.bytesSent}\] \[%{DATA:upstream.bytesReceived}\] \[%{DATA:upstream.status}\] \[%{DATA:status}\] \[%{DATA:bodyBytesSent}\] \[%{DATA:bytesSent}\] \[%{DATA:http.user_agent}\] \[%{DATA:http.referer}\] \[%{DATA:http.host}\] \[%{DATA:http.x_forwarded_for}\] \[%{DATA:http.Authorization}\] \[%{DATA:cookie.uid}\] "]
          }
    
          # 根据条件添加数据
          if "iPhone;" in [http.user_agent] {
            mutate { add_field => { "deviceOS" => "iphone OS" } }
          }else if "Android" in [http.user_agent] {
            mutate { add_field => { "deviceOS" => "Android" } }
          }else if "Windows" in [http.user_agent] {
            mutate { add_field => { "deviceOS" => "Windows" } }
          }else if "Macintosh;" in [http.user_agent] {
            mutate { add_field => { "deviceOS" => "Mac OS" } }
          }else {
            mutate { add_field => { "deviceOS" => "unkown" } }
          }
    
          # 根据geoip 分析用户所处的地区
          geoip {
            source => [remoteAddr]
            target => "geoip"
            database => "/usr/local/logstash-6.8.9/GeoLite2-City.mmdb"
            #add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
            #add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
          }
    
          # 类型转换
          mutate {
            convert => [ "bodyBytesSent", "integer" ]
            convert => [ "bytesSent", "integer" ]
            convert => [ "remote.port", "integer" ]
            convert => [ "request.length", "integer" ]
            convert => [ "server.port", "integer" ]
            convert => [ "status", "integer" ]
            convert => [ "upstream.bytesReceived", "integer" ]
            convert => [ "upstream.bytesSent", "integer" ]
            convert => [ "upstream.status", "integer" ]
            convert => [ "msec", "float" ]
            convert => [ "request.time", "float" ]
            convert => [ "upstream.connectTime", "float" ]
            convert => [ "upstream.responseTime", "float" ]
            #convert => [ "[geoip][coordinates]", "float" ]
          }
        }
    # }
    }
    
    ## 输出方式
    ## 到控制台 当测试无误之后,可以将此输出关闭
    output {
      stdout { codec => rubydebug }
    }
    
    output {
      if "nginx-log" in [fields][log_topic] {
        elasticsearch {
          hosts => ["ip:9200"]        # es的ip 端口
          # 用户名 密码
          user => "用户名"
          password => "密码"
    
          # 索引名称(index)
          # 索引格式:all-log-应用名称-按天分组
          # [fields][logbiz]为filebeat中定义的变量
          index => "logstash-%{[fields][module]}-%{[fields][name]}-%{[fields][evn]}-%{index_time}"
          # 是否嗅探集群
          # 通过嗅探机制解析es集群的负载均衡发送日志
          sniffing => true
    
          # logstash默认自带一个mapping模版,进行模版覆盖
          template_overwrite => true
        }
      }
    }
    
    • 注意点1

      kafka的地址

    • 注意点2

      kafka中的topic,不能填错了

    • 注意点3;GEO

      可以按以下地址下载

      链接:https://pan.baidu.com/s/1aMZdF8uyCu46eaJwrH-S_Q
      提取码:4aj7

    • 注意点4;添加字段和转换字段

    • 注意点5;ES的index得根据自己的场景规划好

ELK采集Nginx,完。。。



标题:基于ELK收集Nginx的日志
作者:码霸霸
地址:https://lupf.cn/articles/2020/09/23/1600792705484.html