package org.apache.pekko.kafka.internal;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.kafka.ConsumerMessage;
import org.apache.pekko.kafka.ConsumerSettings;
import org.apache.pekko.kafka.internal.SubSourceLogic;
import org.apache.pekko.kafka.internal.TransactionalSourceLogic;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.SourceShape;
import org.apache.pekko.stream.stage.AsyncCallback;
import scala.None$;
import scala.PartialFunction;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.immutable.Map;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LazyVals;
import scala.runtime.LazyVals$;
import scala.runtime.LazyVals$Evaluating$;
import scala.runtime.LazyVals$NullValue$;
import scala.runtime.ScalaRunTime$;

/* compiled from: TransactionalSources.scala */
@InternalApi
/* loaded from: input_file:org/apache/pekko/kafka/internal/TransactionalSubSourceStageLogic.class */
public final class TransactionalSubSourceStageLogic<K, V> extends SubSourceStageLogic<K, V, ConsumerMessage.TransactionalMessage<K, V>> implements TransactionalMessageBuilder<K, V> {
    public static final long OFFSET$0 = LazyVals$.MODULE$.getOffsetStatic(TransactionalSubSourceStageLogic.class.getDeclaredField("committedMarker$lzy2"));
    private final TopicPartition tp;
    public final ActorRef org$apache$pekko$kafka$internal$TransactionalSubSourceStageLogic$$consumerActor;
    private final int actorNumber;
    public final ConsumerSettings<K, V> org$apache$pekko$kafka$internal$TransactionalSubSourceStageLogic$$consumerSettings;
    public final TransactionalSourceLogic.InFlightRecords.Impl org$apache$pekko$kafka$internal$TransactionalSubSourceStageLogic$$inFlightRecords;
    private final boolean fromPartitionedSource;
    private volatile Object committedMarker$lzy2;

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public TransactionalSubSourceStageLogic(SourceShape<ConsumerMessage.TransactionalMessage<K, V>> sourceShape, TopicPartition topicPartition, ActorRef actorRef, AsyncCallback<SubSourceLogic.SubSourceStageLogicControl> asyncCallback, AsyncCallback<Tuple2<TopicPartition, SubSourceLogic.SubSourceCancellationStrategy>> asyncCallback2, int i, ConsumerSettings<K, V> consumerSettings) {
        super(sourceShape, topicPartition, actorRef, asyncCallback, asyncCallback2, i);
        this.tp = topicPartition;
        this.org$apache$pekko$kafka$internal$TransactionalSubSourceStageLogic$$consumerActor = actorRef;
        this.actorNumber = i;
        this.org$apache$pekko$kafka$internal$TransactionalSubSourceStageLogic$$consumerSettings = consumerSettings;
        this.org$apache$pekko$kafka$internal$TransactionalSubSourceStageLogic$$inFlightRecords = TransactionalSourceLogic$InFlightRecords$.MODULE$.empty();
        this.fromPartitionedSource = true;
    }

    @Override // org.apache.pekko.kafka.internal.MessageBuilder
    public /* bridge */ /* synthetic */ ConsumerMessage.TransactionalMessage createMessage(ConsumerRecord consumerRecord) {
        ConsumerMessage.TransactionalMessage createMessage;
        createMessage = createMessage(consumerRecord);
        return createMessage;
    }

    private SourceShape<ConsumerMessage.TransactionalMessage<K, V>> shape$accessor() {
        return super.shape();
    }

    @Override // org.apache.pekko.kafka.internal.TransactionalMessageBuilderBase
    public String groupId() {
        return (String) this.org$apache$pekko$kafka$internal$TransactionalSubSourceStageLogic$$consumerSettings.properties().apply("group.id");
    }

