在go-zero中使用Kafka

思考并回答以下问题:

Docker安装Kafka

拉取Zookeeper镜像

1
docker pull zookeeper

创建网络
1
docker network create beyond --driver bridge

创建Zookeeper容器
1
docker run -d --name zookeeper --network beyond -p 2181:2181 -t zookeeper

拉取Kafka镜像
1
docker pull wurstmeister/kafka

创建kafka容器
1
docker run -d --name kafka --network beyond -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka

进入kafka容器
1
docker exec -it {{容器ID}} /bin/bash

进入kafka执行命令目录
1
cd /opt/kafka/bin

创建topic
1
./kafka-topics.sh --create --topic topic-beyond-like --bootstrap-server localhost:9092

查看topic信息
1
./kafka-topics.sh --describe --topic topic-beyond-like --bootstrap-server localhost:9092

生产消息
1
./kafka-console-producer.sh --topic topic-beyond-like --bootstrap-server localhost:9092

消费消息
1
./kafka-console-consumer.sh --topic topic-beyond-like --from-beginning --bootstrap-server localhost:9092

生产者

配置

1
2
3
4
KqPusherConf:
Brokers:
- 127.0.0.1:9092
Topic: topic-beyond-like

初始化
1
2
3
4
5
6
7
8
9
10
11
type ServiceContext struct {
Config config.Config
KqPusherClient *kq.Pusher
}

func NewServiceContext(c config.Config) *ServiceContext {
return &ServiceContext{
Config: c,
KqPusherClient: kq.NewPusher(c.KqPusherConf.Brokers, c.KqPusherConf.Topic),
}
}

发送Kafka消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (l *ThumbupLogic) Thumbup(in *service.ThumbupRequest) (*service.ThumbupResponse, error) {
// TODO 逻辑暂时忽略
// 1. 查询是否点过赞
// 2. 计算当前内容的总点赞数和点踩数

msg := &types.ThumbupMsg{
BizId: in.BizId,
ObjId: in.ObjId,
UserId: in.UserId,
LikeType: in.LikeType,
}
// 发送kafka消息,异步
threading.GoSafe(func() {
data, err := json.Marshal(msg)
if err != nil {
l.Logger.Errorf("[Thumbup] marshal msg: %+v error: %v", msg, err)
return
}
err = l.svcCtx.KqPusherClient.Push(string(data))
if err != nil {
l.Logger.Errorf("[Thumbup] kq push data: %s error: %v", data, err)
}

})

return &service.ThumbupResponse{}, nil
}

消费者

配置

1
2
3
4
5
6
7
8
9
10
Name: mq
KqConsumerConf:
Name: like-kq-consumer
Brokers:
- 127.0.1:9092
Group: group-beyond-like
Topic: topic-beyond-like
Offset: last
Consumers: 1
Processors: 1

消费kafka消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type ThumbupLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}

func NewThumbupLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ThumbupLogic {
return &ThumbupLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}

func (l *ThumbupLogic) Consume(key, val string) error {
fmt.Printf("get key: %s val: %s\n", key, val)
return nil
}

func Consumers(ctx context.Context, svcCtx *svc.ServiceContext) []service.Service {
return []service.Service{
kq.MustNewQueue(svcCtx.Config.KqConsumerConf, NewThumbupLogic(ctx, svcCtx)),
}
}

grpcurl工具

启动反射服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func main() {
flag.Parse()

var c config.Config
conf.MustLoad(*configFile, &c)
ctx := svc.NewServiceContext(c)

s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
service.RegisterLikeServer(grpcServer, server.NewLikeServer(ctx))

if c.Mode == zs.DevMode || c.Mode == zs.TestMode {
reflection.Register(grpcServer)
}
})
defer s.Stop()

fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
s.Start()
}

配置
1
Mode: test

安装
1
go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest

查看服务列表
1
grpcurl -plaintext 127.0.0.1:8080 list

查看服务方法列表
1
grpcurl -plaintext 127.0.0.1:8080 list service.Like

调用方法
1
grpcurl -plaintext -d '{"bizId": "article", "objId": 123, "userId": 234, "likeType": 1}' 127.0.0.1:8080 service.Like/Thumbup

0%