package org.apache.rocketmq.proxy.grpc.v2.consumer;

import apache.rocketmq.v2.AckMessageEntry;
import apache.rocketmq.v2.AckMessageRequest;
import apache.rocketmq.v2.AckMessageResponse;
import apache.rocketmq.v2.AckMessageResultEntry;
import apache.rocketmq.v2.Code;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.AckStatus;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter;
import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor;

/* loaded from: input_file:org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.class */
/**
 * Handles {@link AckMessageRequest}s: every entry of a request is acknowledged
 * independently through the {@link MessagingProcessor}, and the per-entry
 * outcomes are folded into a single {@link AckMessageResponse}.
 */
public class AckMessageActivity extends AbstractMessingActivity {
    protected ReceiptHandleProcessor receiptHandleProcessor;

    /**
     * Creates the activity.
     *
     * @param messagingProcessor        processor used to perform the actual ack
     * @param receiptHandleProcessor    processor from which tracked receipt handles are removed on ack
     * @param grpcClientSettingsManager forwarded to the superclass constructor
     * @param grpcChannelManager        forwarded to the superclass constructor
     */
    public AckMessageActivity(MessagingProcessor messagingProcessor, ReceiptHandleProcessor receiptHandleProcessor, GrpcClientSettingsManager grpcClientSettingsManager, GrpcChannelManager grpcChannelManager) {
        super(messagingProcessor, grpcClientSettingsManager, grpcChannelManager);
        this.receiptHandleProcessor = receiptHandleProcessor;
    }

    /**
     * Acknowledges every entry contained in the request.
     *
     * <p>The returned future completes exceptionally when topic/group validation
     * throws, or when aggregating the per-entry results fails. Failures of
     * individual entries do not fail the future; they are reported through the
     * status of the corresponding {@link AckMessageResultEntry}.
     *
     * @param proxyContext      context of the calling client
     * @param ackMessageRequest request carrying topic, group and the entries to ack
     * @return future of the aggregated response
     */
    public CompletableFuture<AckMessageResponse> ackMessage(ProxyContext proxyContext, AckMessageRequest ackMessageRequest) {
        CompletableFuture<AckMessageResponse> responseFuture = new CompletableFuture<>();
        try {
            validateTopicAndConsumerGroup(ackMessageRequest.getTopic(), ackMessageRequest.getGroup());

            int entryCount = ackMessageRequest.getEntriesCount();
            List<CompletableFuture<AckMessageResultEntry>> entryFutures = new ArrayList<>(entryCount);
            for (int i = 0; i < entryCount; i++) {
                entryFutures.add(processAckMessage(proxyContext, ackMessageRequest, ackMessageRequest.getEntries(i)));
            }

            CompletableFuture.allOf(entryFutures.toArray(new CompletableFuture<?>[0])).whenComplete((ignored, throwable) -> {
                if (throwable != null) {
                    responseFuture.completeExceptionally(throwable);
                    return;
                }
                try {
                    responseFuture.complete(buildResponse(entryFutures));
                } catch (Throwable t) {
                    // Without this guard an exception thrown while aggregating would leave
                    // responseFuture uncompleted forever and the caller would hang.
                    responseFuture.completeExceptionally(t);
                }
            });
        } catch (Throwable t) {
            responseFuture.completeExceptionally(t);
        }
        return responseFuture;
    }