    @Override // org.apache.pekko.kafka.internal.TransactionalMessageBuilderBase
    public void onMessage(ConsumerRecord<K, V> consumerRecord) {
        this.org$apache$pekko$kafka$internal$TransactionalSubSourceStageLogic$$inFlightRecords.add((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((TopicPartition) Predef$.MODULE$.ArrowAssoc(new TopicPartition(consumerRecord.topic(), consumerRecord.partition())), BoxesRunTime.boxToLong(consumerRecord.offset()))})));
    }

    @Override // org.apache.pekko.kafka.internal.TransactionalMessageBuilderBase
    public boolean fromPartitionedSource() {
        return this.fromPartitionedSource;
    }

    @Override // org.apache.pekko.kafka.internal.SubSourceStageLogic
    public PartialFunction<Tuple2<ActorRef, Object>, BoxedUnit> messageHandling() {
        return super.messageHandling().orElse(drainHandling()).orElse(new TransactionalSubSourceStageLogic$$anon$11(this));
    }

    @Override // org.apache.pekko.kafka.internal.SubSourceStageLogic
    public SubSourceLogic.SubSourceCancellationStrategy onDownstreamFinishSubSourceCancellationStrategy() {
        return SubSourceLogic$DoNothing$.MODULE$;
    }

    private PartialFunction<Tuple2<ActorRef, Object>, BoxedUnit> shuttingDownReceive() {
        return drainHandling().orElse(new TransactionalSubSourceStageLogic$$anon$12(this));
    }

    @Override // org.apache.pekko.kafka.internal.SubSourceStageLogic, org.apache.pekko.kafka.internal.PromiseControl
    public void performShutdown() {
        log().debug("#{} Completing SubSource for partition {}", BoxesRunTime.boxToInteger(this.actorNumber), this.tp);
        setKeepGoing(true);
        if (!isClosed(shape$accessor().out())) {
            complete(shape$accessor().out());
        }
        subSourceActor().become(shuttingDownReceive());
        drainAndComplete();
    }

    private void drainAndComplete() {
        subSourceActor().ref().tell(TransactionalSourceLogic$Drain$.MODULE$.apply(this.org$apache$pekko$kafka$internal$TransactionalSubSourceStageLogic$$inFlightRecords.assigned(), None$.MODULE$, TransactionalSubSourceStageLogic$DrainingComplete$.MODULE$), subSourceActor().ref());
    }

    private PartialFunction<Tuple2<ActorRef, Object>, BoxedUnit> drainHandling() {
        return new TransactionalSubSourceStageLogic$$anon$13(this);
    }

    @Override // org.apache.pekko.kafka.internal.TransactionalMessageBuilderBase
    public CommittedMarker committedMarker() {
        Object obj = this.committedMarker$lzy2;
        if (obj instanceof CommittedMarker) {
            return (CommittedMarker) obj;
        }
        if (obj == LazyVals$NullValue$.MODULE$) {
            return null;
        }
        return (CommittedMarker) committedMarker$lzyINIT2();
    }

    private Object committedMarker$lzyINIT2() {
        while (true) {
            Object obj = this.committedMarker$lzy2;
            if (obj == null) {
                if (LazyVals$.MODULE$.objCAS(this, OFFSET$0, (Object) null, LazyVals$Evaluating$.MODULE$)) {
                    LazyVals$NullValue$ lazyVals$NullValue$ = null;
                    try {
                        LazyVals$NullValue$ apply = TransactionalSourceLogic$CommittedMarkerRef$.MODULE$.apply(subSourceActor().ref(), this.org$apache$pekko$kafka$internal$TransactionalSubSourceStageLogic$$consumerSettings.commitTimeout(), materializer().executionContext());
                        if (apply == null) {
                            lazyVals$NullValue$ = LazyVals$NullValue$.MODULE$;
                        } else {
                            lazyVals$NullValue$ = apply;
                        }
                        return apply;
                    } finally {
                        if (!LazyVals$.MODULE$.objCAS(this, OFFSET$0, LazyVals$Evaluating$.MODULE$, lazyVals$NullValue$)) {
                            LazyVals.Waiting waiting = (LazyVals.Waiting) this.committedMarker$lzy2;
                            LazyVals$.MODULE$.objCAS(this, OFFSET$0, waiting, lazyVals$NullValue$);
                            waiting.countDown();
                        }
                    }
                }
            } else {
                if (!(obj instanceof LazyVals.LazyValControlState)) {
                    return obj;
                }
                if (obj == LazyVals$Evaluating$.MODULE$) {
                    LazyVals$.MODULE$.objCAS(this, OFFSET$0, obj, new LazyVals.Waiting());
                } else {
                    if (!(obj instanceof LazyVals.Waiting)) {
                        return null;
                    }
                    ((LazyVals.Waiting) obj).await();
                }
            }
        }
    }

    public Materializer protected$materializer() {
        return materializer();
    }

    public static final /* synthetic */ long org$apache$pekko$kafka$internal$TransactionalSubSourceStageLogic$$anon$13$$_$applyOrElse$$anonfun$3(OffsetAndMetadata offsetAndMetadata) {
        return offsetAndMetadata.offset() - 1;
    }

    public static final ActorRef org$apache$pekko$kafka$internal$TransactionalSubSourceStageLogic$$anon$13$$_$_$$anonfun$3(ActorRef actorRef) {
        return actorRef;
    }
}
