侧边栏壁纸
博主头像
新都在 博主等级

行动起来,活在当下

  • 累计撰写 175 篇文章
  • 累计创建 37 个标签
  • 累计收到 88 条评论

目 录CONTENT

文章目录

使用logstash将kafka中的元数据写入到es中

Carol
2023-07-26 / 0 评论 / 0 点赞 / 48 阅读 / 0 字 / 正在检测是否收录...

😋特惠💰

多个付费教程捆绑包限时特惠,8折打包购买: 详情点击爱发电 ~~~

使用logstash将kafka中的元数据写入到es中

目前用的是logstash7.3版本,匹配kafka2.0以上。Logstash7.3 kafka input doc

logstash7.3目前支持的元数据只有,header元数据目前logstash暂时不支持

通过源代码可以看出 logstash-input-kafka

if @decorate_events
	event.set("[@metadata][kafka][topic]", record.topic)
	event.set("[@metadata][kafka][consumer_group]", @group_id)
	event.set("[@metadata][kafka][partition]", record.partition)
	event.set("[@metadata][kafka][offset]", record.offset)
	event.set("[@metadata][kafka][key]", record.key)
	event.set("[@metadata][kafka][timestamp]", record.timestamp)
  • [metadata][kafka][topic]: Original Kafka topic from where the message was consumed.
  • [@metadata][kafka][consumer_group]: Consumer group
  • [@metadata][kafka][partition]: Partition info for this message.
  • [@metadata][kafka][offset]: Original record offset for this message.
  • [@metadata][kafka][key]: Record key, if any.
  • [@metadata][kafka][timestamp]: Timestamp in the Record. Depending on your broker

配置文件如下:

  • 重点在于filter中的 mutate 属性的使用
input {
        kafka {
            bootstrap_servers => "127.0.0.1:9092"
            topics => ["test"]
            group_id => "test"
            #如果使用元数据就不能使用下面的byte字节序列化,否则会报错
            #key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
            #value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
            consumer_threads => 1
            #默认为false,只有为true的时候才会获取到元数据
			decorate_events => true
			auto_offset_reset => "earliest"
         }
}
filter {
	mutate {
		#从kafka的key中获取数据并按照逗号切割
		split => ["[@metadata][kafka][key]", ","]
		add_field => {
			#将切割后的第一位数据放入自定义的“index”字段中
			"index" => "%{[@metadata][kafka][key][0]}"
		}
	}
}
output {
   elasticsearch {
          user => elastic
          password => changeme
          pool_max => 1000
          pool_max_per_route => 200
          hosts => ["127.0.0.1:9200"]
          index => "test-%{+YYYY.MM.dd}"
   }
    stdout {
        codec => rubydebug
    }
}
0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区