专业的编程技术博客社区

网站首页 > 博客文章 正文

filebeat改造支持rocketmq(filebeat集群配置)

baijin 2025-07-21 12:34:52 博客文章 6 ℃ 0 评论

继续分享下以前在gitchat上发布的文章:filebeat改造支持rocketmq

1.概述

1.1问题概述

现在越来越多的日志采集使用 FileBeat,FileBeat 是个轻量型日志采集器,采用 Go 语言实现,性能稳健,占用资源少。FileBeat 现在支持采集的日志内容发送到 Redis、Elasticsearch、Kafka、Logstash。

那么我们如果想通过 FileBeat 采集日志到 RocketMQ 怎么办呢?不好意思,官方现在并不支持, 搜索下,也没有现成的实现。

让我给大家介绍下如何用 FileBeat 源码实现自己的 output.rocketmq。

在本场 Chat 中,会讲到如下内容:

  • 如何基于 Beat 源码,实现自己的 output
  • 实现输出到 RocketMQ的 output

适合人群: 对 ELK、FileBeat、RocketMQ 日志采集感感兴趣的技术人员

1.2 名词解析

名词

解释

备注

ELK

ELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana , 它们都是开源软件。


Rocketmq

高性能消息队列,采用java实现


Beats

Beats是ELK Stack技术栈中负责单一用途数据采集并推送给Logstash或Elasticsearch的轻量级产品。


Filebeat

Beats中轻量型日志采集器


2. 设计

2.1 filebeat介绍

Filebeat 简化了常见格式的日志的收集,支持采集的日志内容发送到 Redis、Elasticsearch、Kafka、Logstash中。

2.2 filebeat+rocketmq实际需求

实际项目有这样的需求,想利用现有的定制日志,采集内容到rocketmq,由rocketmq慢慢消费存到业务表中。

这样的好处,能利用现有的rocketmq集群,业务系统的请求性能提高,日志异步filebeat到rocketmq处理。

3.实现

让我们开始实现filebeat和rocketmq的集成。

3.1 准备工作

当前代码实现是基于beats源码 7.10.0实现的。
大家可以点击就行下载beats7.10.0
别的版本未经验证,有兴趣的读者需要自行测试验证。

3.2 output实现

a. 按beats规范,创建代码目录

## 解压下载的源码
cd beats-7.10.0/libbeat/outputs/
mkdir rocketmq  ##目录名还是直接用rocketmq命名了

b. 按beats代码规范在rocketmq目录下增加config.go

package rocketmq

import "github.com/elastic/beats/v7/libbeat/outputs/codec"

type Config struct {
        Codec codec.Config `config:"codec"`
        Host  string       `config:"host"`
        Topic string       `config:"topic"`
}

var defaultConfig = Config{
  			Host:  "",  //rocketmq的host和port,比如192.168.10.100:9876
        Topic: "",  //rocketmq的topic
}

host和topic的定义是为了获取配置文件里配置的rocketmq地址和topic

c. 代码实现

package rocketmq

import (
	"context"
	"fmt"
	"os"
	"runtime"
	"strings"
	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/common"
	"github.com/elastic/beats/v7/libbeat/logp"
	"github.com/elastic/beats/v7/libbeat/outputs"
	"github.com/elastic/beats/v7/libbeat/outputs/codec"
	"github.com/elastic/beats/v7/libbeat/publisher"
)

type rocketmqOutput struct {
	log        *logp.Logger
	out        *os.File
	observer   outputs.Observer
	index      string
	codec      codec.Codec
	host       string
	topic      string
	mq 	       *RocketMq
}

func init() {//初始化,把我们定义的rocketmq struct注册进来
	outputs.RegisterType("rocketmq", makerocketmq)
}

func makerocketmq(
	_ outputs.IndexManager,
	beat beat.Info,
	observer outputs.Observer,
	cfg *common.Config,
) (outputs.Group, error) {
	config := defaultConfig
	err := cfg.Unpack(&config)
	if err != nil {
		return outputs.Fail(err)
	}

	index := beat.Beat

	enc, err := codec.CreateEncoder(beat, config.Codec)

	//创建rocketmq struct, 传入配置文件的rocketmq host和topic进行保存。
	out := &rocketmqOutput{log: logp.NewLogger("rocketmq"), out: os.Stdout, observer: observer, index: index, codec: enc, host: config.Host, topic: config.Topic}

	arr:=strings.Split(config.Host,",")//针对rocketmq可能集群配置xxx:9876,xxxx:9876
	//rocketmq的生产者开始注册,其中group写死了= logByFilebeat,重新
	out.mq = RegisterRocketProducerMust(arr, "logByFilebeat", 1)

	// check stdout actually being available
	if runtime.GOOS != "windows" {
		if _, err = out.out.Stat(); err != nil {
			err = fmt.Errorf("rocketmq output initialization failed with: %v", err)
			return outputs.Fail(err)
		}
	}
	//没有大小限制=-1,不尝试重试=0
	return outputs.Success(-1, 0, out)
}