    /**
     * Folds the already-completed per-entry futures into one response. The overall
     * status is the entries' common code when they all agree,
     * {@code MULTIPLE_RESULTS} when they differ, and {@code INTERNAL_SERVER_ERROR}
     * when there are no entries at all.
     */
    private AckMessageResponse buildResponse(List<CompletableFuture<AckMessageResultEntry>> entryFutures) {
        Set<Code> distinctCodes = new HashSet<>();
        List<AckMessageResultEntry> resultEntries = new ArrayList<>(entryFutures.size());
        for (CompletableFuture<AckMessageResultEntry> entryFuture : entryFutures) {
            // Only called after allOf() has completed, so join() does not block.
            AckMessageResultEntry resultEntry = entryFuture.join();
            distinctCodes.add(resultEntry.getStatus().getCode());
            resultEntries.add(resultEntry);
        }

        AckMessageResponse.Builder responseBuilder = AckMessageResponse.newBuilder().addAllEntries(resultEntries);
        if (distinctCodes.size() > 1) {
            responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.MULTIPLE_RESULTS, Code.MULTIPLE_RESULTS.name()));
        } else if (distinctCodes.size() == 1) {
            Code onlyCode = distinctCodes.iterator().next();
            responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(onlyCode, onlyCode.name()));
        } else {
            responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.INTERNAL_SERVER_ERROR, "ack message result is empty"));
        }
        return responseBuilder.build();
    }

    /**
     * Acknowledges a single entry.
     *
     * <p>The returned future never completes exceptionally: any failure, whether
     * thrown synchronously or reported by the processor's future, is converted
     * into a result entry whose status is built from the throwable.
     *
     * @param proxyContext      context of the calling client
     * @param ackMessageRequest enclosing request, source of group and topic
     * @param ackMessageEntry   entry to acknowledge
     * @return future of the result for this entry
     */
    protected CompletableFuture<AckMessageResultEntry> processAckMessage(ProxyContext proxyContext, AckMessageRequest ackMessageRequest, AckMessageEntry ackMessageEntry) {
        CompletableFuture<AckMessageResultEntry> resultFuture = new CompletableFuture<>();
        try {
            String handleStr = ackMessageEntry.getReceiptHandle();
            String group = GrpcConverter.getInstance().wrapResourceWithNamespace(ackMessageRequest.getGroup());

            MessageReceiptHandle trackedHandle = this.receiptHandleProcessor.removeReceiptHandle(
                this.grpcChannelManager.getChannel(proxyContext.getClientID()),
                group,
                ackMessageEntry.getMessageId(),
                handleStr);
            if (trackedHandle != null) {
                // Prefer the handle string held by the processor over the one sent by the client.
                handleStr = trackedHandle.getReceiptHandleStr();
            }

            String topic = GrpcConverter.getInstance().wrapResourceWithNamespace(ackMessageRequest.getTopic());
            this.messagingProcessor.ackMessage(proxyContext, ReceiptHandle.decode(handleStr), ackMessageEntry.getMessageId(), group, topic)
                .thenAccept(ackResult -> resultFuture.complete(convertToAckMessageResultEntry(proxyContext, ackMessageEntry, ackResult)))
                .exceptionally(throwable -> {
                    resultFuture.complete(convertToAckMessageResultEntry(proxyContext, ackMessageEntry, throwable));
                    return null;
                });
        } catch (Throwable t) {
            resultFuture.complete(convertToAckMessageResultEntry(proxyContext, ackMessageEntry, t));
        }
        return resultFuture;
    }

    /**
     * Builds the result entry for a failed ack; the status is derived from the throwable.
     *
     * @param proxyContext    context of the calling client (not used by this implementation)
     * @param ackMessageEntry entry whose message id and receipt handle are echoed back
     * @param th              failure that occurred while acknowledging
     * @return result entry describing the failure
     */
    protected AckMessageResultEntry convertToAckMessageResultEntry(ProxyContext proxyContext, AckMessageEntry ackMessageEntry, Throwable th) {
        return AckMessageResultEntry.newBuilder()
            .setStatus(ResponseBuilder.getInstance().buildStatus(th))
            .setMessageId(ackMessageEntry.getMessageId())
            .setReceiptHandle(ackMessageEntry.getReceiptHandle())
            .build();
    }

    /**
     * Builds the result entry for a completed ack: {@code OK} when the ack status is
     * {@link AckStatus#OK}, {@code INTERNAL_SERVER_ERROR} otherwise.
     *
     * @param proxyContext    context of the calling client (not used by this implementation)
     * @param ackMessageEntry entry whose message id and receipt handle are echoed back
     * @param ackResult       result returned by the messaging processor
     * @return result entry describing the outcome
     */
    protected AckMessageResultEntry convertToAckMessageResultEntry(ProxyContext proxyContext, AckMessageEntry ackMessageEntry, AckResult ackResult) {
        AckMessageResultEntry.Builder entryBuilder = AckMessageResultEntry.newBuilder()
            .setMessageId(ackMessageEntry.getMessageId())
            .setReceiptHandle(ackMessageEntry.getReceiptHandle());
        if (AckStatus.OK.equals(ackResult.getStatus())) {
            entryBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name()));
        } else {
            entryBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.INTERNAL_SERVER_ERROR, "ack failed: status is abnormal"));
        }
        return entryBuilder.build();
    }
}
