package org.apache.pekko.projection.kafka.internal;

import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.pekko.NotUsed;
import org.apache.pekko.NotUsed$;
import org.apache.pekko.actor.Scheduler;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.kafka.AutoSubscription;
import org.apache.pekko.kafka.ConsumerSettings;
import org.apache.pekko.kafka.RestrictedConsumer;
import org.apache.pekko.kafka.Subscriptions$;
import org.apache.pekko.kafka.scaladsl.Consumer;
import org.apache.pekko.kafka.scaladsl.Consumer$;
import org.apache.pekko.kafka.scaladsl.PartitionAssignmentHandler;
import org.apache.pekko.pattern.package$;
import org.apache.pekko.projection.MergeableOffset;
import org.apache.pekko.projection.OffsetVerification;
import org.apache.pekko.projection.OffsetVerification$VerificationFailure$;
import org.apache.pekko.projection.OffsetVerification$VerificationSuccess$;
import org.apache.pekko.projection.javadsl.MergeableOffsetSourceProvider;
import org.apache.pekko.projection.javadsl.SourceProvider;
import org.apache.pekko.projection.javadsl.VerifiableSourceProvider;
import org.apache.pekko.projection.kafka.KafkaOffsets$;
import org.apache.pekko.stream.scaladsl.Keep$;
import org.apache.pekko.stream.scaladsl.Source;
import org.apache.pekko.util.FutureConverters$;
import org.apache.pekko.util.FutureConverters$CompletionStageOps$;
import org.apache.pekko.util.FutureConverters$FutureOps$;
import org.apache.pekko.util.OptionConverters$;
import org.apache.pekko.util.OptionConverters$RichOptional$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Some$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Set;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaSourceProviderImpl.scala */
@InternalApi
/* loaded from: input_file:org/apache/pekko/projection/kafka/internal/KafkaSourceProviderImpl.class */
public class KafkaSourceProviderImpl<K, V> extends SourceProvider<MergeableOffset<Long>, ConsumerRecord<K, V>> implements VerifiableSourceProvider<MergeableOffset<Long>, ConsumerRecord<K, V>>, org.apache.pekko.projection.scaladsl.VerifiableSourceProvider<MergeableOffset<Long>, ConsumerRecord<K, V>>, MergeableOffsetSourceProvider<MergeableOffset<Long>, ConsumerRecord<K, V>>, org.apache.pekko.projection.scaladsl.MergeableOffsetSourceProvider<MergeableOffset<Long>, ConsumerRecord<K, V>> {
    // Consumer settings handed to the partitioned manual-offset source in _source(...).
    private final ConsumerSettings<K, V> settings;
    // Topic names used both for the subscription and for the partition-count lookup in source(...).
    private final Set<String> topics;
    // Invoked once per source(...) call to obtain a fresh metadata client.
    private final Function0<MetadataClientAdapter> metadataClientFactory;
    // Provider settings; only readOffsetDelay() is read in this class.
    private final KafkaSourceProviderSettings sourceProviderSettings;
    // Execution context taken from the actor system; used for every Future callback in this class.
    private final ExecutionContext executionContext;
    // Classic scheduler of the actor system; used to delay the offset lookup.
    private final Scheduler scheduler;
    // Topic subscription with partitionHandler registered as its partition-assignment handler.
    private final AutoSubscription subscription;
    // Handler whose callbacks keep the assigned-partitions field below up to date.
    private final ProjectionPartitionHandler partitionHandler = new ProjectionPartitionHandler(this);
    // Consumer control of the last source that was materialized; None until then.
    private Option control = None$.MODULE$;
    // Partitions currently tracked as assigned: written by ProjectionPartitionHandler, read by verifyOffset.
    // NOTE(review): public with a mangled name, presumably so the nested handler class can reach it — confirm.
    public volatile Set<TopicPartition> org$apache$pekko$projection$kafka$internal$KafkaSourceProviderImpl$$assignedPartitions = Predef$.MODULE$.Set().empty();

    /* compiled from: KafkaSourceProviderImpl.scala */
    /* loaded from: input_file:org/apache/pekko/projection/kafka/internal/KafkaSourceProviderImpl$ProjectionPartitionHandler.class */
    /**
     * Partition-assignment callbacks that maintain the enclosing provider's set of
     * assigned topic partitions.
     *
     * <ul>
     *   <li>{@code onAssign} replaces the tracked set with the partitions it receives;</li>
     *   <li>{@code onRevoke} and {@code onLost} remove the partitions they receive;</li>
     *   <li>{@code onStop} resets the tracked set to the shared empty set.</li>
     * </ul>
     *
     * The {@link RestrictedConsumer} argument is not used by any callback.
     */
    public class ProjectionPartitionHandler implements PartitionAssignmentHandler {
        private final /* synthetic */ KafkaSourceProviderImpl $outer;

