go与kafka(github.com/Shopify/sarama)

论坛 期权论坛 脚本     
匿名技术用户   2020-12-21 16:25   42   0

启动kafka

分别命令行启动zookeeper和kafka

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
.\bin\windows\kafka-server-start.bat .\config\server.properties

producer

import (
 "fmt"
 "github.com/Shopify/sarama"
)

func main() {
 config := sarama.NewConfig()
 config.Producer.RequiredAcks = sarama.WaitForAll  //赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。
 config.Producer.Partitioner = sarama.NewRandomPartitioner  //写到随机分区中,默认设置8个分区
 config.Producer.Return.Successes = true
 msg := &sarama.ProducerMessage{}
 msg.Topic = `nginx_log`
 msg.Value = sarama.StringEncoder("this is a good test")
 client,err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"},config)
 if err != nil{
  fmt.Println("producer close err, ",err)
  return
 }
 defer client.Close()
 pid, offset, err := client.SendMessage(msg)

 if err != nil{
  fmt.Println("send message failed, ", err)
  return
 }
 fmt.Printf("分区ID:%v, offset:%v \n", pid, offset)
 }

consumer

func main() {
 var wg sync.WaitGroup
 consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
 if err != nil{
  fmt.Println("Failed to start consumer: %s", err)
  return
 }
 partitionList, err := consumer.Partitions("nginx_log")  //获得该topic所有的分区
 if err != nil{
  fmt.Println("Failed to get the list of partition:, ",err)
  return
 }
 fmt.Println(partitionList)

 for partition := range partitionList{
  pc, err := consumer.ConsumePartition("nginx_log", int32(partition), sarama.OffsetNewest)
  if err != nil{
   fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)
   return
  }
                wg.Add(1)
  go func(sarama.PartitionConsumer) { //为每个分区开一个go协程去取值
   for msg := range pc.Messages(){  //阻塞直到有值发送过来,然后再继续等待
    fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key),string(msg.Value))
    }
                        defer pc.AsyncClose()
   wg.Done()
   }(pc)
 }
 wg.Wait()
 consumer.Close()
}

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:7942463
帖子:1588486
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP