|
Logstash事件处理管道包括三个阶段:输入→过滤器→输出。
输入会生成事件,过滤器会对其进行修改,输出会将它们发送到其他地方。
1 inputs
| file |
从文件系统上的文件读取,非常类似于UNIX命令tail -0F |
| syslog |
在端口514上侦听syslog消息并根据RFC3164格式进行解析 |
| redis |
使用redis通道和redis列表从redis服务器读取 |
| beats |
进程的事件发送的beats |
2 filters
| grok |
解析和构造任意文本,Grok当前是Logstash中将非结构化日志数据解析为结构化和可查询内容的最佳方法 |
| mutate |
对事件字段执行常规转换。您可以重命名,删除,替换和修改事件中的字段 |
| drop |
完全删除事件,例如调试事件 |
| clone |
复制事件,可能会添加或删除字段 |
| geoip |
添加有关IP地址地理位置的信息 |
filter {
if [action] == "login" {
mutate { remove_field => "secret" }
}
}
filter {
date {
match => [ "logdate", "MMM dd yyyy HH:mm:ss" ]
}
}
3 outputs
| elasticsearch |
将事件数据发送到Elasticsearch |
| file |
将事件数据写入磁盘上的文件 |
| graphite |
将事件数据发送到graphite |
| statsd |
将事件数据发送到statsd |
output {
if "_grokparsefailure" not in [tags] {
elasticsearch { ... }
}
}
input { stdin { } }
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}
4 JDK要求
Logstash requires Java 8 or Java 11.
5 配置文件logstash.yml
6 配置文件pipelines.yml
7 配置文件jvm.options
8 配置文件startup.options
9 配置文件
1】值类型
| 类型 |
例子 |
| 数组 |
users => [ {id => 1, name => bob}, {id => 2, name => jane} ] |
| 列表 |
path => [ "/var/log/messages", "/var/log/*.log" ]
uris => [ "http://elastic.co", "http://example.net" ] |
| 布尔值 |
ssl_enable => true |
| 字节 |
my_bytes => "1113" # 1113 bytes
my_bytes => "10MiB" # 10485760 bytes
my_bytes => "100kib" # 102400 bytes
my_bytes => "180 mb" # 180000000 bytes |
| 编解码器 |
codec => "json" |
| 哈希 |
match => {
"field1" => "value1"
"field2" => "value2"
...
}
match => { "field1" => "value1" "field2" => "value2" }
|
| 编号 |
port => 33 |
| 密码 |
my_password => "password" |
| URI |
my_uri => “ http:// foo:bar@example.net” |
| 路径 |
my_path => "/tmp/logstash" |
| |
|
2】转义序列
| \r |
回车 |
| \n |
换行 |
| \t |
制表tab |
| \\ |
反斜杠 |
| \" |
双引号 |
| \' |
单引号 |
-------------------------------------------------------------------------------------------------------------
1】logstash-simple.conf
input { stdin { } }
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}
bin/logstash -f logstash-simple.conf
2 测试logstash输入输出
cd logstash-7.2.1
bin/logstash -e 'input { stdin { } } output { stdout {} }'
正常输出如下:

3 配置filebeat-->logstash-->elasticsearch
1】filebeat配置日志输入logstash
filebeat.yml
filebeat.inputs:
- type: log
paths:
- /var/log/*.log
fields:
type: syslog
output.logstash:
hosts: ["localhost:5044"]
2】filebeat启动
# 强制Filebeat从头读取日志文件
rm data/registry
./filebeat -e -c filebeat.yml -d "publish"
3】配置logstash
test.conf
input {
beats {
port => "5044"
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
geoip {
source => "clientip"
}
}
output {
elasticsearch {
hosts => [ "localhost:9200" ]
}
}
4】logstash启动
# 验证配置文件语法
bin/logstash -f test.conf --config.test_and_exit
# 启用自动重新加载配置
bin/logstash -f test.conf --config.reload.automatic
-------------------------------------------------------------------
1】filebeat配置
向事件添加一个type带有值的字段syslog
filebeat.inputs:
- type: log
paths:
- /var/log/*.log
fields:
type: syslog
output.logstash:
hosts: ["localhost:5044"]
2】写入多个Elasticsearch节点
output {
elasticsearch {
hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]
}
}
|