思考并回答以下问题:
Docker安装Kafka
拉取Zookeeper镜像
docker pull zookeeper
创建网络
docker network create beyond --driver bridge
创建Zookeeper容器
docker run -d --name zookeeper --network beyond -p 2181:2181 -t zookeeper
拉取Kafka镜像
docker pull wurstmeister/kafka
创建kafka容器
docker run -d --name kafka --network beyond -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
进入kafka容器
docker exec -it {{容器ID}} /bin/bash
进入kafka执行命令目录
cd /opt/kafka/bin
创建topic
./kafka-topics.sh --create --topic topic-beyond-like --bootstrap-server localhost:9092
查看topic信息
./kafka-topics.sh --describe --topic topic-beyond-like --bootstrap-server localhost:9092
生产消息
./kafka-console-producer.sh --topic topic-beyond-like --bootstrap-server localhost:9092
消费消息
./kafka-console-consumer.sh --topic topic-beyond-like --from-beginning --bootstrap-server localhost:9092
生产者
配置
KqPusherConf:
  Brokers:
    - 127.0.0.1:9092
  Topic: topic-beyond-like
初始化
11type ServiceContext struct {
Config config.Config
KqPusherClient *kq.Pusher
}
// NewServiceContext builds a ServiceContext from c. The Kafka pusher is
// created from the brokers and topic listed under c.KqPusherConf.
func NewServiceContext(c config.Config) *ServiceContext {
	pusher := kq.NewPusher(c.KqPusherConf.Brokers, c.KqPusherConf.Topic)

	svcCtx := new(ServiceContext)
	svcCtx.Config = c
	svcCtx.KqPusherClient = pusher
	return svcCtx
}
发送Kafka消息
27func (l *ThumbupLogic) Thumbup(in *service.ThumbupRequest) (*service.ThumbupResponse, error) {
// TODO 逻辑暂时忽略
// 1. 查询是否点过赞
// 2. 计算当前内容的总点赞数和点踩数
msg := &types.ThumbupMsg{
BizId: in.BizId,
ObjId: in.ObjId,
UserId: in.UserId,
LikeType: in.LikeType,
}
// 发送kafka消息,异步
threading.GoSafe(func() {
data, err := json.Marshal(msg)
if err != nil {
l.Logger.Errorf("[Thumbup] marshal msg: %+v error: %v", msg, err)
return
}
err = l.svcCtx.KqPusherClient.Push(string(data))
if err != nil {
l.Logger.Errorf("[Thumbup] kq push data: %s error: %v", data, err)
}
})
return &service.ThumbupResponse{}, nil
}
消费者
配置
Name: mq
KqConsumerConf:
  Name: like-kq-consumer
  Brokers:
    - 127.0.0.1:9092
  Group: group-beyond-like
  Topic: topic-beyond-like
  Offset: last
  Consumers: 1
  Processors: 1
消费kafka消息
24type ThumbupLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}
// NewThumbupLogic returns a ThumbupLogic bound to ctx and svcCtx, with a
// logger derived from ctx.
func NewThumbupLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ThumbupLogic {
	logic := new(ThumbupLogic)
	logic.ctx = ctx
	logic.svcCtx = svcCtx
	logic.Logger = logx.WithContext(ctx)
	return logic
}
// Consume is called with the key and value of a consumed message. It only
// prints both to standard output and always reports success.
func (l *ThumbupLogic) Consume(key, val string) error {
	// The result of the print is deliberately discarded: a failed write to
	// stdout must not fail message consumption.
	_, _ = fmt.Printf("get key: %s val: %s\n", key, val)
	return nil
}
// Consumers returns the services to run as queue consumers: a single Kafka
// queue built from svcCtx.Config.KqConsumerConf and handled by a ThumbupLogic.
func Consumers(ctx context.Context, svcCtx *svc.ServiceContext) []service.Service {
	handler := NewThumbupLogic(ctx, svcCtx)
	queue := kq.MustNewQueue(svcCtx.Config.KqConsumerConf, handler)
	return []service.Service{queue}
}
grpcurl工具
启动反射服务
19func main() {
flag.Parse()
var c config.Config
conf.MustLoad(*configFile, &c)
ctx := svc.NewServiceContext(c)
s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
service.RegisterLikeServer(grpcServer, server.NewLikeServer(ctx))
if c.Mode == zs.DevMode || c.Mode == zs.TestMode {
reflection.Register(grpcServer)
}
})
defer s.Stop()
fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
s.Start()
}
配置
Mode: test
安装
go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest
查看服务列表
grpcurl -plaintext 127.0.0.1:8080 list
查看服务方法列表
grpcurl -plaintext 127.0.0.1:8080 list service.Like
调用方法
grpcurl -plaintext -d '{"bizId": "article", "objId": 123, "userId": 234, "likeType": 1}' 127.0.0.1:8080 service.Like/Thumbup