        /**
         * @param kafkaSourceProviderImpl the enclosing provider; must not be {@code null}
         * @throws NullPointerException if {@code kafkaSourceProviderImpl} is {@code null}
         */
        public ProjectionPartitionHandler(KafkaSourceProviderImpl kafkaSourceProviderImpl) {
            this.$outer = java.util.Objects.requireNonNull(kafkaSourceProviderImpl);
        }

        /** Drops the revoked partitions from the tracked set. */
        public void onRevoke(Set<TopicPartition> set, RestrictedConsumer restrictedConsumer) {
            forget(set);
        }

        /** Replaces the tracked set with the partitions passed in. */
        public void onAssign(Set<TopicPartition> set, RestrictedConsumer restrictedConsumer) {
            KafkaSourceProviderImpl provider = this.$outer;
            provider.org$apache$pekko$projection$kafka$internal$KafkaSourceProviderImpl$$assignedPartitions = set;
        }

        /** Drops the lost partitions from the tracked set. */
        public void onLost(Set<TopicPartition> set, RestrictedConsumer restrictedConsumer) {
            forget(set);
        }

        /** Resets the tracked set to the shared empty set; the argument is ignored. */
        public void onStop(Set<TopicPartition> set, RestrictedConsumer restrictedConsumer) {
            KafkaSourceProviderImpl provider = this.$outer;
            provider.org$apache$pekko$projection$kafka$internal$KafkaSourceProviderImpl$$assignedPartitions = KafkaSourceProviderImpl$.org$apache$pekko$projection$kafka$internal$KafkaSourceProviderImpl$$$EmptyTps;
        }

        /** Synthetic accessor for the enclosing provider instance. */
        public final /* synthetic */ KafkaSourceProviderImpl org$apache$pekko$projection$kafka$internal$KafkaSourceProviderImpl$ProjectionPartitionHandler$$$outer() {
            return this.$outer;
        }

        /** Stores the tracked set minus {@code gone}; shared by onRevoke and onLost. */
        private void forget(Set<TopicPartition> gone) {
            KafkaSourceProviderImpl provider = this.$outer;
            provider.org$apache$pekko$projection$kafka$internal$KafkaSourceProviderImpl$$assignedPartitions = provider.org$apache$pekko$projection$kafka$internal$KafkaSourceProviderImpl$$assignedPartitions.diff(gone);
        }
    }

    /**
     * Creates a provider for the given topics.
     *
     * @param actorSystem supplies the execution context and the classic scheduler
     * @param consumerSettings consumer settings used when the Kafka source is created
     * @param set topic names to subscribe to
     * @param function0 factory for the metadata client; called once per {@code source(...)} call
     * @param kafkaSourceProviderSettings provider settings (the read-offset delay is read from here)
     */
    public KafkaSourceProviderImpl(ActorSystem<?> actorSystem, ConsumerSettings<K, V> consumerSettings, Set<String> set, Function0<MetadataClientAdapter> function0, KafkaSourceProviderSettings kafkaSourceProviderSettings) {
        // Runtime facilities derived from the actor system.
        this.executionContext = actorSystem.executionContext();
        this.scheduler = actorSystem.classicSystem().scheduler();

        // Plain configuration captured as-is.
        this.settings = consumerSettings;
        this.topics = set;
        this.metadataClientFactory = function0;
        this.sourceProviderSettings = kafkaSourceProviderSettings;

        // The partition handler field is initialized before this body runs, so it can be registered here.
        this.subscription = Subscriptions$.MODULE$
            .topics(set)
            .withPartitionAssignmentHandler(partitionHandler());
    }

    /** Returns the partition-assignment handler that was created together with this provider. */
    public KafkaSourceProviderImpl<K, V>.ProjectionPartitionHandler partitionHandler() {
        return this.partitionHandler;
    }

    /**
     * Returns the currently stored consumer control. The field starts out as {@code None}
     * and is replaced through {@code control_$eq}.
     */
    public Option<Consumer.Control> control() {
        return this.control;
    }

    /**
     * Setter for the stored consumer control (Scala-style {@code control_=} name).
     *
     * @param option the new value to store
     */
    public void control_$eq(Option<Consumer.Control> option) {
        this.control = option;
    }

