package initialize

import (
	"log"
	"time"

	"nCovTrack-Backend/global"
	"nCovTrack-Backend/service/notify"

	"github.com/Shopify/sarama"
)

// initProducer creates a synchronous Kafka producer and forwards every
// payload received on global.KafkaProducerChan to the configured topic.
// It blocks forever, so it is expected to run in its own goroutine.
func initProducer() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true

	producer, err := sarama.NewSyncProducer(global.ServerSettings.Kafka.Servers, config)
	if err != nil {
		panic("kafka init failed: " + err.Error())
	}
	// The producer lives for the whole process, so it is never closed here.

	for value := range global.KafkaProducerChan {
		_, _, err := producer.SendMessage(&sarama.ProducerMessage{
			Topic: global.ServerSettings.Kafka.Topic,
			Key:   nil,
			Value: sarama.ByteEncoder(value),
		})
		if err != nil {
			log.Printf("kafka send failed: %v", err)
		}
	}
}

// initConsumer consumes the configured topic/partition, resuming from the
// last committed offset, and hands every message to ConsumeFunc.
// It blocks forever, so it is expected to run in its own goroutine.
func initConsumer() {
	config := sarama.NewConfig()
	config.Consumer.Offsets.CommitInterval = 1 * time.Second

	client, err := sarama.NewClient(global.ServerSettings.Kafka.Servers, config)
	if err != nil {
		panic("kafka init failed: " + err.Error())
	}
	defer client.Close()

	// Offsets are committed under an empty consumer-group name.
	offsetManager, err := sarama.NewOffsetManagerFromClient("", client)
	if err != nil {
		panic("kafka init failed: " + err.Error())
	}
	defer offsetManager.Close()

	partitionOffsetManager, err := offsetManager.ManagePartition(
		global.ServerSettings.Kafka.Topic, int32(global.ServerSettings.Kafka.Partition))
	if err != nil {
		panic("kafka init failed: " + err.Error())
	}
	defer partitionOffsetManager.Close()

	// NextOffset returns the offset to resume consumption from.
	nextOffset, _ := partitionOffsetManager.NextOffset()

	consumer, err := sarama.NewConsumer(global.ServerSettings.Kafka.Servers, config)
	if err != nil {
		panic("kafka init failed: " + err.Error())
	}
	defer consumer.Close()

	partitionConsumer, err := consumer.ConsumePartition(
		global.ServerSettings.Kafka.Topic, int32(global.ServerSettings.Kafka.Partition), nextOffset)
	if err != nil {
		panic("kafka init failed: " + err.Error())
	}
	defer partitionConsumer.Close()

	for msg := range partitionConsumer.Messages() {
		ConsumeFunc(msg.Value)
	}
}

// ConsumeFunc dispatches a raw Kafka message to the notify service.
func ConsumeFunc(msg []byte) {
	notify.HandleKafkaNotify(msg)
}
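
// Usage sketch: a minimal example of how these initializers might be wired
// together. It assumes global.KafkaProducerChan is a chan []byte created
// elsewhere; InitKafka and PublishNotify are hypothetical helpers shown only
// for illustration.

// InitKafka starts the producer and consumer loops in the background.
// Both loops block forever, so each runs in its own goroutine.
func InitKafka() {
	go initProducer()
	go initConsumer()
}

// PublishNotify shows how other packages would hand a payload to the
// producer loop: they only write to the channel and never touch sarama.
func PublishNotify(payload []byte) {
	global.KafkaProducerChan <- payload
}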