package org.apache.pekko.kafka.internal;

import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.kafka.internal.SubSourceLogic;
import org.apache.pekko.stream.SourceShape;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.stream.stage.GraphStageLogic;
import org.apache.pekko.stream.stage.OutHandler;
import org.apache.pekko.stream.stage.StageLogging;
import scala.PartialFunction;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.IterableOnce;
import scala.collection.Iterator;
import scala.collection.StringOps$;
import scala.collection.immutable.Set;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.Promise;
import scala.runtime.BoxedUnit;
import scala.runtime.ScalaRunTime$;
import scala.runtime.Statics;

/* compiled from: SubSourceLogic.scala */
@InternalApi
/* loaded from: input_file:org/apache/pekko/kafka/internal/SubSourceStageLogic.class */
public abstract class SubSourceStageLogic<K, V, Msg> extends GraphStageLogic implements PromiseControl, MetricsControl, StageIdLogging, MessageBuilder<K, V, Msg>, SourceLogicBuffer<K, V, Msg>, StageIdLogging, MessageBuilder, SourceLogicBuffer {
    private Promise org$apache$pekko$kafka$internal$PromiseControl$$shutdownPromise;
    private Promise org$apache$pekko$kafka$internal$PromiseControl$$stopPromise;
    private AsyncCallback org$apache$pekko$kafka$internal$PromiseControl$$controlCallback;
    private LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log;
    private String org$apache$pekko$kafka$internal$InstanceId$$instanceId;
    private LoggingAdapter org$apache$pekko$kafka$internal$StageIdLogging$$_log;
    private Iterator buffer;
    private AsyncCallback filterRevokedPartitionsCB;
    private final SourceShape shape;
    public final TopicPartition org$apache$pekko$kafka$internal$SubSourceStageLogic$$tp;
    public final ActorRef org$apache$pekko$kafka$internal$SubSourceStageLogic$$consumerActor;
    private final AsyncCallback<SubSourceLogic.SubSourceStageLogicControl> subSourceStartedCb;
    public final AsyncCallback<Tuple2<TopicPartition, SubSourceLogic.SubSourceCancellationStrategy>> org$apache$pekko$kafka$internal$SubSourceStageLogic$$subSourceCancelledCb;
    private final int actorNumber;
    private final KafkaConsumerActor$Internal$RequestMessages requestMessages;
    public boolean org$apache$pekko$kafka$internal$SubSourceStageLogic$$requested;
    private GraphStageLogic.StageActor subSourceActor;

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public SubSourceStageLogic(SourceShape<Msg> sourceShape, TopicPartition topicPartition, ActorRef actorRef, AsyncCallback<SubSourceLogic.SubSourceStageLogicControl> asyncCallback, AsyncCallback<Tuple2<TopicPartition, SubSourceLogic.SubSourceCancellationStrategy>> asyncCallback2, int i) {
        super(sourceShape);
        this.shape = sourceShape;
        this.org$apache$pekko$kafka$internal$SubSourceStageLogic$$tp = topicPartition;
        this.org$apache$pekko$kafka$internal$SubSourceStageLogic$$consumerActor = actorRef;
        this.subSourceStartedCb = asyncCallback;
        this.org$apache$pekko$kafka$internal$SubSourceStageLogic$$subSourceCancelledCb = asyncCallback2;
        this.actorNumber = i;
        PromiseControl.$init$(this);
        StageLogging.$init$(this);
        org$apache$pekko$kafka$internal$InstanceId$_setter_$org$apache$pekko$kafka$internal$InstanceId$$instanceId_$eq(StringOps$.MODULE$.take$extension(Predef$.MODULE$.augmentString(UUID.randomUUID().toString()), 5));
        StageIdLogging.$init$((StageIdLogging) this);
        SourceLogicBuffer.$init$(this);
        this.requestMessages = KafkaConsumerActor$Internal$RequestMessages$.MODULE$.apply(0, (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{topicPartition})));
        this.org$apache$pekko$kafka$internal$SubSourceStageLogic$$requested = false;
        setHandler(sourceShape.out(), new OutHandler(this) { // from class: org.apache.pekko.kafka.internal.SubSourceStageLogic$$anon$5
            private final /* synthetic */ SubSourceStageLogic $outer;

            {
                if (this == null) {
                    throw new NullPointerException();
                }
                this.$outer = this;
            }

            public /* bridge */ /* synthetic */ void onDownstreamFinish() throws Exception {
                OutHandler.onDownstreamFinish$(this);
            }

            public void onPull() {
                this.$outer.org$apache$pekko$kafka$internal$SubSourceStageLogic$$pump();
            }

            public void onDownstreamFinish(Throwable th) {
                this.$outer.org$apache$pekko$kafka$internal$SubSourceStageLogic$$subSourceCancelledCb.invoke(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((TopicPartition) Predef$.MODULE$.ArrowAssoc(this.$outer.org$apache$pekko$kafka$internal$SubSourceStageLogic$$tp), this.$outer.onDownstreamFinishSubSourceCancellationStrategy()));
                OutHandler.onDownstreamFinish$(this, th);
            }
        });
        Statics.releaseFence();
    }

    @Override // org.apache.pekko.kafka.scaladsl.Consumer.Control
    public /* bridge */ /* synthetic */ Future drainAndShutdown(Future future, ExecutionContext executionContext) {
        Future drainAndShutdown;
        drainAndShutdown = drainAndShutdown(future, executionContext);
        return drainAndShutdown;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public Promise org$apache$pekko$kafka$internal$PromiseControl$$shutdownPromise() {
        return this.org$apache$pekko$kafka$internal$PromiseControl$$shutdownPromise;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public Promise org$apache$pekko$kafka$internal$PromiseControl$$stopPromise() {
        return this.org$apache$pekko$kafka$internal$PromiseControl$$stopPromise;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public AsyncCallback org$apache$pekko$kafka$internal$PromiseControl$$controlCallback() {
        return this.org$apache$pekko$kafka$internal$PromiseControl$$controlCallback;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public void org$apache$pekko$kafka$internal$PromiseControl$_setter_$org$apache$pekko$kafka$internal$PromiseControl$$shutdownPromise_$eq(Promise promise) {
        this.org$apache$pekko$kafka$internal$PromiseControl$$shutdownPromise = promise;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public void org$apache$pekko$kafka$internal$PromiseControl$_setter_$org$apache$pekko$kafka$internal$PromiseControl$$stopPromise_$eq(Promise promise) {
        this.org$apache$pekko$kafka$internal$PromiseControl$$stopPromise = promise;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public void org$apache$pekko$kafka$internal$PromiseControl$_setter_$org$apache$pekko$kafka$internal$PromiseControl$$controlCallback_$eq(AsyncCallback asyncCallback) {
        this.org$apache$pekko$kafka$internal$PromiseControl$$controlCallback = asyncCallback;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public /* bridge */ /* synthetic */ void performStop() {
        performStop();
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public /* bridge */ /* synthetic */ boolean onStop() {
        boolean onStop;
        onStop = onStop();
        return onStop;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public /* bridge */ /* synthetic */ boolean onShutdown() {
        boolean onShutdown;
        onShutdown = onShutdown();
        return onShutdown;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl, org.apache.pekko.kafka.scaladsl.Consumer.Control
    public /* bridge */ /* synthetic */ Future stop() {
        Future stop;
        stop = stop();
        return stop;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl, org.apache.pekko.kafka.scaladsl.Consumer.Control
    public /* bridge */ /* synthetic */ Future shutdown() {
        Future shutdown;
        shutdown = shutdown();
        return shutdown;
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl, org.apache.pekko.kafka.scaladsl.Consumer.Control
    public /* bridge */ /* synthetic */ Future isShutdown() {
        Future isShutdown;
        isShutdown = isShutdown();
        return isShutdown;
    }

    @Override // org.apache.pekko.kafka.scaladsl.Consumer.Control, org.apache.pekko.kafka.internal.MetricsControl
    public /* bridge */ /* synthetic */ Future metrics() {
        Future metrics;
        metrics = metrics();
        return metrics;
    }

    public LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log() {
        return this.org$apache$pekko$stream$stage$StageLogging$$_log;
    }

    public void org$apache$pekko$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.org$apache$pekko$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    public /* bridge */ /* synthetic */ Class logSource() {
        return StageLogging.logSource$(this);
    }

    @Override // org.apache.pekko.kafka.internal.InstanceId
    public String org$apache$pekko$kafka$internal$InstanceId$$instanceId() {
        return this.org$apache$pekko$kafka$internal$InstanceId$$instanceId;
    }

    @Override // org.apache.pekko.kafka.internal.InstanceId
    public void org$apache$pekko$kafka$internal$InstanceId$_setter_$org$apache$pekko$kafka$internal$InstanceId$$instanceId_$eq(String str) {
        this.org$apache$pekko$kafka$internal$InstanceId$$instanceId = str;
    }

    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public LoggingAdapter org$apache$pekko$kafka$internal$StageIdLogging$$_log() {
        return this.org$apache$pekko$kafka$internal$StageIdLogging$$_log;
    }

    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public void org$apache$pekko$kafka$internal$StageIdLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.org$apache$pekko$kafka$internal$StageIdLogging$$_log = loggingAdapter;
    }

    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public /* bridge */ /* synthetic */ String idLogPrefix() {
        String idLogPrefix;
        idLogPrefix = idLogPrefix();
        return idLogPrefix;
    }

    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public /* bridge */ /* synthetic */ LoggingAdapter log() {
        LoggingAdapter log;
        log = log();
        return log;
    }

    @Override // org.apache.pekko.kafka.internal.SourceLogicBuffer
    public Iterator buffer() {
        return this.buffer;
    }

    @Override // org.apache.pekko.kafka.internal.SourceLogicBuffer
    public AsyncCallback filterRevokedPartitionsCB() {
        return this.filterRevokedPartitionsCB;
    }

    @Override // org.apache.pekko.kafka.internal.SourceLogicBuffer
    public void buffer_$eq(Iterator iterator) {
        this.buffer = iterator;
    }

    @Override // org.apache.pekko.kafka.internal.SourceLogicBuffer
    public void org$apache$pekko$kafka$internal$SourceLogicBuffer$_setter_$filterRevokedPartitionsCB_$eq(AsyncCallback asyncCallback) {
        this.filterRevokedPartitionsCB = asyncCallback;
    }

    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public /* synthetic */ LoggingAdapter org$apache$pekko$kafka$internal$StageIdLogging$$super$log() {
        return StageLogging.log$(this);
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public SourceShape<Msg> shape() {
        return this.shape;
    }

    @Override // org.apache.pekko.kafka.internal.MetricsControl
    public ExecutionContext executionContext() {
        return materializer().executionContext();
    }

    @Override // org.apache.pekko.kafka.internal.MetricsControl
    public Future<ActorRef> consumerFuture() {
        return Future$.MODULE$.successful(this.org$apache$pekko$kafka$internal$SubSourceStageLogic$$consumerActor);
    }

    @Override // org.apache.pekko.kafka.internal.InstanceId
    public String id() {
        String id;
        StringBuilder sb = new StringBuilder(1);
        id = id();
        return sb.append(id).append("#").append(this.actorNumber).toString();
    }

    public GraphStageLogic.StageActor subSourceActor() {
        return this.subSourceActor;
    }

    public void subSourceActor_$eq(GraphStageLogic.StageActor stageActor) {
        this.subSourceActor = stageActor;
    }

    public void preStart() {
        super.preStart();
        log().info("Starting. Partition {}", this.org$apache$pekko$kafka$internal$SubSourceStageLogic$$tp);
        subSourceActor_$eq(getStageActor(messageHandling()));
        subSourceActor().watch(this.org$apache$pekko$kafka$internal$SubSourceStageLogic$$consumerActor);
        this.subSourceStartedCb.invoke(SubSourceLogic$SubSourceStageLogicControl$.MODULE$.apply(this.org$apache$pekko$kafka$internal$SubSourceStageLogic$$tp, SubSourceLogic$ControlAndStageActor$.MODULE$.apply(this, subSourceActor().ref()), filterRevokedPartitionsCB()));
        this.org$apache$pekko$kafka$internal$SubSourceStageLogic$$consumerActor.tell(KafkaConsumerActor$Internal$RegisterSubStage$.MODULE$.apply(this.requestMessages.tps()), subSourceActor().ref());
    }

    public PartialFunction<Tuple2<ActorRef, Object>, BoxedUnit> messageHandling() {
        return new SubSourceStageLogic$$anon$6(this);
    }

    public SubSourceLogic.SubSourceCancellationStrategy onDownstreamFinishSubSourceCancellationStrategy() {
        return buffer().hasNext() ? SubSourceLogic$SeekToOffsetAndReEmit$.MODULE$.apply(((ConsumerRecord) buffer().next()).offset()) : SubSourceLogic$ReEmit$.MODULE$;
    }

    public void postStop() {
        onShutdown();
        super.postStop();
    }

    @Override // org.apache.pekko.kafka.internal.PromiseControl
    public void performShutdown() {
        log().info("Completing. Partition {}", this.org$apache$pekko$kafka$internal$SubSourceStageLogic$$tp);
        completeStage();
    }

    public void org$apache$pekko$kafka$internal$SubSourceStageLogic$$pump() {
        while (isAvailable(shape().out())) {
            if (!buffer().hasNext()) {
                if (this.org$apache$pekko$kafka$internal$SubSourceStageLogic$$requested) {
                    return;
                }
                this.org$apache$pekko$kafka$internal$SubSourceStageLogic$$requested = true;
                this.org$apache$pekko$kafka$internal$SubSourceStageLogic$$consumerActor.tell(this.requestMessages, subSourceActor().ref());
                return;
            }
            push(shape().out(), createMessage((ConsumerRecord) buffer().next()));
        }
    }

    public static final IterableOnce org$apache$pekko$kafka$internal$SubSourceStageLogic$$anon$6$$_$applyOrElse$$anonfun$1(KafkaConsumerActor$Internal$Messages kafkaConsumerActor$Internal$Messages) {
        return kafkaConsumerActor$Internal$Messages.messages();
    }
}
