package org.apache.servicecomb.pack.alpha.fsm.channel.kafka;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.kafka.ConsumerMessage;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Consumer;
import akka.pattern.Patterns;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.util.Timeout;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.SerializedLambda;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractEventConsumer;
import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;

/* loaded from: input_file:BOOT-INF/lib/alpha-fsm-0.6.0.jar:org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.class */
public class KafkaSagaEventConsumer extends AbstractEventConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    final String groupId = "servicecomb-pack";
    final ObjectMapper jsonMapper;

    public KafkaSagaEventConsumer(ActorSystem actorSystem, ActorRef actorRef, MetricsService metricsService, String str, String str2) {
        super(actorSystem, actorRef, metricsService);
        this.groupId = "servicecomb-pack";
        this.jsonMapper = new ObjectMapper();
        Consumer.committableSource(ConsumerSettings.create(actorSystem.settings().config().getConfig("akka.kafka.consumer"), new StringDeserializer(), new StringDeserializer()).withBootstrapServers(str).withGroupId("servicecomb-pack").withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false").withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest").withProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "StringDeserializer.class").withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "StringDeserializer.class"), Subscriptions.topics(str2)).mapAsync(20, committableMessage -> {
            BaseEvent baseEvent = (BaseEvent) this.jsonMapper.readValue((String) committableMessage.record().value(), BaseEvent.class);
            if (LOG.isDebugEnabled()) {
                LOG.debug("receive [{}] {} {}", baseEvent.getGlobalTxId(), baseEvent.getType(), baseEvent.getLocalTxId());
            }
            return sendSagaActor(baseEvent).thenApply(str3 -> {
                return committableMessage.committableOffset();
            });
        }).batch(100L, ConsumerMessage::createCommittableOffsetBatch, (v0, v1) -> {
            return v0.updated(v1);
        }).mapAsync(20, committableOffsetBatch -> {
            return committableOffsetBatch.commitJavadsl();
        }).to(Sink.ignore()).run(ActorMaterializer.create(actorSystem));
    }

    private CompletionStage<String> sendSagaActor(BaseEvent baseEvent) {
        try {
            long currentTimeMillis = System.currentTimeMillis();
            this.metricsService.metrics().doActorReceived();
            Timeout timeout = new Timeout(Duration.create(10L, "seconds"));
            Await.result(Patterns.ask(this.sagaShardRegionActor, baseEvent, timeout), timeout.duration());
            long currentTimeMillis2 = System.currentTimeMillis();
            this.metricsService.metrics().doActorAccepted();
            this.metricsService.metrics().doActorAvgTime(currentTimeMillis2 - currentTimeMillis);
            return CompletableFuture.completedFuture("OK");
        } catch (Exception e) {
            LOG.error(e.getMessage(), (Throwable) e);
            this.metricsService.metrics().doActorRejected();
            throw new CompletionException(e);
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -234430277:
                if (implMethodName.equals("updated")) {
                    z = 3;
                    break;
                }
                break;
            case 120457484:
                if (implMethodName.equals("createCommittableOffsetBatch")) {
                    z = false;
                    break;
                }
                break;
            case 1884384818:
                if (implMethodName.equals("lambda$new$853f352d$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1884384819:
                if (implMethodName.equals("lambda$new$853f352d$2")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("akka/kafka/ConsumerMessage") && serializedLambda.getImplMethodSignature().equals("(Lakka/kafka/ConsumerMessage$CommittableOffset;)Lakka/kafka/ConsumerMessage$CommittableOffsetBatch;")) {
                    return ConsumerMessage::createCommittableOffsetBatch;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer") && serializedLambda.getImplMethodSignature().equals("(Lakka/kafka/ConsumerMessage$CommittableOffsetBatch;)Ljava/util/concurrent/CompletionStage;")) {
                    return committableOffsetBatch -> {
                        return committableOffsetBatch.commitJavadsl();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer") && serializedLambda.getImplMethodSignature().equals("(Lakka/kafka/ConsumerMessage$CommittableMessage;)Ljava/util/concurrent/CompletionStage;")) {
                    KafkaSagaEventConsumer kafkaSagaEventConsumer = (KafkaSagaEventConsumer) serializedLambda.getCapturedArg(0);
                    return committableMessage -> {
                        BaseEvent baseEvent = (BaseEvent) this.jsonMapper.readValue((String) committableMessage.record().value(), BaseEvent.class);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("receive [{}] {} {}", baseEvent.getGlobalTxId(), baseEvent.getType(), baseEvent.getLocalTxId());
                        }
                        return sendSagaActor(baseEvent).thenApply(str3 -> {
                            return committableMessage.committableOffset();
                        });
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function2") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("akka/kafka/ConsumerMessage$CommittableOffsetBatch") && serializedLambda.getImplMethodSignature().equals("(Lakka/kafka/ConsumerMessage$Committable;)Lakka/kafka/ConsumerMessage$CommittableOffsetBatch;")) {
                    return (v0, v1) -> {
                        return v0.updated(v1);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
