package org.apache.pekko.kafka.internal;

import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.kafka.ManualSubscription;
import org.apache.pekko.kafka.Subscriptions;
import org.apache.pekko.kafka.Subscriptions$Assignment$;
import org.apache.pekko.kafka.Subscriptions$AssignmentOffsetsForTimes$;
import org.apache.pekko.kafka.Subscriptions$AssignmentWithOffset$;
import org.apache.pekko.kafka.scaladsl.PartitionAssignmentHandler;
import org.apache.pekko.stream.Outlet;
import org.apache.pekko.stream.SourceShape;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.stream.stage.GraphStageLogic;
import org.apache.pekko.stream.stage.OutHandler;
import org.apache.pekko.stream.stage.StageLogging;
import scala.MatchError;
import scala.PartialFunction;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.StringOps$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Set;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Statics;

/* compiled from: BaseSingleSourceLogic.scala */
@InternalApi
/* loaded from: input_file:org/apache/pekko/kafka/internal/BaseSingleSourceLogic.class */
public abstract class BaseSingleSourceLogic<K, V, Msg> extends GraphStageLogic implements PromiseControl, MetricsControl, StageIdLogging, SourceLogicSubscription, MessageBuilder<K, V, Msg>, SourceLogicBuffer<K, V, Msg>, SourceLogicSubscription, MessageBuilder, SourceLogicBuffer {
    private Promise org$apache$pekko$kafka$internal$PromiseControl$$shutdownPromise;
    private Promise org$apache$pekko$kafka$internal$PromiseControl$$stopPromise;
    private AsyncCallback org$apache$pekko$kafka$internal$PromiseControl$$controlCallback;
    private LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log;
    private String org$apache$pekko$kafka$internal$InstanceId$$instanceId;
    private LoggingAdapter org$apache$pekko$kafka$internal$StageIdLogging$$_log;
    private Iterator buffer;
    private AsyncCallback filterRevokedPartitionsCB;
    private final SourceShape shape;
    private ActorRef consumerActor;
    private GraphStageLogic.StageActor sourceActor;
    private Set tps;
    public boolean org$apache$pekko$kafka$internal$BaseSingleSourceLogic$$requested;
    public int org$apache$pekko$kafka$internal$BaseSingleSourceLogic$$requestId;
    private final AsyncCallback<Set<TopicPartition>> assignedCB;
    private final AsyncCallback<Set<TopicPartition>> revokedCB;

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public BaseSingleSourceLogic(SourceShape<Msg> sourceShape) {
        super(sourceShape);
        this.shape = sourceShape;
        PromiseControl.$init$(this);
        StageLogging.$init$(this);
        org$apache$pekko$kafka$internal$InstanceId$_setter_$org$apache$pekko$kafka$internal$InstanceId$$instanceId_$eq(StringOps$.MODULE$.take$extension(Predef$.MODULE$.augmentString(UUID.randomUUID().toString()), 5));
        StageIdLogging.$init$((StageIdLogging) this);
        SourceLogicBuffer.$init$(this);
        this.tps = Predef$.MODULE$.Set().empty();
        this.org$apache$pekko$kafka$internal$BaseSingleSourceLogic$$requested = false;
        this.org$apache$pekko$kafka$internal$BaseSingleSourceLogic$$requestId = 0;
        this.assignedCB = getAsyncCallback(set -> {
            tps_$eq((Set) tps().$plus$plus(set));
            log().debug("Assigned partitions: {}. All partitions: {}", set, tps());
            requestMessages();
        });
        this.revokedCB = getAsyncCallback(set2 -> {
            tps_$eq((Set) tps().$minus$minus(set2));
            log().debug("Revoked partitions: {}. All partitions: {}", set2, tps());
        });
        setHandler(sourceShape.out(), new OutHandler(this) { // from class: org.apache.pekko.kafka.internal.BaseSingleSourceLogic$$anon$1
            private final /* synthetic */ BaseSingleSourceLogic $outer;

            {
                if (this == null) {
                    throw new NullPointerException();
                }
                this.$outer = this;
            }

            public /* bridge */ /* synthetic */ void onDownstreamFinish() throws Exception {
                OutHandler.onDownstreamFinish$(this);
            }

            public void onPull() {
                this.$outer.org$apache$pekko$kafka$internal$BaseSingleSourceLogic$$pump();
            }

            public void onDownstreamFinish(Throwable th) {
                this.$outer.performShutdown();
            }
        });
        Statics.releaseFence();
    }

    @Override // org.apache.pekko.kafka.scaladsl.Consumer.Control
    public /* bridge */ /* synthetic */ Future drainAndShutdown(Future future, ExecutionContext executionContext) {
        Future drainAndShutdown;
        drainAndShutdown = drainAndShutdown(future, executionContext);
        return drainAndShutdown;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public Promise org$apache$pekko$kafka$internal$PromiseControl$$shutdownPromise() {
        return this.org$apache$pekko$kafka$internal$PromiseControl$$shutdownPromise;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public Promise org$apache$pekko$kafka$internal$PromiseControl$$stopPromise() {
        return this.org$apache$pekko$kafka$internal$PromiseControl$$stopPromise;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public AsyncCallback org$apache$pekko$kafka$internal$PromiseControl$$controlCallback() {
        return this.org$apache$pekko$kafka$internal$PromiseControl$$controlCallback;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public void org$apache$pekko$kafka$internal$PromiseControl$_setter_$org$apache$pekko$kafka$internal$PromiseControl$$shutdownPromise_$eq(Promise promise) {
        this.org$apache$pekko$kafka$internal$PromiseControl$$shutdownPromise = promise;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public void org$apache$pekko$kafka$internal$PromiseControl$_setter_$org$apache$pekko$kafka$internal$PromiseControl$$stopPromise_$eq(Promise promise) {
        this.org$apache$pekko$kafka$internal$PromiseControl$$stopPromise = promise;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public void org$apache$pekko$kafka$internal$PromiseControl$_setter_$org$apache$pekko$kafka$internal$PromiseControl$$controlCallback_$eq(AsyncCallback asyncCallback) {
        this.org$apache$pekko$kafka$internal$PromiseControl$$controlCallback = asyncCallback;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public /* bridge */ /* synthetic */ void performStop() {
        performStop();
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public /* bridge */ /* synthetic */ boolean onStop() {
        boolean onStop;
        onStop = onStop();
        return onStop;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public /* bridge */ /* synthetic */ boolean onShutdown() {
        boolean onShutdown;
        onShutdown = onShutdown();
        return onShutdown;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl, org.apache.pekko.kafka.scaladsl.Consumer.Control
    public /* bridge */ /* synthetic */ Future stop() {
        Future stop;
        stop = stop();
        return stop;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl, org.apache.pekko.kafka.scaladsl.Consumer.Control
    public /* bridge */ /* synthetic */ Future shutdown() {
        Future shutdown;
        shutdown = shutdown();
        return shutdown;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl, org.apache.pekko.kafka.scaladsl.Consumer.Control
    public /* bridge */ /* synthetic */ Future isShutdown() {
        Future isShutdown;
        isShutdown = isShutdown();
        return isShutdown;
    }

    @Override // org.apache.pekko.kafka.scaladsl.Consumer.Control, org.apache.pekko.kafka.internal.MetricsControl
    public /* bridge */ /* synthetic */ Future metrics() {
        Future metrics;
        metrics = metrics();
        return metrics;
    }

    public LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log() {
        return this.org$apache$pekko$stream$stage$StageLogging$$_log;
    }

    public void org$apache$pekko$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.org$apache$pekko$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    public /* bridge */ /* synthetic */ Class logSource() {
        return StageLogging.logSource$(this);
    }

    @Override // org.apache.pekko.kafka.internal.InstanceId
    public String org$apache$pekko$kafka$internal$InstanceId$$instanceId() {
        return this.org$apache$pekko$kafka$internal$InstanceId$$instanceId;
    }

    @Override // org.apache.pekko.kafka.internal.InstanceId
    public void org$apache$pekko$kafka$internal$InstanceId$_setter_$org$apache$pekko$kafka$internal$InstanceId$$instanceId_$eq(String str) {
        this.org$apache$pekko$kafka$internal$InstanceId$$instanceId = str;
    }

    @Override // org.apache.pekko.kafka.internal.InstanceId
    public /* bridge */ /* synthetic */ String id() {
        String id;
        id = id();
        return id;
    }

    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public LoggingAdapter org$apache$pekko$kafka$internal$StageIdLogging$$_log() {
        return this.org$apache$pekko$kafka$internal$StageIdLogging$$_log;
    }

    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public void org$apache$pekko$kafka$internal$StageIdLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.org$apache$pekko$kafka$internal$StageIdLogging$$_log = loggingAdapter;
    }

    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public /* bridge */ /* synthetic */ String idLogPrefix() {
        String idLogPrefix;
        idLogPrefix = idLogPrefix();
        return idLogPrefix;
    }

    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public /* bridge */ /* synthetic */ LoggingAdapter log() {
        LoggingAdapter log;
        log = log();
        return log;
    }

    @Override // org.apache.pekko.kafka.internal.SourceLogicSubscription
    public /* bridge */ /* synthetic */ void configureSubscription(AsyncCallback asyncCallback, AsyncCallback asyncCallback2) {
        SourceLogicSubscription.configureSubscription$(this, asyncCallback, asyncCallback2);
    }

    @Override // org.apache.pekko.kafka.internal.SourceLogicSubscription
    public /* bridge */ /* synthetic */ PartitionAssignmentHandler addToPartitionAssignmentHandler(PartitionAssignmentHandler partitionAssignmentHandler) {
        return SourceLogicSubscription.addToPartitionAssignmentHandler$(this, partitionAssignmentHandler);
    }

    @Override // org.apache.pekko.kafka.internal.SourceLogicBuffer
    public Iterator buffer() {
        return this.buffer;
    }

    @Override // org.apache.pekko.kafka.internal.SourceLogicBuffer
    public AsyncCallback filterRevokedPartitionsCB() {
        return this.filterRevokedPartitionsCB;
    }

    @Override // org.apache.pekko.kafka.internal.SourceLogicBuffer
    public void buffer_$eq(Iterator iterator) {
        this.buffer = iterator;
    }

    @Override // org.apache.pekko.kafka.internal.SourceLogicBuffer
    public void org$apache$pekko$kafka$internal$SourceLogicBuffer$_setter_$filterRevokedPartitionsCB_$eq(AsyncCallback asyncCallback) {
        this.filterRevokedPartitionsCB = asyncCallback;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public /* synthetic */ void org$apache$pekko$kafka$internal$PromiseControl$$super$setKeepGoing(boolean z) {
        super.setKeepGoing(z);
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public /* synthetic */ void org$apache$pekko$kafka$internal$PromiseControl$$super$complete(Outlet outlet) {
        super.complete(outlet);
    }

    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public /* synthetic */ LoggingAdapter org$apache$pekko$kafka$internal$StageIdLogging$$super$log() {
        return StageLogging.log$(this);
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public SourceShape<Msg> shape() {
        return this.shape;
    }

    @Override // org.apache.pekko.kafka.internal.MetricsControl
    public ExecutionContext executionContext() {
        return materializer().executionContext();
    }

    @Override // org.apache.pekko.kafka.internal.MetricsControl
    public abstract Future<ActorRef> consumerFuture();

    @Override // org.apache.pekko.kafka.internal.SourceLogicSubscription
    public final ActorRef consumerActor() {
        return this.consumerActor;
    }

    public final void consumerActor_$eq(ActorRef actorRef) {
        this.consumerActor = actorRef;
    }

    @Override // org.apache.pekko.kafka.internal.SourceLogicSubscription
    public GraphStageLogic.StageActor sourceActor() {
        return this.sourceActor;
    }

    public void sourceActor_$eq(GraphStageLogic.StageActor stageActor) {
        this.sourceActor = stageActor;
    }

    public Set<TopicPartition> tps() {
        return this.tps;
    }

    public void tps_$eq(Set<TopicPartition> set) {
        this.tps = set;
    }

    public void preStart() {
        super.preStart();
        sourceActor_$eq(getStageActor(messageHandling()));
        log().info("Starting. StageActor {}", sourceActor().ref());
        consumerActor_$eq(createConsumerActor());
        sourceActor().watch(consumerActor());
        configureSubscription(this.assignedCB, this.revokedCB);
    }

    public PartialFunction<Tuple2<ActorRef, Object>, BoxedUnit> messageHandling() {
        return new BaseSingleSourceLogic$$anon$2(this);
    }

    public abstract ActorRef createConsumerActor();

    @Override // org.apache.pekko.kafka.internal.SourceLogicSubscription
    public void configureManualSubscription(ManualSubscription manualSubscription) {
        if (manualSubscription instanceof Subscriptions.Assignment) {
            Set<TopicPartition> _1 = Subscriptions$Assignment$.MODULE$.unapply((Subscriptions.Assignment) manualSubscription)._1();
            consumerActor().tell(KafkaConsumerActor$Internal$Assign$.MODULE$.apply(_1), sourceActor().ref());
            tps_$eq((Set) tps().$plus$plus(_1));
        } else if (manualSubscription instanceof Subscriptions.AssignmentWithOffset) {
            Map<TopicPartition, Object> _12 = Subscriptions$AssignmentWithOffset$.MODULE$.unapply((Subscriptions.AssignmentWithOffset) manualSubscription)._1();
            consumerActor().tell(KafkaConsumerActor$Internal$AssignWithOffset$.MODULE$.apply(_12), sourceActor().ref());
            tps_$eq((Set) tps().$plus$plus(_12.keySet()));
        } else {
            if (!(manualSubscription instanceof Subscriptions.AssignmentOffsetsForTimes)) {
                throw new MatchError(manualSubscription);
            }
            Map<TopicPartition, Object> _13 = Subscriptions$AssignmentOffsetsForTimes$.MODULE$.unapply((Subscriptions.AssignmentOffsetsForTimes) manualSubscription)._1();
            consumerActor().tell(KafkaConsumerActor$Internal$AssignOffsetsForTimes$.MODULE$.apply(_13), sourceActor().ref());
            tps_$eq((Set) tps().$plus$plus(_13.keySet()));
        }
    }

    public void org$apache$pekko$kafka$internal$BaseSingleSourceLogic$$pump() {
        while (isAvailable(shape().out())) {
            if (!buffer().hasNext()) {
                if (this.org$apache$pekko$kafka$internal$BaseSingleSourceLogic$$requested || !tps().nonEmpty()) {
                    return;
                }
                requestMessages();
                return;
            }
            push(shape().out(), createMessage((ConsumerRecord) buffer().next()));
        }
    }

    public void requestMessages() {
        this.org$apache$pekko$kafka$internal$BaseSingleSourceLogic$$requested = true;
        this.org$apache$pekko$kafka$internal$BaseSingleSourceLogic$$requestId++;
        log().debug("Requesting messages, requestId: {}, partitions: {}", BoxesRunTime.boxToInteger(this.org$apache$pekko$kafka$internal$BaseSingleSourceLogic$$requestId), tps());
        consumerActor().tell(KafkaConsumerActor$Internal$RequestMessages$.MODULE$.apply(this.org$apache$pekko$kafka$internal$BaseSingleSourceLogic$$requestId, tps()), sourceActor().ref());
    }

    public void postStop() {
        onShutdown();
        super.postStop();
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public void performShutdown() {
        log().info("Completing");
    }
}