    /**
     * Creates the partitioned manual-offset Kafka source and merges its per-partition
     * sub-sources into a single stream.
     *
     * @param function0 supplier of the stored offset, forwarded to the offsets-on-assign callback
     * @param i breadth passed to {@code flatMapMerge}
     * @param metadataClientAdapter metadata client forwarded to the offsets-on-assign callback
     * @return the merged source, materializing the consumer control
     */
    public Source<ConsumerRecord<K, V>, Consumer.Control> _source(Function0<Future<Option<MergeableOffset<Long>>>> function0, int i, MetadataClientAdapter metadataClientAdapter) {
        return Consumer$.MODULE$
            .plainPartitionedManualOffsetSource(
                this.settings,
                this.subscription,
                getOffsetsOnAssign(function0, metadataClientAdapter),
                Consumer$.MODULE$.plainPartitionedManualOffsetSource$default$4())
            .flatMapMerge(i, partitionAndSource -> {
                // Mirrors the Scala pattern match on the emitted pair: a null element is a match failure.
                if (partitionAndSource == null) {
                    throw new MatchError(partitionAndSource);
                }
                // Only the second component (the per-partition source) is kept.
                return (Source) partitionAndSource._2();
            });
    }

    /**
     * Scala-facing entry point: looks up the number of partitions of the configured topics
     * and, once known, builds the merged Kafka source with that number as merge breadth.
     *
     * <p>A new metadata client is obtained from the factory on every call. If the partition
     * lookup fails, the client is stopped here; on success, stopping it is left to the
     * termination hook installed by {@code source$$anonfun$2}.
     *
     * @param function0 supplier of the stored offset, forwarded to the source construction
     * @return a future completed with the source, or failed if the partition lookup fails
     */
    public Future<Source<ConsumerRecord<K, V>, NotUsed>> source(Function0<Future<Option<MergeableOffset<Long>>>> function0) {
        MetadataClientAdapter metadataClientAdapter = this.metadataClientFactory.apply();
        Future<Object> numPartitions = metadataClientAdapter.numPartitions(this.topics);
        numPartitions.failed().foreach(th -> {
            metadataClientAdapter.stop();
            // scala.Function1 is non-void, so the lambda must yield a value; Future.foreach discards it.
            return null;
        }, this.executionContext);
        return numPartitions.map(
            obj -> source$$anonfun$2(function0, metadataClientAdapter, BoxesRunTime.unboxToInt(obj)),
            this.executionContext);
    }

    /**
     * Java-facing entry point: adapts the {@link Supplier}/{@link CompletionStage}/{@link Optional}
     * based offset reader to its Scala counterpart, delegates to the Scala-facing
     * {@code source(Function0)} overload and converts the result back to the Java DSL.
     *
     * @param supplier supplies a stage completed with the stored offset, if any
     * @return a stage completed with the Java DSL source
     */
    public CompletionStage<org.apache.pekko.stream.javadsl.Source<ConsumerRecord<K, V>, NotUsed>> source(Supplier<CompletionStage<Optional<MergeableOffset<Long>>>> supplier) {
        return FutureConverters$FutureOps$.MODULE$.asJava$extension(FutureConverters$.MODULE$.FutureOps(source(() -> {
            // No raw cast on supplier.get(): it would erase the element type and leave
            // `optional` below typed as Object, which RichOptional does not accept.
            return FutureConverters$CompletionStageOps$.MODULE$.asScala$extension(FutureConverters$.MODULE$.CompletionStageOps(supplier.get())).map(optional -> {
                return OptionConverters$RichOptional$.MODULE$.toScala$extension(OptionConverters$.MODULE$.RichOptional(optional));
            }, this.executionContext);
        }).map(source -> {
            return source.asJava();
        }, this.executionContext)));
    }

    /**
     * Converts a consumer record into a mergeable offset by delegating to
     * {@code KafkaOffsets.toMergeableOffset}.
     *
     * @param consumerRecord the record to extract the offset from
     * @return the mergeable offset produced by the delegate
     */
    public MergeableOffset<Long> extractOffset(ConsumerRecord<K, V> consumerRecord) {
        return KafkaOffsets$.MODULE$.toMergeableOffset(consumerRecord);
    }

    /**
     * Checks that every topic partition referenced by the offset is in the set of
     * partitions currently tracked as assigned.
     *
     * @param mergeableOffset the offset to verify
     * @return success when all partitions of the offset are tracked as assigned, otherwise a
     *     failure carrying an explanatory message
     */
    public OffsetVerification verifyOffset(MergeableOffset<Long> mergeableOffset) {
        Set<TopicPartition> offsetPartitions = KafkaOffsets$.MODULE$.partitions(mergeableOffset);
        // Read the volatile field once so the whole check runs against one snapshot.
        Set<TopicPartition> assignedSnapshot = this.org$apache$pekko$projection$kafka$internal$KafkaSourceProviderImpl$$assignedPartitions;
        boolean allStillAssigned = offsetPartitions.forall(tp -> assignedSnapshot.contains(tp));
        if (allStillAssigned) {
            return OffsetVerification$VerificationSuccess$.MODULE$;
        }
        return OffsetVerification$VerificationFailure$.MODULE$.apply("The offset contains Kafka topic partitions that were revoked or lost in a previous rebalance");
    }

