logstash是一个数据收集处理引擎,可是视之为一个ETL工具。
file
, syslog
, redis
, beats
等json
,multiline
In Memory
和Persistent Queue In Disk
两种grok
,mutate
, drop
, clone
, geoip
等elastcisearch
,file
, graphite
, statsd
等Pipline
由【Input
】-【Filter
】-【Output
】3个阶段处理流程组成,包含了队列管理和插件生命周期管理
Logstash Event
是内部流转的数据的表现形式。原数据在Input
转换为Event
,在Output event
时被转换为目标数据格式,在配置文件中可以对Event
中的属性进行判断和增删改查
In Memory
是一个内存的Queue,无法通过参数设置。无法处理进程崩溃、机器宕机等情况,会导致数据丢失
Persistent Queue In Disk
可保证在出现意外的情况下保证数据不丢失,而且能保证数据至少被消费一次
Persistent Queue In Disk
对性能影响估计是在5%以内,如果没有特殊需求一般建议打开
参数 | 解释 |
---|---|
queue.type | 队列类型可选值:persisted、memory【默认】 |
queue.max_bytes | 队列存储的最大容量,默认1gb |
path.queue | Persistent Queue In Disk存到磁盘的哪个位置 |
queue.page_capacity | 控制消息队列每一个文件的大小,默认64mb |
queue.checkpoint.writes | 提升容灾能力,如果是1 表示每写一个数据都去做盘,默认1024 |
pipeline.workers | filter_output的线程数,默认CPU核心数 |
pipeline.batch.size | 批处理延迟文档数,默认125 |
pipeline.batch.delay | 批处理延迟,默认50ms |
在conf文件夹中被称之为setting files
logstash.yml
中是logstash相关配置比如node.name
,path.data
,pipline.workers
、queue.type
等,这些配置可以被命令行参数替代参数 | 描述 |
---|---|
node.name | 节点名,便于识别 |
path.data | 持久化存储数据文件夹,默认logstash下的data目录,多实例不能相同 |
path.config | 设定pipeline的路径,如果是目录则扫描目录下的所有.conf文件 |
path.log | pipeline日志文件的目录 |
pipeline.workers | 设定pipeline的线程数 |
pipeline.batch.size/delay | 批处理延迟 |
queue.type | 队列类型可选值:persisted、memory |
queue.max_bytes | 队列存储的最大容量,默认1gb |
命令行参数 | 描述 |
---|---|
–node.name | 同node.name |
-f | 同path.config |
–path.settings | logstash文件夹目录主要包含logstash.yml,启动多实例时会用 |
-e | 指明pipeline文件内容,多用于测试 |
-w | 同pipeline.workers |
-b | 同pipeline.batch.size |
–path.data | 同path.data |
–debug | 打开调试日志 |
-t | 检测pipeline文件语法是否正确 |
jvm.options
是修改jvm的相关参数pipeline文件
定义数据处理流程的文件,默认以.conf
结尾,结构如下
input {}
filter {}
output {}
数据类型
布尔类型 isFailed => true
数值类型 num => 23
字符串类型 name => “Hello”
数组类型 user => [ { “id” =>1, “name” => “bob” }, { “id” => 2, “name” => “tom” } ] | name => [ “bob” , “tom” ] |
哈希类型
match => {
"k1" => "v1"
"k2" => "v2"
}
注释
井号开头# This is a comment
取属性
判断值
input插件指定了数据输入源,一个pipeline可以有多个input插件。
最简单的输入类型,从标准输入流读入数据,通用配置为
input {
stdin {
codec => "plain"
type => "std"
tags => ["info log"]
add_field => {
"info" => "this is info message"
}
}
}
output {
stdout {
codec => "rubydebug"
}
}
file插件会定时检查文件是否更新和文件夹下是否有新文件生成,为了不重复读取内容,使用了sincedb
记录了文件读取的行号。文件记录的的底层的nodeId
所以当文件发生归档时也不会发生错误。其配置为
glob
语法glob
语法sincedb
文件路径beginning
or end
,是否从头读取文件start_postion有两个参数可选,默认是end,表示运行logstash运行之后只会读取从logstash运行之后新进来的数据。如果想让logstash从头读取要设置为beginning。注意只有在第一次读取的时候这个beginning才生效,一旦读取过这个文件在sincedb中有记录了,logstash之后再去跑发现在sincedb中有记录了,就不会生效了也就是不会再从头读取了。这个的坏处就是我们做一些调试的时候比较麻烦只能把sincedb文件给删掉。我们可以把sincedb_path设置为/dev/null 这是个特殊的文件,所有写入到这个文件的内容都不会存储,那么这样再把start_postion设置为beginning这样每一次运行logstash的时候都是从头读取的。
input {
file {
path => "/var/log/*.log"
sincedb_path => "/dev/null"
start_position => "beginning"
discover_interval => 15
}
}
output {
stdout {codec => rubydebug {}}
}
kafka是最流行的消息队列,在Elastic Stack架构中常用,使用相对简单
input {
kafka {
zk_connect => "kafka:2181"
group_id => "logstash"
topic_id => "apache_logs"
consumer_threads => 16
}
}
output {
stdout {codec => rubydebug {}}
}
常和启动参数-r
连用,用于热调试
input {
http {
port => "5555"
}
}
output {
stdout {codec => rubydebug {}}
}
codec
插件作用于input
插件和output
插件,负责将数据在原始于logstash event
之间转换,常见的codec有
Logstash Event
按照ruby格式输出,便于调试bin/logstash -e "input{stdin{codec=>\"plain\"}} output{stdout{codec=>\"rubydebug\"}}"
bin/logstash -e "input{stdin{codec=>\"plain\"}} output{stdout{codec=>\"dots\"}}"
bin/logstash -e "input{stdin{codec=>\"line\"}} output{stdout{codec=>\"rubydebug\"}}"
bin/logstash -e "input{stdin{codec=>\"json\"}} output{stdout{codec=>\"rubydebug\"}}"
当一个Event的message由多行组成时需要使用mutiline codec
,常见的情况时处理Java堆栈异常。主要参数如下
grok
语法what previous | next 如果匹配成功,那么匹配行是属于上一个时间还剩下一个事件 |
negate true | false 是否对pattern 匹配结果取反,默认false |
##################################以下可以用于简单读取java堆栈跟踪日志##################################
input {
stdin {
codec => multiline {
what => "next"
negate => false
}
}
}
output {
stdout {
codec => "rubydebug"
}
}
Filter是logstash功能强大的主要原因,它可以对Logstash Event
进行丰富的处理,比如数据解析、删除字段、类型转换等等。常见如下:
Logstash Event
将日期字符串解析为日期类型
input {
stdin {
codec => json
}
}
filter {
date {
# 匹配的字段
match => ["log_date", "yyyy-MM-dd HH:mm:ss"]
# 赋值目标字段,如果不设置则会覆盖@timestamp,如果设置时区而且覆盖本字段,时区无效
target => "log_data"
# 时区设置
timezone => "Asia/Shanghai"
}
}
output{
stdout {
codec => rubydebug
}
}
使用定义好的或者自定义的正则表达式进行匹配,已定义的常用正则查看参考
Grok语法为%{SYNTAX:SEMANTIC}
,SYNTAX
为grok pattern
的名称,SEMANTIC
为赋值字段名称。%{NUMBER:duration}
可以用于匹配数值类型,可以在后跟int
或float
来进行强制类型转换,例如%{NUMBER:duration:int}
。如果匹配失败将会有一个_grokparsefailure
字段产生,可用于后续判断
##################################使用系统定义的pattern##################################
filter {
grok {
match => { "message"=>"%{IP:ip} %{GREEDYDATA:data}"}
}
}
##################################使用自定义的pattern##################################
filter {
grok {
match => { "message"=>"%{IP:ip} %{SERVICE_NAME:service_name} %{GREEDYDATA:data}"}
pattern_definitions => {
"SERVICE_NAME" => "[a-zA-Z]{5}"
}
}
}
###################################overwrite的使用###################################
filter {
grok {
match => { "message"=>"%{IP:ip} %{GREEDYDATA:message}"}
overwrite => ["message"] #将上述匹配的message信息覆盖到message,如果没有overwrite将不执行覆盖操作
}
}
基于分隔符的原理进行解析数据,数据比grok
快,但是使用具有局限性
+
具有拼接效果/num
代表拼接的顺序?
代表创建一个属性名,&
代表给?
的属性名赋值convert_datatype
可以进行类型转换filter {
dissect {
mapping => {
"message" => "%{+ts/2} %{+ts/1}"
"urlParams" => "%{?username}=%{&username}"
}
convert_datatype => {
"age" => "int"
}
}
}
可以对各种字段进行各种操作,比如重命名、删除、替换、更新等。主要操作如下:
############################################################################
#convert 实现字段类型转换,类型为hash,支持转换为integer、float、string和boolean #
############################################################################
filter {
mutate {
convert => {
"age" => "integer"
"isLived" => "boolean"
}
}
}
############################################################################
#gsub 对字段内容进行替换,类型为数组,每三项为一个替换配置 #
############################################################################
filter {
mutate {
gsub => [
"path","[\\]","/",
"urlParams","[\\?#-]","."
]
}
}
############################################################################
#split/join/merge 字符串切割、数组合并为字符串、数组合并为数组 #
############################################################################
filter {
mutate {
split => {"jobs" => ","}
join => {"hobby" => ","}
merge => {"dest_arr" => "source_arr"}
}
}
############################################################################
#rename 将字段重命名 #
############################################################################
filter {
mutate {
rename => {
"message" => "source"
"@timestamp" => "create_time"
}
}
}
############################################################################
#update 只在字段存在时生效 #
#replace 如果字段不存在会执行新增操作 #
############################################################################
filter {
mutate {
replace => {"source" => "source: %{message}"}
update => {"age" => "This is %{age}"}
}
}
############################################################################
#remove_field 删除字段 #
############################################################################
filter {
mutate {
remove_field => ["@timestamp"]
}
}
将字段内容为json格式的数据进行解析
# 注意不能在http的Post的entity为Json的状态下测试
filter {
json {
source => "message"
# 如果没有target它将会被放在root级别,如果有target将会放在target下
target => "msg_json"
}
}
提供了根据Ip地址对应的经纬度、城市名等数据的查询,方便进行地理位置的分析
filter {
geoip {
source => "ip"
}
}
最灵活的插件,可以通过ruby代码来修改Logstash Event
filter {
ruby {
codec => 'size = event.get("message").size;
event.set("message_size", size)'
}
}
输出到标准输出流,一般用于调试
output {
stdout {
codec => rubydebug
}
}
输出到文件,一般用于将多地分散日志统一的需求
# 默认以json格式进行输出,下面改为了行输出,输出Event中的format字段
output {
file {
path => "/opt/web.log"
codec => line{ format => "%{message}"}
}
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "nginx-%{+YYYY.MM.dd}"
document_id => "%{id}" #作为ID的字段
}
}
存在一个@metadata
的字段,其内容不会输出在output中,此字段适合用于做条件判断,临时存储等。相比remove_field
性能好。
input {
http {
port => 5555
}
}
filter {
mutate {
add_field => {
"[@metadata][debug]"=> true
}
}
}
output {
if([@metadata][debug]) {
stdout {
codec => rubydebug
}
}else {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "nginx-%{+YYYY.MM.dd}"
}
}
}
在logstash.yml
下打开xpack
监控配置,即可在elasticsearch中查看logstash状态
xpack.monitoring.elasticsearch.hosts: ["http://192.168.1.155:9200"] #注意http
第一步:环境准备
数据库如果做增量数据导入,必须提高一个可控边界。可控边界一般使用自增主键或者时间戳
logstash不能创建ElasticSearch中的索引,所以需要手工创建索引或使用索引模板
导入mysql驱动jar包。最佳保存在$logstash_home/config目录中
第二步:定义logstash-mysql增量导入配置文件
$logstash_home/config创建文件logstash-mysql-es.conf
input {
jdbc {
# mysql相关jdbc配置
jdbc_connection_string => "jdbc:mysql://192.168.1.117:3306/logstash?useUnicode=true&characterEncoding=utf-8&useSSL=false"
jdbc_user => "root"
jdbc_password => "123456"
# jdbc连接mysql驱动的文件目录
jdbc_driver_library => "./config/mysql-connector-java-5.1.5.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => true
jdbc_page_size => "50000"
jdbc_default_timezone =>"Asia/Shanghai"
# 可以直接写SQL语句在此处,使用大于等于避免丢失数据。如下:
statement => "select * from test_logstash where update_time >= :sql_last_value"
# 也可以将SQL定义在文件中,如下:
#statement_filepath => "./config/jdbc.sql"
# 这里类似crontab,可以定制定时操作,比如每分钟执行一次同步(分 时 天 月 年)
schedule => "* * * * *"
# 在ES6.x版本中,不需要定义type。即使定义,logstash也是自动创建索引type为doc
#type => "doc"
# 是否记录上次执行结果, 如果为真,将会把上次执行到的tracking_column字段的值记录下来,保存到last_run_metadata_path指定的文件中,对磁盘的存储压力太大
#record_last_run => true
# 是否需要记录某个column的值,如果record_last_run为真,可以自定义我们需要track的column名称,此时该参数就要为true。否则默认track的是timestamp的值
use_column_value => true
# 如果use_column_value为真,需配置此参数.track的数据库column名,该column必须是递增的. 一般是mysql主键
tracking_column => "update_time"
tracking_column_type => "timestamp"
last_run_metadata_path => "./logstash_capital_bill_last_id"
# 是否清除last_run_metadata_path的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
clean_run => false
#是否将字段(column)名称转小写
lowercase_column_names => false
}
}
output {
elasticsearch {
hosts => "localhost:9200" #如果是多个ES,使用逗号分隔多个ip和端口
index => "mysql_datas" #索引名称
document_id => "%{id}" #数据库中数据与ES中document数据关联的字段,此处代表数据库中的id字段和ES中的document的id关联
template_overwrite => true #是否使用模板,开启效率更高
}
# 这里输出调试,正式运行时可以注释掉
stdout {
codec => json_lines
}
}
第三步:启动
bin/logstash -f config/logstash-mysql-es.conf
input {
file {
path => "/opt/elasticsearch/logstash/instance2/earthquakes.csv"
sincedb_path => "/dev/null"
start_position => "beginning"
}
}
filter {
csv {
columns => ["dateTime","latitude","longitude","depth","magnitude","magType",
"nbStations","gap","distance","rms","source","eventID"]
convert => {
"latitude" => "float"
"longitude" => "float"
"depth" => "float"
"rms" => "float"
"gap" => "float"
}
}
date {
match => ["dateTime", "yyyy/MM/dd HH:mm:ss.SS"]
timezone => "Asia/Shanghai"
}
mutate {
add_field => {
"[@metadata][debug]"=> true
}
}
}
output {
if([@metadata][debug]) {
stdout {
codec => rubydebug
}
}else {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "csv_datas"
}
}
}