
本文旨在帮助开发者优化 Google Cloud Pub/Sub 中使用 Java 客户端从拉取订阅模型中获取消息时遇到的高延迟问题。我们将分析同步拉取模式的局限性,并推荐使用异步流式拉取模式以实现更低的延迟和更高的吞吐量。
在使用 Google Cloud Pub/Sub 时,开发者可能会遇到从拉取订阅中获取消息时的高延迟问题,尤其是在处理大量消息时。本文将探讨如何通过调整拉取策略来优化消息传递,从而降低延迟并提高吞吐量。
理解同步拉取的局限性
在同步拉取模式下,客户端发送一个拉取请求,并等待服务器响应。虽然这种模式易于实现,但在高负载情况下可能会遇到性能瓶颈。问题在于,每次只发送一个拉取请求,这限制了客户端可以接收消息的速率。Pub/Sub 服务为了平衡延迟和完整性,倾向于快速返回部分消息,而不是等待收集到最大数量的消息。
以下是一个同步拉取的示例代码:
public ListgetMessagesFromSubscription(String projectId, String subscriptionId, int numOfMessages, CredentialsProvider credentialsProvider) { List receivedMessages = new ArrayList<>(); try { SubscriberStubSettings subscriberStubSettings = getSubscriberStubSettings(credentialsProvider); try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) { String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId); PullRequest pullRequest = PullRequest.newBuilder() .setMaxMessages(100) .setSubscription(subscriptionName) .build(); PullResponse pullResponse = subscriber.pullCallable().call(pullRequest); List ackIds = new ArrayList<>(); for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) { ackIds.add(message.getAckId()); ModifyAckDeadlineRequest modifyAckDeadlineRequest = ModifyAckDeadlineRequest.newBuilder() .setSubscription(subscriptionName) .addAckIds(message.getAckId()) .setAckDeadlineSeconds(30) .build(); subscriber.modifyAckDeadlineCallable().call(modifyAckDeadlineRequest); } if (ackIds.isEmpty()) { // my logic } else { AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder() .setSubscription(subscriptionName) .addAllAckIds(ackIds) .build(); subscriber.acknowledgeCallable().call(acknowledgeRequest); receivedMessages = new ArrayList<>(pullResponse.getReceivedMessagesList()); } } LOG.info("getMessagesFromSubscription: Received {} Messages for Project Id: {} and" + " Subscription Id: {}.", receivedMessages.size(), projectId, subscriptionId); } catch (Exception e) { LOG.error("getMessagesFromSubscription: Error while pulling message from Pub/Sub " + "from Project ID: {} and Subscription ID: {}", projectId, subscriptionId, e); } return receivedMessages; } private SubscriberStubSettings getSubscriberStubSettings(CredentialsProvider credentialsProvider) throws IOException { SubscriberStubSettings.Builder subscriberStubSettingsBuilder = SubscriberStubSettings .newBuilder() .setTransportChannelProvider(SubscriberStubSettings .defaultGrpcTransportProviderBuilder() .setMaxInboundMessageSize(20 << 20) .build()); if (credentialsProvider != null) { subscriberStubSettingsBuilder = subscriberStubSettingsBuilder.setCredentialsProvider(credentialsProvider); } return subscriberStubSettingsBuilder.build(); }
这段代码展示了如何使用同步拉取来从 Pub/Sub 订阅中获取消息。它创建了一个 PullRequest,设置了最大消息数量,并调用 subscriber.pullCallable().call(pullRequest) 来同步地获取消息。
优化策略:转向异步流式拉取
为了克服同步拉取的局限性,建议使用异步流式拉取模式。这种模式允许客户端同时发出多个拉取请求,从而显著提高吞吐量并降低延迟。
异步流式拉取的优势:
- 更高的吞吐量: 通过并发发出多个请求,客户端可以更快地接收消息。
- 更低的延迟: Pub/Sub 服务可以识别客户端的处理能力,并更快地推送消息。
- 更好的资源利用: 客户端可以更有效地利用网络和计算资源。
如何实现异步流式拉取 (示例):
Google Cloud Pub/Sub 提供了异步 API,允许开发者实现流式拉取。以下是一个简化的示例,展示了如何使用 Subscriber 类进行异步拉取:
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
public class AsynchronousPullExample {
public static void main(String[] args) throws Exception {
String projectId = "your-project-id";
String subscriptionId = "your-subscription-id";
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
System.out.println("Received message: " + message.getData().toStringUtf8());
consumer.ack(); // Acknowledge the message
};
Subscriber subscriber = null;
try {
subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:%n", subscriptionName.toString());
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} finally {
if (subscriber != null) {
subscriber.stopAsync();
}
}
}
}代码解释:
- MessageReceiver: 定义了一个接收消息的回调函数。当 Pub/Sub 服务推送消息时,此函数会被调用。
- Subscriber: 使用 Subscriber.newBuilder() 创建一个订阅者实例,并传入订阅名称和消息接收器。
- subscriber.startAsync(): 异步启动订阅者,开始接收消息。
- consumer.ack(): 在消息处理完成后,调用 consumer.ack() 确认消息,防止消息被重复发送。
注意事项:
- 错误处理: 在实际应用中,需要添加适当的错误处理机制,例如重试或将消息发送到死信队列。
- 流控: 在高吞吐量场景下,需要考虑流控,以避免客户端过载。可以使用 FlowControlSettings 来限制未确认消息的数量或大小。
- 确认机制: 选择合适的确认机制(自动或手动)取决于应用的需求。手动确认可以提供更精细的控制,但需要更多的代码。
总结
通过从同步拉取转向异步流式拉取,可以显著降低 Google Cloud Pub/Sub 的消息传递延迟并提高吞吐量。异步拉取允许客户端并发处理多个请求,从而更有效地利用资源并更快地接收消息。在实施异步拉取时,请务必考虑错误处理、流控和确认机制,以确保应用程序的稳定性和可靠性。