//关闭触发函数
func (c *rocketmqOutput) Close() error {
	if c.mq != nil {
		c.mq.Shutdown()
	}
	return nil
}
//有新的日志信息产生,会触发该函数
func (c *rocketmqOutput) Publish(_ context.Context, batch publisher.Batch) error {
	st := c.observer
	events := batch.Events()
	st.NewBatch(len(events))

	dropped := 0
	for i := range events {
		ok := c.publishEvent(&events[i])
		if !ok {
			dropped++
		}
	}

	batch.ACK()

	st.Dropped(dropped)
	st.Acked(len(events) - dropped)

	return nil
}

func (c *rocketmqOutput) publishEvent(event *publisher.Event) bool {

	serializedEvent, err := c.codec.Encode(c.index, &event.Content)
	if err != nil {
		if !event.Guaranteed() {
			return false
		}
		c.log.Errorf("Unable to encode event: %+v", err)
		c.log.Debugf("Failed event: %v", event)
		return false
	}

	c.observer.WriteBytes(len(serializedEvent) + 1)

	//判断生产者是否为空,如果为空重新初始化注册到rocketmq中
	if c.mq.isShutdown() == true {
        arr:=strings.Split(c.host,",")
		c.mq = RegisterRocketProducerMust(arr, "logByFilebeat", 1)
	}
	//新增日志内容
	str := string(serializedEvent)
	//发送内容到rocketmq
	msg, err := c.mq.SendMsg(c.topic, str)
	c.log.Debug("msg:%v", msg)

	if err != nil {
		c.log.Errorf("send to rocketmq  is error %+v", err)
		return false
	}
	return true
}

//接口规范
func (c *rocketmqOutput) String() string {
	return "rocketmq"
}
  1. 具体信息都在代码里增加了注释。
  2. 代码发送rocketmq底层采用了 rocketmq-client,并采用了orange框架里的rocketmq封装,该框架包转的很好用,但是有几点和实际需求不满足的地方。
  3. a. 当前不需要mq的消费者实现。
  4. b. 需要判断生产者是否为空,如果为空需要重新注册生产者
  5. 对此从新改造了下,改造的代码存放rocketmq目录下的queue.go中,具体见代码:
   package rocketmq
   
   import (
   	"context"
   	"errors"
   	"fmt"
   	"time"
   	"github.com/apache/rocketmq-client-go/v2"
   	"github.com/apache/rocketmq-client-go/v2/primitive"
   	"github.com/apache/rocketmq-client-go/v2/producer"
   )
   
   const (
   	_ = iota
   	SendMsg
   )
   
   type MqProducer interface {
   	SendMsg(topic string, body string) (mqMsg MqMsg, err error)
   	SendByteMsg(topic string, body []byte) (mqMsg MqMsg, err error)
   	isShutdown() (b bool, err error)
   	Shutdown() (err error)
   }
   
   type MqMsg struct {
   	RunType   int       `json:"run_type"`
   	Topic     string    `json:"topic"`
   	MsgId     string    `json:"msg_id"`
   	Offset    int64     `json:"offset"`
   	Partition int32     `json:"partition"`
   	Timestamp time.Time `json:"timestamp"`
   
   	Body []byte `json:"body"`
   }
   
   type RocketMq struct {
   	endPoints   []string
   	producerIns rocketmq.Producer
   	consumerIns rocketmq.PushConsumer
   }
   
   // RegisterRocketProducerMust 注册并启动生产者接口实现
   func RegisterRocketProducerMust(endPoints []string, groupName string, retry int) (client *RocketMq) {
   	var err error
   	client, err = RegisterRocketMqProducer(endPoints, groupName, retry)
   	if err != nil {
   		panic(err)
   	}
   	return client
   }
   // 是否生产者为空
   func (r *RocketMq) isShutdown() (b bool) {
   	if r.producerIns == nil {
   		return true
   	} else {
   		return false
   	}
   }
   // 关闭生产者
   func (r *RocketMq) Shutdown() {
   	if r.producerIns != nil {
   		r.producerIns.Shutdown()
   	} 
   }
   
   // SendMsg 按字符串类型生产数据
   func (r *RocketMq) SendMsg(topic string, body string) (mqMsg MqMsg, err error) {
   	return r.SendByteMsg(topic, []byte(body))
   }
   
   // SendByteMsg 生产数据
   func (r *RocketMq) SendByteMsg(topic string, body []byte) (mqMsg MqMsg, err error) {
   	if r.producerIns == nil {
   		return mqMsg, errors.New("RocketMq producer not register")
   	}
   
   	result, err := r.producerIns.SendSync(context.Background(), &primitive.Message{
   		Topic: topic,
   		Body:  body,
   	})
   
   	if err != nil {
   		return
   	}
   	if result.Status != primitive.SendOK {
   		return mqMsg, errors.New(fmt.Sprintf("RocketMq producer send msg error status:%v", result.Status))
   	}
   
   	mqMsg = MqMsg{
   		RunType: SendMsg,
   		Topic:   topic,
   		MsgId:   result.MsgID,
   		Body:    body,
   	}
   	return mqMsg, nil
   }
   
   // RegisterRocketMqProducer 注册rocketmq生产者
   func RegisterRocketMqProducer(endPoints []string, groupName string, retry int) (mqIns *RocketMq, err error) {
   	addr, err := primitive.NewNamesrvAddr(endPoints...)
   	if err != nil {
   		return nil, err
   	}
   	mqIns = &RocketMq{
   		endPoints: endPoints,
   	}
   
   	if retry <= 0 {
   		retry = 0
   	}
   
   	mqIns.producerIns, err = rocketmq.NewProducer(
   		producer.WithNameServer(addr),
   		producer.WithRetry(retry),
   		producer.WithGroupName(groupName),
   	)
   
   	if err != nil {
   		return nil, err
   	}
   
   	err = mqIns.producerIns.Start()
   	if err != nil {
   		return nil, err
	}
   
   	return mqIns, nil
   }
   

