Files
fallen-angle 9e3638885d finish
2022-05-16 19:55:59 +08:00

71 lines
1.9 KiB
Go

package initialize
import (
"nCovTrack-Backend/global"
"nCovTrack-Backend/service/notify"
"time"
"github.com/Shopify/sarama"
)
// initProducer creates a synchronous Kafka producer for the brokers listed in
// global.ServerSettings.Kafka.Servers and publishes every value received on
// global.KafkaProducerChan to global.ServerSettings.Kafka.Topic.
//
// It blocks until global.KafkaProducerChan is closed, then closes the producer
// and returns. It panics if the producer cannot be created.
func initProducer() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// sarama's SyncProducer requires both result channels to be enabled.
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true

	producer, err := sarama.NewSyncProducer(global.ServerSettings.Kafka.Servers, config)
	if err != nil {
		panic("kafka init failed: " + err.Error())
	}
	defer func() {
		if err := producer.Close(); err != nil {
			sarama.Logger.Printf("kafka producer close failed: %v", err)
		}
	}()

	// Ranging over the channel ends the loop when the channel is closed,
	// instead of spinning forever on zero values.
	for value := range global.KafkaProducerChan {
		msg := &sarama.ProducerMessage{
			Topic: global.ServerSettings.Kafka.Topic,
			Key:   nil,
			Value: sarama.ByteEncoder(value),
		}
		// A failed send must not stop the producer loop, but it must not be
		// dropped silently either. sarama.Logger discards output unless the
		// application installs a logger.
		if _, _, err := producer.SendMessage(msg); err != nil {
			sarama.Logger.Printf("kafka send to topic %q failed: %v", msg.Topic, err)
		}
	}
}
func initConsumer() {
config := sarama.NewConfig()
config.Consumer.Offsets.CommitInterval = 1 * time.Second
client, err := sarama.NewClient(global.ServerSettings.Kafka.Servers, config)
if err != nil {
panic("kafka init failed")
}
defer client.Close()
offsetManager, err := sarama.NewOffsetManagerFromClient("", client)
if err != nil {
panic("kafka init failed")
}
defer offsetManager.Close()
partitionOffsetManager, err := offsetManager.ManagePartition(global.ServerSettings.Kafka.Topic, int32(global.ServerSettings.Kafka.Partition))
if err != nil {
panic("kafka init failed")
}
defer partitionOffsetManager.Close()
offsetNewest, _ := partitionOffsetManager.NextOffset()
consumer, err := sarama.NewConsumer(global.ServerSettings.Kafka.Servers, config)
if err != nil {
panic("kafka init failed")
}
defer consumer.Close()
partitionConsumer, err := consumer.ConsumePartition(global.ServerSettings.Kafka.Topic, int32(global.ServerSettings.Kafka.Partition), offsetNewest)
if err != nil {
panic("kafka init failed")
}
defer partitionConsumer.Close()
for {
select {
case msg := <-partitionConsumer.Messages():
ConsumeFunc(msg.Value)
}
}
}
// ConsumeFunc forwards the raw value of a consumed Kafka message to
// notify.HandleKafkaNotify.
func ConsumeFunc(msg []byte) {
notify.HandleKafkaNotify(msg)
}