package org.apache.samza.checkpoint.kafka;

import kafka.api.FetchRequestBuilder;
import kafka.api.FetchResponse;
import kafka.api.OffsetRequest$;
import kafka.common.ErrorMapping$;
import kafka.common.TopicAndPartition;
import kafka.consumer.SimpleConsumer;
import org.apache.samza.util.ExponentialSleepStrategy;
import org.apache.samza.util.KafkaUtil$;
import scala.Function1;
import scala.Function2;
import scala.Serializable;
import scala.Unit$;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.NonLocalReturnControl;

/* compiled from: KafkaCheckpointManager.scala */
/* loaded from: input_file:org/apache/samza/checkpoint/kafka/KafkaCheckpointManager$$anonfun$readLog$1.class */
public class KafkaCheckpointManager$$anonfun$readLog$1 extends AbstractFunction1<ExponentialSleepStrategy.RetryLoop, BoxedUnit> implements Serializable {
    public static final long serialVersionUID = 0;
    private final /* synthetic */ KafkaCheckpointManager $outer;
    public final String entryType$1;
    public final Function1 shouldHandleEntry$2;
    public final Function2 handleEntry$1;
    private final Object nonLocalReturnKey1$1;

    public final void apply(ExponentialSleepStrategy.RetryLoop retryLoop) {
        SimpleConsumer org$apache$samza$checkpoint$kafka$KafkaCheckpointManager$$getConsumer = this.$outer.org$apache$samza$checkpoint$kafka$KafkaCheckpointManager$$getConsumer();
        TopicAndPartition topicAndPartition = new TopicAndPartition(this.$outer.org$apache$samza$checkpoint$kafka$KafkaCheckpointManager$$checkpointTopic, 0);
        try {
            LongRef longRef = new LongRef(BoxesRunTime.unboxToLong(this.$outer.startingOffset().getOrElse(new KafkaCheckpointManager$$anonfun$readLog$1$$anonfun$1(this, org$apache$samza$checkpoint$kafka$KafkaCheckpointManager$$getConsumer, topicAndPartition))));
            this.$outer.info(new KafkaCheckpointManager$$anonfun$readLog$1$$anonfun$apply$4(this, longRef));
            long org$apache$samza$checkpoint$kafka$KafkaCheckpointManager$$getOffset = this.$outer.org$apache$samza$checkpoint$kafka$KafkaCheckpointManager$$getOffset(org$apache$samza$checkpoint$kafka$KafkaCheckpointManager$$getConsumer, topicAndPartition, OffsetRequest$.MODULE$.LatestTime());
            this.$outer.info(new KafkaCheckpointManager$$anonfun$readLog$1$$anonfun$apply$5(this, org$apache$samza$checkpoint$kafka$KafkaCheckpointManager$$getOffset));
            if (longRef.elem < 0) {
                this.$outer.info(new KafkaCheckpointManager$$anonfun$readLog$1$$anonfun$apply$6(this));
                throw new NonLocalReturnControl.mcV.sp(this.nonLocalReturnKey1$1, BoxedUnit.UNIT);
            }
            while (longRef.elem < org$apache$samza$checkpoint$kafka$KafkaCheckpointManager$$getOffset) {
                FetchResponse fetch = org$apache$samza$checkpoint$kafka$KafkaCheckpointManager$$getConsumer.fetch(new FetchRequestBuilder().addFetch(this.$outer.org$apache$samza$checkpoint$kafka$KafkaCheckpointManager$$checkpointTopic, 0, longRef.elem, this.$outer.org$apache$samza$checkpoint$kafka$KafkaCheckpointManager$$fetchSize).maxWait(500).minBytes(1).clientId(this.$outer.org$apache$samza$checkpoint$kafka$KafkaCheckpointManager$$clientId).build());
                if (fetch.hasError()) {
                    this.$outer.warn(new KafkaCheckpointManager$$anonfun$readLog$1$$anonfun$apply$7(this, fetch));
                    short errorCode = fetch.errorCode(this.$outer.org$apache$samza$checkpoint$kafka$KafkaCheckpointManager$$checkpointTopic, 0);
                    if (BoxesRunTime.boxToShort(ErrorMapping$.MODULE$.OffsetOutOfRangeCode()).equals(BoxesRunTime.boxToShort(errorCode))) {
                        this.$outer.warn(new KafkaCheckpointManager$$anonfun$readLog$1$$anonfun$apply$8(this));
                        throw new NonLocalReturnControl.mcV.sp(this.nonLocalReturnKey1$1, BoxedUnit.UNIT);
                    }
                    KafkaUtil$.MODULE$.maybeThrowException(errorCode);
                }
                fetch.messageSet(this.$outer.org$apache$samza$checkpoint$kafka$KafkaCheckpointManager$$checkpointTopic, 0).foreach(new KafkaCheckpointManager$$anonfun$readLog$1$$anonfun$apply$9(this, longRef));
            }
            org$apache$samza$checkpoint$kafka$KafkaCheckpointManager$$getConsumer.close();
            retryLoop.done();
            Unit$ unit$ = Unit$.MODULE$;
        } catch (Throwable th) {
            org$apache$samza$checkpoint$kafka$KafkaCheckpointManager$$getConsumer.close();
            throw th;
        }
    }

    public /* synthetic */ KafkaCheckpointManager org$apache$samza$checkpoint$kafka$KafkaCheckpointManager$$anonfun$$$outer() {
        return this.$outer;
    }

    public final /* bridge */ /* synthetic */ Object apply(Object obj) {
        apply((ExponentialSleepStrategy.RetryLoop) obj);
        return BoxedUnit.UNIT;
    }

    public KafkaCheckpointManager$$anonfun$readLog$1(KafkaCheckpointManager kafkaCheckpointManager, String str, Function1 function1, Function2 function2, Object obj) {
        if (kafkaCheckpointManager == null) {
            throw new NullPointerException();
        }
        this.$outer = kafkaCheckpointManager;
        this.entryType$1 = str;
        this.shouldHandleEntry$2 = function1;
        this.handleEntry$1 = function2;
        this.nonLocalReturnKey1$1 = obj;
    }
}