    /**
     * Returns the record's timestamp when its timestamp type is {@code CREATE_TIME},
     * and {@code 0} for any other (or missing) timestamp type.
     *
     * @param consumerRecord the record to inspect
     * @return the creation timestamp, or {@code 0L} if the record does not carry one
     */
    public long extractCreationTime(ConsumerRecord<K, V> consumerRecord) {
        // Comparing from the constant side also covers a null timestamp type.
        boolean hasCreateTime = TimestampType.CREATE_TIME.equals(consumerRecord.timestampType());
        return hasCreateTime ? consumerRecord.timestamp() : 0L;
    }

    /**
     * Builds the callback that resolves start offsets for a set of topic partitions.
     *
     * <p>The returned function does not perform the lookup directly. It hands the lookup to the
     * pattern {@code after} helper as a deferred computation, together with
     * {@code sourceProviderSettings.readOffsetDelay()}, the scheduler and the execution context.
     *
     * @param function0 supplier of the stored offset, forwarded to the lookup
     * @param metadataClientAdapter metadata client forwarded to the lookup
     * @return function from topic partitions to a future of their offsets
     */
    private Function1<Set<TopicPartition>, Future<Map<TopicPartition, Object>>> getOffsetsOnAssign(Function0<Future<Option<MergeableOffset<Long>>>> function0, MetadataClientAdapter metadataClientAdapter) {
        return assignedTps -> {
            return package$.MODULE$.after(this.sourceProviderSettings.readOffsetDelay(), this.scheduler, () -> {
                // The lookup helper is declared with raw types, hence the unchecked assignment.
                @SuppressWarnings("unchecked")
                Future<Map<TopicPartition, Object>> offsets =
                    getOffsetsOnAssign$$anonfun$1$$anonfun$1(function0, metadataClientAdapter, assignedTps);
                return offsets;
            }, this.executionContext);
        };
    }

    /**
     * Builds the merged Kafka source for a known partition count and wires up its
     * materialized value:
     * <ol>
     *   <li>the consumer control is stored via {@code control_$eq} when the source is materialized;</li>
     *   <li>the termination future replaces the control as materialized value;</li>
     *   <li>when that future completes, the metadata client is stopped, and the final
     *       materialized value becomes {@code NotUsed}.</li>
     * </ol>
     *
     * @param function0 supplier of the stored offset, forwarded to {@code _source}
     * @param metadataClientAdapter metadata client to use and to stop on termination
     * @param i merge breadth forwarded to {@code _source}
     * @return the source with {@code NotUsed} as materialized value
     */
    private final /* synthetic */ Source source$$anonfun$2(Function0 function0, MetadataClientAdapter metadataClientAdapter, int i) {
        return _source(function0, i, metadataClientAdapter).mapMaterializedValue(control -> {
            control_$eq(Some$.MODULE$.apply(control));
            return control;
        }).watchTermination(Keep$.MODULE$.right()).mapMaterializedValue(future -> {
            future.onComplete(completion -> {
                metadataClientAdapter.stop();
                // scala.Function1 is non-void, so the lambda must yield a value; onComplete discards it.
                return null;
            }, this.executionContext);
            return NotUsed$.MODULE$;
        });
    }

    /**
     * Performs the actual offset lookup for the given topic partitions.
     *
     * <p>The stored offset is read through {@code function0}. When none is stored, the
     * beginning offsets are requested from the metadata client. Otherwise the stored
     * offset's entries are mapped through the {@code KafkaSourceProviderImpl$$anon$1}
     * partial function built for the given partitions. Failures of the combined future
     * are passed to the {@code KafkaSourceProviderImpl$$anon$2} recovery handler.
     *
     * @param function0 supplier of the stored offset
     * @param metadataClientAdapter metadata client used when no offset is stored
     * @param set topic partitions to resolve offsets for
     * @return future of the resolved offsets
     */
    private final Future getOffsetsOnAssign$$anonfun$1$$anonfun$1(Function0 function0, MetadataClientAdapter metadataClientAdapter, Set set) {
        Future resolved = ((Future) function0.apply()).flatMap(stored -> {
            if (None$.MODULE$.equals(stored)) {
                // Nothing stored yet: start from the beginning of each partition.
                return metadataClientAdapter.getBeginningOffsets(set);
            }
            if (!(stored instanceof Some)) {
                throw new MatchError(stored);
            }
            MergeableOffset storedOffset = (MergeableOffset) ((Some) stored).value();
            return Future$.MODULE$.successful(storedOffset.entries().collect(new KafkaSourceProviderImpl$$anon$1(set)));
        }, this.executionContext);
        return resolved.recover(new KafkaSourceProviderImpl$$anon$2(), this.executionContext);
    }
}
