package com.goldensource.kafkautils;

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.dispatch.MessageDispatcher;
import akka.kafka.AutoSubscription;
import akka.kafka.CommitterSettings;
import akka.kafka.CommitterSettings$;
import akka.kafka.ConsumerMessage;
import akka.kafka.ConsumerSettings;
import akka.kafka.ConsumerSettings$;
import akka.kafka.Subscription;
import akka.kafka.Subscriptions$;
import akka.kafka.scaladsl.Committer$;
import akka.kafka.scaladsl.Consumer;
import akka.kafka.scaladsl.Consumer$;
import akka.kafka.scaladsl.Consumer$DrainingControl$;
import akka.kafka.scaladsl.Consumer$NoopControl$;
import akka.stream.Materializer;
import akka.stream.scaladsl.Keep$;
import akka.stream.scaladsl.RestartSource$;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source;
import com.goldensource.kafkautils.MessageConsumerFactory;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Function0;
import scala.Function1;
import scala.Tuple2;
import scala.collection.immutable.Seq;
import scala.concurrent.Future;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: MessageConsumerFactory.scala */
/* loaded from: input_file:com/goldensource/kafkautils/MessageConsumerFactory$.class */
public final class MessageConsumerFactory$ implements MessageConsumerFactory {
    public static MessageConsumerFactory$ MODULE$;

    static {
        new MessageConsumerFactory$();
    }

    @Override // com.goldensource.kafkautils.MessageConsumerFactory
    public <T> AtomicReference<Consumer.Control> createConsumerFor(MessageConsumerConfiguration messageConsumerConfiguration, MessageConsumerFactory.ConsumeMessage<T> consumeMessage, ActorSystem actorSystem, Materializer materializer, Function1<ConsumerMessage.CommittableMessage<String, byte[]>, T> function1) {
        CommitterSettings withMaxBatch = CommitterSettings$.MODULE$.apply(actorSystem).withMaxBatch(messageConsumerConfiguration.commitBatchSize());
        AutoSubscription autoSubscription = Subscriptions$.MODULE$.topics(messageConsumerConfiguration.topics());
        AtomicReference<Consumer.Control> atomicReference = new AtomicReference<>(Consumer$NoopControl$.MODULE$);
        atomicReference.set(Consumer$DrainingControl$.MODULE$.apply(new Tuple2(atomicReference.get(), materializeSource((Source) createRestartSource(messageConsumerConfiguration).apply(() -> {
            return MODULE$.createConsumer(withMaxBatch, autoSubscription, atomicReference, messageConsumerConfiguration, consumeMessage, actorSystem, function1);
        }), materializer))));
        return atomicReference;
    }

    private <T> Future<Seq<Done>> materializeSource(Source<Done, NotUsed> source, Materializer materializer) {
        return (Future) ((Tuple2) source.toMat(Sink$.MODULE$.seq(), Keep$.MODULE$.both()).run(materializer))._2();
    }

    private <T> Function1<Function0<Source<T, ?>>, Source<T, NotUsed>> createRestartSource(MessageConsumerConfiguration messageConsumerConfiguration) {
        return function0 -> {
            return RestartSource$.MODULE$.onFailuresWithBackoff(messageConsumerConfiguration.minBackoff(), messageConsumerConfiguration.maxBackoff(), 0.2d, function0);
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <T> Source<Done, BoxedUnit> createConsumer(CommitterSettings committerSettings, Subscription subscription, AtomicReference<Consumer.Control> atomicReference, MessageConsumerConfiguration messageConsumerConfiguration, Function1<T, Future<Done>> function1, ActorSystem actorSystem, Function1<ConsumerMessage.CommittableMessage<String, byte[]>, T> function12) {
        MessageDispatcher lookup = actorSystem.dispatchers().lookup(messageConsumerConfiguration.dispatcher());
        return Consumer$.MODULE$.committableSource(createConsumerSettings(messageConsumerConfiguration, actorSystem), subscription).mapMaterializedValue(control -> {
            atomicReference.set(control);
            return BoxedUnit.UNIT;
        }).mapAsync(messageConsumerConfiguration.parallelism(), committableMessage -> {
            return ((Future) function1.apply(function12.apply(committableMessage))).map(done -> {
                return committableMessage.committableOffset();
            }, lookup);
        }).via(Committer$.MODULE$.flow(committerSettings.withMaxBatch(1L)));
    }

    private <T> ConsumerSettings<String, byte[]> createConsumerSettings(MessageConsumerConfiguration messageConsumerConfiguration, ActorSystem actorSystem) {
        return ConsumerSettings$.MODULE$.apply(actorSystem, new StringDeserializer(), new ByteArrayDeserializer()).withBootstrapServers(messageConsumerConfiguration.bootstrapServers()).withGroupId(messageConsumerConfiguration.groupId()).withPollInterval(messageConsumerConfiguration.pollInterval()).withPollTimeout(messageConsumerConfiguration.pollTimeout()).withCommitTimeout(messageConsumerConfiguration.messageCommitTimeout()).withDispatcher(messageConsumerConfiguration.dispatcher()).withProperty("session.timeout.ms", BoxesRunTime.boxToInteger(messageConsumerConfiguration.sessionTimeout()).toString()).withProperty("client.id", Unique$.MODULE$.cleanUniqueId()).withProperty("auto.offset.reset", "earliest");
    }

    private MessageConsumerFactory$() {
        MODULE$ = this;
    }
}
