From the logstash-input-kafka source code we can see that, when decorate_events is enabled, the plugin copies the Kafka record's topic, consumer group, partition, offset, key and timestamp into the event's [@metadata][kafka] fields:
if @decorate_events
  event.set("[@metadata][kafka][topic]", record.topic)
  event.set("[@metadata][kafka][consumer_group]", @group_id)
  event.set("[@metadata][kafka][partition]", record.partition)
  event.set("[@metadata][kafka][offset]", record.offset)
  event.set("[@metadata][kafka][key]", record.key)
  event.set("[@metadata][kafka][timestamp]", record.timestamp)
end
input {
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    topics => ["test"]
    group_id => "test"
    # When using the metadata, the byte-array deserializers below must not be used, otherwise an error is raised
    #key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    #value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    consumer_threads => 1
    # Defaults to false; the metadata is only captured when this is set to true
    decorate_events => true
    auto_offset_reset => "earliest"
  }
}
filter {
  mutate {
    # Take the Kafka message key and split it on commas
    split => ["[@metadata][kafka][key]", ","]
    add_field => {
      # Put the first segment of the split key into a custom "index" field
      "index" => "%{[@metadata][kafka][key][0]}"
    }
  }
}
output {
  elasticsearch {
    user => "elastic"
    password => "changeme"
    pool_max => 1000
    pool_max_per_route => 200
    hosts => ["127.0.0.1:9200"]
    index => "test-%{+YYYY.MM.dd}"
  }
  stdout {
    codec => rubydebug
  }
}
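Note that @metadata is never written to the output document, and the plain rubydebug codec above does not print it either. To verify the decorated values while testing, the codec's metadata option can be enabled (a minimal sketch):

stdout {
  codec => rubydebug { metadata => true }
}

Likewise, the custom "index" field built in the filter could be used in the index name, e.g. index => "%{index}-%{+YYYY.MM.dd}", assuming the first comma-separated segment of the key is a valid index prefix.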