B's Corner

Let's make an AMQP consumer

· brandon lee

From a high level, the AMQP consumer shares the same basic design as the Kinesis consumer, but instead of streams and shards we now have queues and receivers. Since AMQP maintains an active connection that can be severed from either end, we need to be diligent about managing the link and re-establishing the connection when it breaks.

To keep the code simple, I opted to keep the goroutines tightly coupled, so any broken link triggers a graceful shutdown of all receivers. After shutdown, Kubernetes ensures the app is brought back up quickly, at which point connections to all queues can be re-established.

 // create channels to listen for OS interrupts & AMQP link errors;
 // cancel() below cancels the context shared by all receivers
 chSig := make(chan os.Signal, 1)
 chLink := make(chan int)
 signal.Notify(chSig, os.Interrupt, syscall.SIGTERM)
 go func() {
  var once sync.Once
  for {
   select {
   case <-chSig:
    log.Println("OS INTERRUPT: notifying all goroutines to stop")
    cancel()
   case <-chLink:
    once.Do(func() {
     log.Println("LINK CLOSED: notifying all remaining receivers to close")
     cancel()
    })
   }
  }
 }()

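Here, the chLink channel is used by all receivers to notify the main routine of a closed link. Once a link has been closed, we signal the remaining receivers to shut down by calling cancel() on the context. The sync.Once ensures that, however many receivers report a closed link, we log the message and cancel only once.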
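For the other side of that channel, here is a minimal sketch of what a receiver goroutine could look like. It is not the actual consumer code: receiveFunc, fakeLink, errLinkClosed, runReceiver and superviseAll are hypothetical names, and fakeLink merely simulates a blocking receive on a link instead of calling a real AMQP client. The watcher is also a simplified version of the one above (no OS signal handling), so each receiver guards its send on chLink with ctx.Done() in case nobody is listening anymore.

```go
package main

import (
	"context"
	"errors"
	"log"
	"sync"
	"time"
)

// errLinkClosed is a hypothetical stand-in for the error a real AMQP
// client would return when the link is severed.
var errLinkClosed = errors.New("link closed")

// receiveFunc is a hypothetical stand-in for a blocking receive on one link.
type receiveFunc func(ctx context.Context) (string, error)

// fakeLink simulates a link that delivers failAfter messages and then
// breaks. A negative failAfter means the link never breaks on its own.
func fakeLink(failAfter int) receiveFunc {
	delivered := 0
	return func(ctx context.Context) (string, error) {
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-time.After(time.Millisecond):
		}
		if failAfter >= 0 && delivered >= failAfter {
			return "", errLinkClosed
		}
		delivered++
		return "msg", nil
	}
}

// runReceiver processes messages until the context is cancelled or the
// link breaks. A broken link is reported on chLink.
func runReceiver(ctx context.Context, id int, receive receiveFunc, chLink chan<- int) {
	for {
		msg, err := receive(ctx)
		if err != nil {
			if ctx.Err() != nil {
				return // shutdown was requested, nothing to report
			}
			select {
			case chLink <- id: // tell the watcher our link is gone
			case <-ctx.Done(): // someone else already triggered shutdown
			}
			return
		}
		log.Printf("receiver %d got %q", id, msg)
	}
}

// superviseAll starts one receiver per link, cancels all of them as soon
// as one link breaks, and returns how many receivers have stopped.
func superviseAll(links []receiveFunc) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	chLink := make(chan int)
	go func() {
		select {
		case id := <-chLink:
			log.Printf("LINK CLOSED on receiver %d: notifying remaining receivers", id)
			cancel()
		case <-ctx.Done():
		}
	}()

	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		stopped int
	)
	for i, link := range links {
		wg.Add(1)
		go func(id int, receive receiveFunc) {
			defer wg.Done()
			runReceiver(ctx, id, receive, chLink)
			mu.Lock()
			stopped++
			mu.Unlock()
		}(i, link)
	}
	wg.Wait() // graceful shutdown: every receiver has returned
	return stopped
}

func main() {
	// The second link breaks after three messages; the others are healthy.
	n := superviseAll([]receiveFunc{fakeLink(-1), fakeLink(3), fakeLink(-1)})
	log.Printf("%d receivers stopped; exiting so the app can be restarted", n)
}
```

The sync.WaitGroup is what makes the shutdown graceful in this sketch: the process only exits once every receiver has returned, and a restart then rebuilds all the links from scratch.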