Based in Austin, TX, Brandon Thompson is the Senior Director of Engineering at Conversion Logix where he leads the technological vision and delivery of The Conversion Cloud along with a growing suite of both internal and customer-facing applications.

PubSub Pull Subscriptions on Google Cloud Run with NestJS

May 08, 2023

I needed to implement a queueing solution for my enterprise notification services, which live mostly in Google Cloud Platform. The requirements were relatively simple, and GCP offers a few services that each satisfy some of them:

Client systems can drop a notification on to an asynchronous queue and trust that it will be delivered eventually:

✅ Cloud Tasks ✅ Pub/Sub Push ✅ Pub/Sub Pull

Only scale the notification service on request load, not queue depth:

✅ Cloud Tasks ❌ Pub/Sub Push ✅ Pub/Sub Pull

Handle variable payload sizes, potentially in excess of 100 KB:

❌ Cloud Tasks ✅ Pub/Sub Push ✅ Pub/Sub Pull
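
As a quick sanity check, the checklist above can be transcribed into a small lookup and filtered for options that satisfy every requirement. This is only a sketch: the requirement keys (`asyncDelivery`, `scaleOnRequestLoad`, `largePayloads`) are shorthand invented here, and the true/false values simply mirror the ticks and crosses above.

```typescript
// Requirement keys are shorthand invented for this sketch.
type Requirement = 'asyncDelivery' | 'scaleOnRequestLoad' | 'largePayloads';
type QueueOption = 'Cloud Tasks' | 'Pub/Sub Push' | 'Pub/Sub Pull';

// Values transcribed from the checklist above.
const support: Record<QueueOption, Record<Requirement, boolean>> = {
  'Cloud Tasks': { asyncDelivery: true, scaleOnRequestLoad: true, largePayloads: false },
  'Pub/Sub Push': { asyncDelivery: true, scaleOnRequestLoad: false, largePayloads: true },
  'Pub/Sub Pull': { asyncDelivery: true, scaleOnRequestLoad: true, largePayloads: true },
};

// Return every option that satisfies all of the given requirements.
function optionsMeetingAll(required: Requirement[]): QueueOption[] {
  return (Object.keys(support) as QueueOption[]).filter((option) =>
    required.every((requirement) => support[option][requirement]),
  );
}

const viable = optionsMeetingAll(['asyncDelivery', 'scaleOnRequestLoad', 'largePayloads']);
console.log(viable); // [ 'Pub/Sub Pull' ]
```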

The first one is easy. The client system doesn’t really care whether it is pushing to Pub/Sub or Cloud Tasks. It just wants to package up the notification, submit it, and move on. If queueing were the beginning and end of the requirements, any of the 3 main options might suffice. It is the next bit that is slightly trickier.

We do not want excess queue depth to contribute to concurrent load. That means, for instance, that we want the notification service to process 1 message per second (hypothetical numbers for the sake of simplicity) no matter how many messages are waiting to be sent. Remember, requirement #1 states that messages are “delivered eventually”, so the fact that something might wait in the queue for a few minutes is not a problem.
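
To put a number on "a few minutes", here is a minimal sketch of the arithmetic, using the same hypothetical rate of 1 message per second. It assumes no new messages arrive while the backlog drains; the function name is made up for illustration.

```typescript
// Time to work through a backlog at a fixed processing rate,
// assuming nothing new arrives in the meantime.
function drainTimeSeconds(queueDepth: number, messagesPerSecond: number): number {
  if (messagesPerSecond <= 0) {
    throw new RangeError('messagesPerSecond must be positive');
  }
  return queueDepth / messagesPerSecond;
}

// A burst of 300 queued messages at 1 message per second.
const backlogSeconds = drainTimeSeconds(300, 1);
console.log(`${backlogSeconds / 60} minutes`); // 5 minutes
```

The point is that a burst lengthens the wait linearly but never raises the load on the service.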

We also want to be able to handle any size of notification within reasonable limits. The question for each approach then becomes “what are those limits and are they reasonable?”

Push vs Pull

GCP Pub/Sub lets you configure your subscribers as either Push or Pull while Cloud Tasks are always Push.

Screengrab1

“Push” is handy because it does not require your application to do anything other than accept POST requests at a predetermined endpoint to handle the incoming messages. One big distinction between Pub/Sub and Cloud Tasks is what controls the flow of how they are processed.

Cloud Tasks gives explicit control (shown below) over the number of concurrent requests and requests per second.

Screengrab2

This allows us tight control over how our service scales in the event of a burst in queue load. It puts a predetermined constraint on the number of concurrent messages the service is expected to handle.

Pub/Sub with a Push subscriber does not give the same configuration options. It relies on the service itself to scale accordingly and reject messages that it cannot handle. So depending on the size of the queue and your “max instance” settings, it is possible for your service to scale up very quickly which potentially means 💰.

Given that we want to maintain tight control over our service throughput, Cloud Tasks takes the edge when using Push. Unfortunately, that means our message payload is limited to 100 KB. Even though in our case the average message size (shown below) hovers around the 5 KB mark, we have to allow for outliers in the 100 KB+ range. That alone means Cloud Tasks is out.

Screengrab3

Pub/Sub is much more generous in this regard with its 10 MB limit. But since the concurrency constraints (or lack thereof) on the Push config won’t work for us, the only remaining option is to configure a Pull subscriber.
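
A rough sketch of that size comparison is below. The two limits are the figures quoted in this article, so check them against the current GCP quota pages before relying on them. Measuring the payload as UTF-8 encoded JSON is also an assumption, since the real request carries extra envelope and encoding overhead.

```typescript
// Limits as quoted in this article; verify against current GCP quotas.
const CLOUD_TASKS_MAX_BYTES = 100 * 1024;
const PUBSUB_MAX_BYTES = 10 * 1024 * 1024;

// Approximate size: the payload serialized as JSON and encoded as UTF-8.
function payloadBytes(payload: unknown): number {
  return new TextEncoder().encode(JSON.stringify(payload)).length;
}

function fitsIn(payload: unknown, limitBytes: number): boolean {
  return payloadBytes(payload) <= limitBytes;
}

// Hypothetical payloads: a typical ~5 KB message and a ~150 KB outlier.
const typical = { body: 'x'.repeat(5 * 1024) };
const outlier = { body: 'x'.repeat(150 * 1024) };

console.log(fitsIn(typical, CLOUD_TASKS_MAX_BYTES)); // true
console.log(fitsIn(outlier, CLOUD_TASKS_MAX_BYTES)); // false
console.log(fitsIn(outlier, PUBSUB_MAX_BYTES)); // true
```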

Configuring a Pub/Sub Pull Subscriber on Google Cloud Run

One of the great things about Cloud Run is that it can be configured to spin down to zero instances when idle. Another is that you can allow your CPU to be idle (i.e., no cost) when not serving a response. However, since a Pull subscription runs as a background process on the server, that option must be set to “CPU is always allocated” as shown below:

Screengrab4

Another quirk is the fact that all Cloud Run services are required to listen for requests, so a “worker-only” service with zero endpoints is not an option. Luckily, NestJS makes spinning up this exact type of hybrid service a breeze.

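import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions } from '@nestjs/microservices';
import { NestExpressApplication } from '@nestjs/platform-express';
import { GCPubSubServer } from 'nestjs-google-pubsub-microservice';
import { AppModule } from './app.module'; // path assumed; adjust to your project layout

async function bootstrap() {
  // The HTTP app satisfies Cloud Run's requirement to listen for requests.
  const app = await NestFactory.create<NestExpressApplication>(AppModule);

  // Pull subscriber for regular priority messages.
  app.connectMicroservice<MicroserviceOptions>({
    strategy: new GCPubSubServer({
      subscription: 'send-message',
      subscriber: {
        flowControl: {
          maxMessages: 1 // at most one unacked message in flight
        }
      }
    })
  });

  // Pull subscriber for bulk messages.
  app.connectMicroservice<MicroserviceOptions>({
    strategy: new GCPubSubServer({
      subscription: 'send-message-bulk',
      subscriber: {
        flowControl: {
          maxMessages: 1
        }
      }
    })
  });

  const PORT = Number(process.env.PORT) || 8087;
  await app.startAllMicroservices();
  await app.listen(PORT, '0.0.0.0');
}

bootstrap();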

The above connects to two separate topic subscriptions: the “send-message” queue for regular priority messages and the “send-message-bulk” queue for bulk.
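
The `maxMessages: 1` setting is what caps concurrency per subscription. As a way to picture it, here is a deliberately simplified in-memory model: a message is handed out only while fewer than `maxMessages` are outstanding, and an ack frees the slot. The `FlowControlledQueue` class is hypothetical and is not how the Pub/Sub client library is implemented.

```typescript
// Simplified model of subscriber flow control; not the client library's code.
class FlowControlledQueue<T> {
  private inFlight = 0;
  private readonly pending: T[];
  private readonly maxMessages: number;

  constructor(pending: T[], maxMessages: number) {
    this.pending = [...pending];
    this.maxMessages = maxMessages;
  }

  // Hand out the next message only while fewer than maxMessages are unacked.
  lease(): T | undefined {
    if (this.inFlight >= this.maxMessages || this.pending.length === 0) {
      return undefined;
    }
    this.inFlight += 1;
    return this.pending.shift();
  }

  // Acknowledging a message frees a slot for the next lease.
  ack(): void {
    if (this.inFlight > 0) {
      this.inFlight -= 1;
    }
  }
}

const modelQueue = new FlowControlledQueue(['a', 'b', 'c'], 1);
const first = modelQueue.lease(); // 'a'
const blocked = modelQueue.lease(); // undefined, because 'a' is still unacked
modelQueue.ack();
const second = modelQueue.lease(); // 'b'
```

However deep the backlog gets, each subscription in this model works on one message at a time, which is the behavior the throughput requirement asks for.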

In a controller, define a message handler as shown below. Note that the example below is extremely optimistic, and you should implement your own message validation, parsing and ack according to your specific use case. For more details, take a look at the docs for the nestjs-google-pubsub-microservice package.

    @MessagePattern(undefined)
    async getNotifications(@Ctx() context: GCPubSubContext) {
        // Keep a reference to the raw Pub/Sub message so it can be acked later.
        const originalMsg = context.getMessage();
        const msg: NotificationMessage = JSON.parse(originalMsg.data.toString());
        const mailMessage = this.payloadToMailMessage(msg.payload);
        await this.emailService.send(msg.sender_guid, mailMessage);
        // Ack only after the send succeeds.
        originalMsg.ack();
    }
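
As one example of the validation the handler skips, the sketch below parses the raw message body defensively and returns null instead of throwing. The `NotificationMessage` shape is inferred from the two fields the handler reads (`sender_guid` and `payload`), so the real type may carry more. `parseNotification` is a hypothetical helper, not part of any package.

```typescript
// Shape inferred from the fields the handler uses; the real type may differ.
interface NotificationMessage {
  sender_guid: string;
  payload: unknown;
}

// Returns the parsed message, or null when the body is not usable.
function parseNotification(raw: string): NotificationMessage | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return null; // body is not valid JSON
  }
  if (typeof parsed !== 'object' || parsed === null) {
    return null;
  }
  const candidate = parsed as Record<string, unknown>;
  if (typeof candidate.sender_guid !== 'string' || candidate.sender_guid.length === 0) {
    return null;
  }
  if (!('payload' in candidate)) {
    return null;
  }
  return { sender_guid: candidate.sender_guid, payload: candidate.payload };
}

console.log(parseNotification('{"sender_guid":"abc","payload":{"subject":"hi"}}'));
console.log(parseNotification('not json')); // null
```

What to do with a null result depends on your use case. Acking drops the malformed message, while nacking makes Pub/Sub redeliver it, which only helps if the failure is transient.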

Cannot Pull If Instance Is Idle

The final piece depends on the load profile the service expects. Ours is under near-constant load serving webhook requests, such that there is no 1-minute window without at least 1 request. So even with the autoscaling minimum set to 0, there is always at least 1 instance running. That is important because the Pull subscriber is a background process and queued messages will not trigger a scaling event. If your service is not handling enough HTTP traffic to keep at least 1 instance up, your queue might accumulate messages because there is no active Pull subscriber to grab them.

One way to handle this is with GCP’s Monitoring Alerts. You can set up an alert on your queue depth and unacked messages then have it post to a Webhook Notification Channel (shown below).

Screengrab5

The endpoint can be a simple ping on your service, which is all you need to scale your Cloud Run service from 0 instances up to 1. Once the fresh instance starts, the Pull subscriber connects and picks up the message that originally triggered the alert webhook. You can tune the settings and thresholds to avoid sending unnecessary alerts, for example by requiring a message to sit in the queue for a certain number of seconds before the alert fires.
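
The ping endpoint needs no real logic, because receiving the request is what makes Cloud Run start an instance. A framework-agnostic sketch of the routing decision is below. The `/wake` path and the `handleWake` name are hypothetical, and in the NestJS service above this would more naturally be a one-method controller.

```typescript
interface WakeResponse {
  status: number;
  body: string;
}

// Decide how to answer the alert webhook. The path '/wake' is an assumption.
function handleWake(method: string, path: string): WakeResponse {
  if (path !== '/wake') {
    return { status: 404, body: 'not found' };
  }
  if (method !== 'POST') {
    return { status: 405, body: 'method not allowed' };
  }
  // Nothing else to do: the pull subscribers connect during bootstrap,
  // so a started instance is already draining the queue.
  return { status: 200, body: 'ok' };
}

console.log(handleWake('POST', '/wake')); // { status: 200, body: 'ok' }
```

The alert payload is ignored on purpose. The handler only has to return quickly with a success status.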

Wrap Up

The main takeaways are:

  • You have to set CPU always allocated for background services such as Pull subscribers to work.
  • Cloud Tasks are wonderful for throughput control but lousy for larger payloads.
  • Google Cloud Monitoring and Alerts is wildly powerful — using it for autoscaling background services doesn’t even scratch the surface.
