网站首页 > 博客文章 正文
继续分享下以前在gitchat上发布的文章:filebeat改造支持rocketmq
1.概述
1.1问题概述
现在越来越多的日志采集使用 FileBeat,FileBeat 是个轻量型日志采集器,采用 Go 语言实现,性能稳健,占用资源少。FileBeat 现在支持采集的日志内容发送到 Redis、Elasticsearch、Kafka、Logstash。
那么我们如果想通过 FileBeat 采集日志到 RocketMQ 怎么办呢?不好意思,官方现在并不支持, 搜索下,也没有现成的实现。
让我给大家介绍下如何用 FileBeat 源码实现自己的 output.rocketmq。
在本场 Chat 中,会讲到如下内容:
- 如何基于 Beat 源码,实现自己的 output
- 实现输出到 RocketMQ的 output
适合人群: 对 ELK、FileBeat、RocketMQ 日志采集感感兴趣的技术人员
1.2 名词解析
名词 | 解释 | 备注 |
ELK | ELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana , 它们都是开源软件。 | |
Rocketmq | 高性能消息队列,采用java实现 | |
Beats | Beats是ELK Stack技术栈中负责单一用途数据采集并推送给Logstash或Elasticsearch的轻量级产品。 | |
Filebeat | Beats中轻量型日志采集器 |
2. 设计
2.1 filebeat介绍
Filebeat 简化了常见格式的日志的收集,支持采集的日志内容发送到 Redis、Elasticsearch、Kafka、Logstash中。
2.2 filebeat+rocketmq实际需求
实际项目有这样的需求,想利用现有的定制日志,采集内容到rocketmq,由rocketmq慢慢消费存到业务表中。
这样的好处,能利用现有的rocketmq集群,业务系统的请求性能提高,日志异步filebeat到rocketmq处理。
3.实现
让我们开始实现filebeat和rocketmq的集成。
3.1 准备工作
当前代码实现是基于beats源码 7.10.0实现的。
大家可以点击就行下载beats7.10.0
别的版本未经验证,有兴趣的读者需要自行测试验证。
3.2 output实现
a. 按beats规范,创建代码目录
## 解压下载的源码
cd beats-7.10.0/libbeat/outputs/
mkdir rocketmq ##目录名还是直接用rocketmq命名了
b. 按beats代码规范在rocketmq目录下增加config.go
package rocketmq
import "github.com/elastic/beats/v7/libbeat/outputs/codec"
type Config struct {
Codec codec.Config `config:"codec"`
Host string `config:"host"`
Topic string `config:"topic"`
}
var defaultConfig = Config{
Host: "", //rocketmq的host和port,比如192.168.10.100:9876
Topic: "", //rocketmq的topic
}
host和topic的定义是为了获取配置文件里配置的rocketmq地址和topic
c. 代码实现
package rocketmq
import (
"context"
"fmt"
"os"
"runtime"
"strings"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/codec"
"github.com/elastic/beats/v7/libbeat/publisher"
)
type rocketmqOutput struct {
log *logp.Logger
out *os.File
observer outputs.Observer
index string
codec codec.Codec
host string
topic string
mq *RocketMq
}
func init() {//初始化,把我们定义的rocketmq struct注册进来
outputs.RegisterType("rocketmq", makerocketmq)
}
func makerocketmq(
_ outputs.IndexManager,
beat beat.Info,
observer outputs.Observer,
cfg *common.Config,
) (outputs.Group, error) {
config := defaultConfig
err := cfg.Unpack(&config)
if err != nil {
return outputs.Fail(err)
}
index := beat.Beat
enc, err := codec.CreateEncoder(beat, config.Codec)
//创建rocketmq struct, 传入配置文件的rocketmq host和topic进行保存。
out := &rocketmqOutput{log: logp.NewLogger("rocketmq"), out: os.Stdout, observer: observer, index: index, codec: enc, host: config.Host, topic: config.Topic}
arr:=strings.Split(config.Host,",")//针对rocketmq可能集群配置xxx:9876,xxxx:9876
//rocketmq的生产者开始注册,其中group写死了= logByFilebeat,重新
out.mq = RegisterRocketProducerMust(arr, "logByFilebeat", 1)
// check stdout actually being available
if runtime.GOOS != "windows" {
if _, err = out.out.Stat(); err != nil {
err = fmt.Errorf("rocketmq output initialization failed with: %v", err)
return outputs.Fail(err)
}
}
//没有大小限制=-1,不尝试重试=0
return outputs.Success(-1, 0, out)
}
//关闭触发函数
func (c *rocketmqOutput) Close() error {
if c.mq != nil {
c.mq.Shutdown()
}
return nil
}
//有新的日志信息产生,会触发该函数
func (c *rocketmqOutput) Publish(_ context.Context, batch publisher.Batch) error {
st := c.observer
events := batch.Events()
st.NewBatch(len(events))
dropped := 0
for i := range events {
ok := c.publishEvent(&events[i])
if !ok {
dropped++
}
}
batch.ACK()
st.Dropped(dropped)
st.Acked(len(events) - dropped)
return nil
}
func (c *rocketmqOutput) publishEvent(event *publisher.Event) bool {
serializedEvent, err := c.codec.Encode(c.index, &event.Content)
if err != nil {
if !event.Guaranteed() {
return false
}
c.log.Errorf("Unable to encode event: %+v", err)
c.log.Debugf("Failed event: %v", event)
return false
}
c.observer.WriteBytes(len(serializedEvent) + 1)
//判断生产者是否为空,如果为空重新初始化注册到rocketmq中
if c.mq.isShutdown() == true {
arr:=strings.Split(c.host,",")
c.mq = RegisterRocketProducerMust(arr, "logByFilebeat", 1)
}
//新增日志内容
str := string(serializedEvent)
//发送内容到rocketmq
msg, err := c.mq.SendMsg(c.topic, str)
c.log.Debug("msg:%v", msg)
if err != nil {
c.log.Errorf("send to rocketmq is error %+v", err)
return false
}
return true
}
//接口规范
func (c *rocketmqOutput) String() string {
return "rocketmq"
}
- 具体信息都在代码里增加了注释。
- 代码发送rocketmq底层采用了 rocketmq-client,并采用了orange框架里的rocketmq封装,该框架包转的很好用,但是有几点和实际需求不满足的地方。
- a. 当前不需要mq的消费者实现。
- b. 需要判断生产者是否为空,如果为空需要重新注册生产者
- 对此从新改造了下,改造的代码存放rocketmq目录下的queue.go中,具体见代码:
package rocketmq
import (
"context"
"errors"
"fmt"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
const (
_ = iota
SendMsg
)
type MqProducer interface {
SendMsg(topic string, body string) (mqMsg MqMsg, err error)
SendByteMsg(topic string, body []byte) (mqMsg MqMsg, err error)
isShutdown() (b bool, err error)
Shutdown() (err error)
}
type MqMsg struct {
RunType int `json:"run_type"`
Topic string `json:"topic"`
MsgId string `json:"msg_id"`
Offset int64 `json:"offset"`
Partition int32 `json:"partition"`
Timestamp time.Time `json:"timestamp"`
Body []byte `json:"body"`
}
type RocketMq struct {
endPoints []string
producerIns rocketmq.Producer
consumerIns rocketmq.PushConsumer
}
// RegisterRocketProducerMust 注册并启动生产者接口实现
func RegisterRocketProducerMust(endPoints []string, groupName string, retry int) (client *RocketMq) {
var err error
client, err = RegisterRocketMqProducer(endPoints, groupName, retry)
if err != nil {
panic(err)
}
return client
}
// 是否生产者为空
func (r *RocketMq) isShutdown() (b bool) {
if r.producerIns == nil {
return true
} else {
return false
}
}
// 关闭生产者
func (r *RocketMq) Shutdown() {
if r.producerIns != nil {
r.producerIns.Shutdown()
}
}
// SendMsg 按字符串类型生产数据
func (r *RocketMq) SendMsg(topic string, body string) (mqMsg MqMsg, err error) {
return r.SendByteMsg(topic, []byte(body))
}
// SendByteMsg 生产数据
func (r *RocketMq) SendByteMsg(topic string, body []byte) (mqMsg MqMsg, err error) {
if r.producerIns == nil {
return mqMsg, errors.New("RocketMq producer not register")
}
result, err := r.producerIns.SendSync(context.Background(), &primitive.Message{
Topic: topic,
Body: body,
})
if err != nil {
return
}
if result.Status != primitive.SendOK {
return mqMsg, errors.New(fmt.Sprintf("RocketMq producer send msg error status:%v", result.Status))
}
mqMsg = MqMsg{
RunType: SendMsg,
Topic: topic,
MsgId: result.MsgID,
Body: body,
}
return mqMsg, nil
}
// RegisterRocketMqProducer 注册rocketmq生产者
func RegisterRocketMqProducer(endPoints []string, groupName string, retry int) (mqIns *RocketMq, err error) {
addr, err := primitive.NewNamesrvAddr(endPoints...)
if err != nil {
return nil, err
}
mqIns = &RocketMq{
endPoints: endPoints,
}
if retry <= 0 {
retry = 0
}
mqIns.producerIns, err = rocketmq.NewProducer(
producer.WithNameServer(addr),
producer.WithRetry(retry),
producer.WithGroupName(groupName),
)
if err != nil {
return nil, err
}
err = mqIns.producerIns.Start()
if err != nil {
return nil, err
}
return mqIns, nil
}
3.3 注册output
开发好的output,需要在includes.go里注册才能使用。
- 编辑文件 beats-7.10.0/libbeat/publisher/includes/includes.go
- 在代码 github.com/elastic/beats/v7/libbeat/outputs/redis 下增加 github.com/elastic/beats/v7/libbeat/outputs/rocketmq
4.使用
4.1 命令行编译
- beats编译
- 由于https://proxy.golang.org代理非常容易超时。
- make编译之前输入命令
- go env -w GOPROXY=https://goproxy.cn
- 编译
- cd beats-7.10.0 make
- 此时会出现“Installing mage v1.10.0.” 这里需要花费一点时间.
- filebeat编译
- cd beates-7.10.0/filebeat make
- 正常编译通过的话,会在filebeat目录下生成filebeat。
- 不实用动态库
- 以上编译后默认采用动态库,比如glibc,实际linux服务器上会可能出现glibc不一致,导致无法运行。接近方案采用静态库。
go build编译时,CGO_ENABLED=1的,自动添加了一些动态库链接,所以编译时吧CGO_ENABLED=0就OK了;
CGO_ENABLED=0 go build -a -ldflags '-extldflags "-static"' .
make
4.2 配置
开始自己的output配置
output.rocketmq:
host: ip:9876
topic: topic名称
如果是rocketmq集群,可以通过逗号区分
output.rocketmq:
host: ip1:9876,ip2:9876
topic: topic名称
4.3 启动
./filebeat -c 配置文件 -e
4.4 快速测试
4.4.1 写个java的消费者测试类
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import java.util.List;
public class RocketMQConsumer {
public static void main(String[] args) throws MQClientException {
//创建消费者
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("default");
//设置NameServer地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//设置实例名称
consumer.setInstanceName("consumer");
//订阅topic
consumer.subscribe("tbs_log_mq_topic_dev","*");//topic和配置文件对应
//监听消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//获取消息
for (MessageExt messageExt:list){
//RocketMQ由于是集群环境,所有产生的消息ID可能会重复
System.out.println(messageExt.getMsgId()+"---"+new String(messageExt.getBody()));
}
//接受消息状态 1.消费成功 2.消费失败 队列还有
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
System.out.println("consumer Started!");
}
}
4.4.2 filebeat测试配置
filebeat.inputs:
- type: log
enabled: true
paths:
- /opt/dir/logs/*_mq.log
fields:
log_topic: trans_log
level: debug
multiline.pattern: ^\[
multiline.negate: true
multiline.match: after
filebeat.registry.path: /opt/dir/logs/filebeatrocketmq
output.rocketmq:
host: 127.0.0.1:9876
topic: tbs_log_mq_topic_dev
5.注意事项
前面提到的几个点,这边注意事项再强调下
- 代理下载的问题 :go env -w GOPROXY=https://goproxy.cn。
- 不实用系统的glibc: CGO_ENABLED=0 go build -a -ldflags ‘-extldflags “-static”’ 。
- 测试配置文件参数filebeat.registry.path 对应目录创建,filebeat保存数据的目录。
- filebeat测试配置,设置multiline.pattern: ^[ ,匹配是否多行。
- 支持mq重启后,继续自动发送。
猜你喜欢
- 2025-07-21 开源|一款类excel报表设计系统,支持拖拽式和word模板设计
- 2025-07-21 SpringBoot利用ThreadPoolTaskExecutor批量插入百万级数据实测!
- 2025-07-21 云端藏经阁:一款开源、精美、可独立部署的知识管理神器
- 2025-07-21 电商秒杀/库存扣减:基于JUC的并发控制实战案例
- 2025-07-21 简单易用的.NET免费开源RabbitMQ操作组件EasyNetQ
- 2025-07-21 亿级分库分表,如何丝滑扩容、如何双写灰度
- 2025-07-21 使用mq实现分布式事务-补偿事务一致性
- 2025-07-21 【RocketMQ】消息的拉取(rocketmq消息大小)
- 2025-07-21 RocketMQ消息消费-客户端拉取消息前的准备工作
- 2025-07-21 RocketMQ消费限流的几种方式(rocketmq并发消费与顺序消费)
你 发表评论:
欢迎- 最近发表
-
- 谷歌云推出印度尼西亚“BerdAIa for Security”网络安全计划
- 谷歌:已解决全球服务中断问题,受影响平台涉及Spotify、Discord等
- 不再单一依赖英伟达,OpenAI被曝开始租用谷歌AI芯片训练ChatGPT
- 谷歌云代理商:怎样通过谷歌云服务器搭建社交平台?
- 谷歌云服务遭遇全球性宕机,影响多家互联网巨头
- OpenAI正式将谷歌云纳入供应商名单
- 谷歌给Agent造了个“微信”,和MCP功能互补,多智能体协作更顺畅了
- OpenAI“去微软化”加速:最新引入谷歌(GOOGL.US)构建混合云生态
- 谷歌给Agent造了个“微信”,和MCP功能互补,多智能体协作更顺畅
- 谷歌与OpenAI携手:云合作背后的机遇与隐忧
- 标签列表
-
- ifneq (61)
- 字符串长度在线 (61)
- googlecloud (64)
- flutterrun (59)
- 系统设计图 (58)
- powershellfor (73)
- messagesource (71)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- ubuntu升级gcc (58)
- nacos启动失败 (64)
- ssh-add (70)
- jwt漏洞 (58)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- qcombobox样式表 (68)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)