3.3 注册output

开发好的output,需要在includes.go里注册才能使用。

  1. 编辑文件 beats-7.10.0/libbeat/publisher/includes/includes.go
  2. 在代码 github.com/elastic/beats/v7/libbeat/outputs/redis 下增加 github.com/elastic/beats/v7/libbeat/outputs/rocketmq

4.使用

4.1 命令行编译

  1. beats编译
  2. 由于https://proxy.golang.org代理非常容易超时。
  3. make编译之前输入命令
  4. go env -w GOPROXY=https://goproxy.cn
  5. 编译
  6. cd beats-7.10.0 make
  7. 此时会出现“Installing mage v1.10.0.” 这里需要花费一点时间.
  8. filebeat编译
  9. cd beates-7.10.0/filebeat make
  10. 正常编译通过的话,会在filebeat目录下生成filebeat。
  11. 不实用动态库
  12. 以上编译后默认采用动态库,比如glibc,实际linux服务器上会可能出现glibc不一致,导致无法运行。接近方案采用静态库。

go build编译时,CGO_ENABLED=1的,自动添加了一些动态库链接,所以编译时吧CGO_ENABLED=0就OK了;

    CGO_ENABLED=0 go build -a -ldflags '-extldflags "-static"' .
		make

4.2 配置

开始自己的output配置

output.rocketmq:
  host: ip:9876
  topic: topic名称

如果是rocketmq集群,可以通过逗号区分

output.rocketmq:
  host: ip1:9876,ip2:9876
  topic: topic名称

4.3 启动

./filebeat  -c 配置文件 -e

4.4 快速测试

4.4.1 写个java的消费者测试类

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import java.util.List;

public class RocketMQConsumer {
    public static void main(String[] args) throws MQClientException {
        //创建消费者
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("default");
        //设置NameServer地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //设置实例名称
        consumer.setInstanceName("consumer");
        //订阅topic
        consumer.subscribe("tbs_log_mq_topic_dev","*");//topic和配置文件对应
        //监听消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //获取消息
                for (MessageExt messageExt:list){
                    //RocketMQ由于是集群环境,所有产生的消息ID可能会重复
                    System.out.println(messageExt.getMsgId()+"---"+new String(messageExt.getBody()));
                }
                //接受消息状态 1.消费成功    2.消费失败   队列还有
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
        System.out.println("consumer Started!");
    }
}

4.4.2 filebeat测试配置

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /opt/dir/logs/*_mq.log
  fields:
     log_topic: trans_log
     level: debug
  multiline.pattern: ^\[
  multiline.negate: true
  multiline.match: after
filebeat.registry.path: /opt/dir/logs/filebeatrocketmq
output.rocketmq:
  host: 127.0.0.1:9876
  topic: tbs_log_mq_topic_dev

5.注意事项

前面提到的几个点,这边注意事项再强调下

  • 代理下载的问题 :go env -w GOPROXY=https://goproxy.cn。
  • 不实用系统的glibc: CGO_ENABLED=0 go build -a -ldflags ‘-extldflags “-static”’ 。
  • 测试配置文件参数filebeat.registry.path 对应目录创建,filebeat保存数据的目录。
  • filebeat测试配置,设置multiline.pattern: ^[ ,匹配是否多行。
  • 支持mq重启后,继续自动发送。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表