/*
 * Decompiled with CFR 0.152.
 */
package cn.leancloud.kafka.consumer;

import cn.leancloud.kafka.consumer.AsyncCommitPolicy;
import cn.leancloud.kafka.consumer.AutoCommitConsumerConfigs;
import cn.leancloud.kafka.consumer.AutoCommitPolicy;
import cn.leancloud.kafka.consumer.BasicConsumerConfigs;
import cn.leancloud.kafka.consumer.CommitPolicy;
import cn.leancloud.kafka.consumer.ConsumerRecordHandler;
import cn.leancloud.kafka.consumer.ImmediateExecutorService;
import cn.leancloud.kafka.consumer.KafkaConfigsChecker;
import cn.leancloud.kafka.consumer.LcKafkaConsumer;
import cn.leancloud.kafka.consumer.NoOpCommitPolicy;
import cn.leancloud.kafka.consumer.PartialAsyncCommitPolicy;
import cn.leancloud.kafka.consumer.PartialSyncCommitPolicy;
import cn.leancloud.kafka.consumer.SyncCommitPolicy;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class LcKafkaConsumerBuilder<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(LcKafkaConsumerBuilder.class);
    private Duration pollTimeout = Duration.ofMillis(100L);
    private int maxPendingAsyncCommits = 10;
    private Duration gracefulShutdownTimeout = Duration.ofSeconds(10L);
    private ExecutorService workerPool = ImmediateExecutorService.INSTANCE;
    private boolean shutdownWorkerPoolOnStop = false;
    private Duration handleRecordTimeout = Duration.ZERO;
    private Map<String, Object> configs;
    private ConsumerRecordHandler<K, V> consumerRecordHandler;
    @Nullable
    private Consumer<K, V> consumer;
    @Nullable
    private Deserializer<K> keyDeserializer;
    @Nullable
    private Deserializer<V> valueDeserializer;
    @Nullable
    private CommitPolicy<K, V> policy;
    @Nullable
    private Duration recommitInterval;

    public static <K, V> LcKafkaConsumerBuilder<K, V> newBuilder(Map<String, Object> kafkaConfigs, ConsumerRecordHandler<K, V> consumerRecordHandler) {
        Objects.requireNonNull(kafkaConfigs, "kafkaConfigs");
        Objects.requireNonNull(consumerRecordHandler, "consumerRecordHandler");
        return new LcKafkaConsumerBuilder<K, V>(new HashMap<String, Object>(kafkaConfigs), consumerRecordHandler);
    }

    public static <K, V> LcKafkaConsumerBuilder<K, V> newBuilder(Map<String, Object> kafkaConfigs, ConsumerRecordHandler<K, V> consumerRecordHandler, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        Objects.requireNonNull(kafkaConfigs, "kafkaConfigs");
        Objects.requireNonNull(consumerRecordHandler, "consumerRecordHandler");
        Objects.requireNonNull(keyDeserializer, "keyDeserializer");
        Objects.requireNonNull(valueDeserializer, "valueDeserializer");
        return new LcKafkaConsumerBuilder<K, V>(new HashMap<String, Object>(kafkaConfigs), consumerRecordHandler, keyDeserializer, valueDeserializer);
    }

    private static void requireArgument(boolean expression, String template, Object ... args) {
        if (!expression) {
            throw new IllegalArgumentException(String.format(template, args));
        }
    }

    private LcKafkaConsumerBuilder(Map<String, Object> kafkaConsumerConfigs, ConsumerRecordHandler<K, V> consumerRecordHandler) {
        this(kafkaConsumerConfigs, consumerRecordHandler, null, null);
    }

    private LcKafkaConsumerBuilder(Map<String, Object> kafkaConsumerConfigs, ConsumerRecordHandler<K, V> consumerRecordHandler, @Nullable Deserializer<K> keyDeserializer, @Nullable Deserializer<V> valueDeserializer) {
        this.configs = kafkaConsumerConfigs;
        this.consumerRecordHandler = consumerRecordHandler;
        this.keyDeserializer = keyDeserializer;
        this.valueDeserializer = valueDeserializer;
    }

    public LcKafkaConsumerBuilder<K, V> pollTimeoutMillis(long pollTimeoutMillis) {
        LcKafkaConsumerBuilder.requireArgument(pollTimeoutMillis >= 0L, "pollTimeoutMillis: %s (expect >= 0)", pollTimeoutMillis);
        this.pollTimeout = Duration.ofMillis(pollTimeoutMillis);
        return this;
    }

    public LcKafkaConsumerBuilder<K, V> pollTimeout(Duration pollTimeout) {
        Objects.requireNonNull(pollTimeout, "pollTimeout");
        LcKafkaConsumerBuilder.requireArgument(!pollTimeout.isNegative(), "pollTimeout: %s (expect positive or zero duration)", pollTimeout);
        this.pollTimeout = pollTimeout;
        return this;
    }

    public LcKafkaConsumerBuilder<K, V> handleRecordTimeoutMillis(long handleRecordTimeoutMillis) {
        LcKafkaConsumerBuilder.requireArgument(handleRecordTimeoutMillis >= 0L, "handleRecordTimeoutMillis: %s (expect >= 0)", handleRecordTimeoutMillis);
        this.handleRecordTimeout = Duration.ofMillis(handleRecordTimeoutMillis);
        return this;
    }

    public LcKafkaConsumerBuilder<K, V> handleRecordTimeout(Duration handleRecordTimeout) {
        Objects.requireNonNull(handleRecordTimeout, "handleRecordTimeout");
        LcKafkaConsumerBuilder.requireArgument(!handleRecordTimeout.isNegative(), "handleRecordTimeout: %s (expect positive or zero duration)", handleRecordTimeout);
        this.handleRecordTimeout = handleRecordTimeout;
        return this;
    }

    public LcKafkaConsumerBuilder<K, V> gracefulShutdownTimeoutMillis(long gracefulShutdownTimeoutMillis) {
        LcKafkaConsumerBuilder.requireArgument(gracefulShutdownTimeoutMillis >= 0L, "gracefulShutdownTimeoutMillis: %s (expected >= 0)", gracefulShutdownTimeoutMillis);
        this.gracefulShutdownTimeout = Duration.ofMillis(gracefulShutdownTimeoutMillis);
        return this;
    }

    public LcKafkaConsumerBuilder<K, V> gracefulShutdownTimeout(Duration gracefulShutdownTimeout) {
        Objects.requireNonNull(gracefulShutdownTimeout, "gracefulShutdownTimeout");
        LcKafkaConsumerBuilder.requireArgument(!gracefulShutdownTimeout.isNegative(), "gracefulShutdownTimeout: %s (expect positive or zero duration)", gracefulShutdownTimeout);
        this.gracefulShutdownTimeout = gracefulShutdownTimeout;
        return this;
    }

    public LcKafkaConsumerBuilder<K, V> maxPendingAsyncCommits(int maxPendingAsyncCommits) {
        LcKafkaConsumerBuilder.requireArgument(maxPendingAsyncCommits > 0, "maxPendingAsyncCommits: %s (expect > 0)", maxPendingAsyncCommits);
        this.maxPendingAsyncCommits = maxPendingAsyncCommits;
        return this;
    }

    public LcKafkaConsumerBuilder<K, V> recommitIntervalInMillis(long recommitIntervalInMillis) {
        LcKafkaConsumerBuilder.requireArgument(recommitIntervalInMillis > 0L, "recommitIntervalInMillis: %s (expected > 0)", recommitIntervalInMillis);
        this.recommitInterval = Duration.ofMillis(recommitIntervalInMillis);
        return this;
    }

    public LcKafkaConsumerBuilder<K, V> recommitInterval(Duration recommitInterval) {
        Objects.requireNonNull(recommitInterval, "recommitInterval");
        LcKafkaConsumerBuilder.requireArgument(!recommitInterval.isNegative() && !recommitInterval.isZero(), "recommitInterval: %s (expect positive duration)", recommitInterval);
        this.recommitInterval = recommitInterval;
        return this;
    }

    LcKafkaConsumerBuilder<K, V> mockKafkaConsumer(Consumer<K, V> mockedConsumer) {
        Objects.requireNonNull(mockedConsumer, "consumer");
        if (mockedConsumer instanceof KafkaConsumer) {
            throw new IllegalArgumentException("need a mocked Consumer");
        }
        this.consumer = mockedConsumer;
        return this;
    }

    public LcKafkaConsumerBuilder<K, V> workerPool(ExecutorService workerPool, boolean shutdownOnStop) {
        Objects.requireNonNull(workerPool, "workerPool");
        this.workerPool = workerPool;
        this.shutdownWorkerPoolOnStop = shutdownOnStop;
        return this;
    }

    public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildAuto() {
        this.checkConfigs(AutoCommitConsumerConfigs.values());
        this.consumer = this.buildConsumer(true);
        this.policy = this.workerPool == ImmediateExecutorService.INSTANCE ? NoOpCommitPolicy.getInstance() : new AutoCommitPolicy<K, V>(this.consumer);
        return this.doBuild();
    }

    public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildSync() {
        this.consumer = this.buildConsumer(false);
        this.policy = new SyncCommitPolicy<K, V>(this.consumer, this.getRecommitInterval());
        return this.doBuild();
    }

    public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildPartialSync() {
        this.consumer = this.buildConsumer(false);
        this.policy = new PartialSyncCommitPolicy<K, V>(this.consumer, this.getRecommitInterval());
        return this.doBuild();
    }

    public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildAsync() {
        this.consumer = this.buildConsumer(false);
        this.policy = new AsyncCommitPolicy<K, V>(this.consumer, this.getRecommitInterval(), this.maxPendingAsyncCommits);
        return this.doBuild();
    }

    public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildPartialAsync() {
        this.consumer = this.buildConsumer(false);
        this.policy = new PartialAsyncCommitPolicy<K, V>(this.consumer, this.getRecommitInterval(), this.maxPendingAsyncCommits);
        return this.doBuild();
    }

    Consumer<K, V> getConsumer() {
        assert (this.consumer != null);
        return this.consumer;
    }

    ConsumerRecordHandler<K, V> getConsumerRecordHandler() {
        return this.consumerRecordHandler;
    }

    ExecutorService getWorkerPool() {
        return this.workerPool;
    }

    boolean isShutdownWorkerPoolOnStop() {
        return this.shutdownWorkerPoolOnStop;
    }

    Duration getPollTimeout() {
        return this.pollTimeout;
    }

    Duration gracefulShutdownTimeout() {
        return this.gracefulShutdownTimeout;
    }

    Duration handleRecordTimeout() {
        return this.handleRecordTimeout;
    }

    CommitPolicy<K, V> getPolicy() {
        assert (this.policy != null);
        return this.policy;
    }

    private Consumer<K, V> buildConsumer(boolean autoCommit) {
        this.checkConfigs(BasicConsumerConfigs.values());
        BasicConsumerConfigs.ENABLE_AUTO_COMMIT.set(this.configs, Boolean.toString(autoCommit));
        if (this.keyDeserializer != null ? !$assertionsDisabled && this.valueDeserializer == null : !$assertionsDisabled && this.valueDeserializer != null) {
            throw new AssertionError();
        }
        if (this.consumer != null) {
            assert (!(this.consumer instanceof KafkaConsumer));
            return this.consumer;
        }
        return new KafkaConsumer(this.configs, this.keyDeserializer, this.valueDeserializer);
    }

    private Duration getRecommitInterval() {
        if (this.recommitInterval == null) {
            logger.warn("Recommit interval is not set for a non-automatic commit consumer, the default interval of 1 hour will be used.");
            this.recommitInterval = Duration.ofHours(1L);
        }
        return this.recommitInterval;
    }

    private void checkConfigs(KafkaConfigsChecker[] checkers) {
        for (KafkaConfigsChecker check : checkers) {
            check.check(this.configs);
        }
    }

    private <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> doBuild() {
        LcKafkaConsumer c = new LcKafkaConsumer(this);
        return c;
    }
}

