/*
 * Decompiled with CFR 0.152.
 */
package cn.leancloud.kafka.consumer;

import cn.leancloud.kafka.consumer.CommitPolicy;
import cn.leancloud.kafka.consumer.ConsumerSeekDestination;
import cn.leancloud.kafka.consumer.Fetcher;
import cn.leancloud.kafka.consumer.LcKafkaConsumerBuilder;
import cn.leancloud.kafka.consumer.RebalanceListener;
import cn.leancloud.kafka.consumer.UnsubscribedStatus;
import cn.leancloud.kafka.consumer.VisibleForTesting;
import java.io.Closeable;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

/**
 * Wraps a Kafka {@link Consumer} together with a {@link Fetcher} that runs on its own thread
 * and a worker pool supplied by the builder.
 *
 * <p>An instance goes through the states {@code INIT -> SUBSCRIBED -> CLOSED}. It may be
 * subscribed at most once (either to a collection of topics or to a pattern) and cannot be
 * reused after {@link #close()}.
 *
 * <p>The {@code subscribe} methods are {@code synchronized} and the state is kept in a
 * {@code volatile} field, so state transitions are safe to trigger from different threads.
 *
 * @param <K> the key type of the consumed records
 * @param <V> the value type of the consumed records
 */
public final class LcKafkaConsumer<K, V>
implements Closeable {
    /** Maximum number of characters of a topic/pattern that is embedded in the fetcher thread name. */
    private static final int MAX_THREAD_NAME_POSTFIX_LENGTH = 50;
    /** Prefix shared by every fetcher thread name. */
    private static final String FETCHER_THREAD_NAME_PREFIX = "kafka-fetcher-for-";
    /** Marker appended to the thread name when something was left out of it. */
    private static final String ELLIPSIS = "...";

    private final Consumer<K, V> consumer;
    private final Thread fetcherThread;
    private final Fetcher<K, V> fetcher;
    private final ExecutorService workerPool;
    private final CommitPolicy<K, V> policy;
    private final boolean shutdownWorkerPoolOnStop;
    private final ConsumerSeekDestination forceSeekTo;
    private volatile State state = State.INIT;

    /**
     * Creates a consumer from the settings collected by the builder. The fetcher thread is
     * created here but only started by one of the {@code subscribe} methods.
     *
     * @param builder the builder carrying the Kafka consumer, worker pool, commit policy and
     *                the other settings read in this constructor
     */
    LcKafkaConsumer(LcKafkaConsumerBuilder<K, V> builder) {
        this.consumer = builder.getConsumer();
        this.workerPool = builder.getWorkerPool();
        this.shutdownWorkerPoolOnStop = builder.isShutdownWorkerPoolOnStop();
        this.policy = builder.getPolicy();
        this.fetcher = new Fetcher<K, V>(builder);
        this.forceSeekTo = builder.getForceSeekTo();
        this.fetcherThread = new Thread(this.fetcher);
    }

    /**
     * Subscribes the underlying consumer to the given topics and starts the fetcher thread.
     *
     * @param topics the topics to subscribe to; must be non-empty and must not contain
     *               {@code null} or blank entries
     * @return a future that is completed with the fetcher's unsubscribe status after this
     *         consumer has been closed
     * @throws NullPointerException     if {@code topics} is {@code null}
     * @throws IllegalArgumentException if {@code topics} is empty or contains a {@code null}
     *                                  or blank topic
     * @throws IllegalStateException    if this consumer is already subscribed or closed
     */
    public synchronized CompletableFuture<UnsubscribedStatus> subscribe(Collection<String> topics) {
        Objects.requireNonNull(topics, "topics");
        if (topics.isEmpty()) {
            throw new IllegalArgumentException("subscribe empty topics");
        }
        for (String topic : topics) {
            if (topic == null || topic.trim().isEmpty()) {
                throw new IllegalArgumentException("topic collection to subscribe to cannot contain null or empty topic");
            }
        }
        this.ensureInInit();
        this.consumer.subscribe(topics, new RebalanceListener<K, V>(this.consumer, this.policy, this.forceSeekTo));
        this.fetcherThread.setName(this.fetcherThreadName(topics));
        this.fetcherThread.start();
        this.state = State.SUBSCRIBED;
        return this.setupUnsubscribedFuture();
    }

    /**
     * Subscribes the underlying consumer to all topics matching the given pattern and starts
     * the fetcher thread.
     *
     * @param pattern the pattern of topic names to subscribe to
     * @return a future that is completed with the fetcher's unsubscribe status after this
     *         consumer has been closed
     * @throws NullPointerException  if {@code pattern} is {@code null}
     * @throws IllegalStateException if this consumer is already subscribed or closed
     */
    public synchronized CompletableFuture<UnsubscribedStatus> subscribe(Pattern pattern) {
        Objects.requireNonNull(pattern, "pattern");
        this.ensureInInit();
        this.consumer.subscribe(pattern, new RebalanceListener<K, V>(this.consumer, this.policy, this.forceSeekTo));
        this.fetcherThread.setName(this.fetcherThreadName(pattern));
        this.fetcherThread.start();
        this.state = State.SUBSCRIBED;
        return this.setupUnsubscribedFuture();
    }

    /**
     * Returns the metrics reported by the underlying Kafka consumer.
     *
     * @return the metrics map of the wrapped {@link Consumer}
     */
    public Map<MetricName, ? extends Metric> metrics() {
        return this.consumer.metrics();
    }

    /**
     * Closes this consumer. Only the first call has any effect; later calls return immediately.
     *
     * <p>Unless called from the fetcher thread itself, this closes the fetcher and waits for the
     * fetcher thread to terminate. If the builder requested it, the worker pool is then shut
     * down and awaited for up to one day. If the calling thread is interrupted while waiting,
     * the remaining steps are skipped and the interrupt flag is restored.
     */
    @Override
    public void close() {
        // Cheap unsynchronized check first, then re-check under the lock so that exactly one
        // caller performs the transition to CLOSED.
        if (this.closed()) {
            return;
        }
        synchronized (this) {
            if (this.closed()) {
                return;
            }
            this.state = State.CLOSED;
        }
        try {
            // close() may be triggered from the fetcher thread (via the unsubscribe future);
            // a thread must not join itself, so the fetcher is only closed/joined from outside.
            if (Thread.currentThread() != this.fetcherThread) {
                this.fetcher.close();
                this.fetcherThread.join();
            }
            if (this.shutdownWorkerPoolOnStop) {
                this.workerPool.shutdown();
                this.workerPool.awaitTermination(1L, TimeUnit.DAYS);
            }
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return {@code true} once this consumer has left the {@code INIT} state, i.e. it is
     *         subscribed or already closed
     */
    @VisibleForTesting
    boolean subscribed() {
        return this.state.code() > State.INIT.code();
    }

    /**
     * @return {@code true} if this consumer has been closed
     */
    @VisibleForTesting
    boolean closed() {
        return this.state == State.CLOSED;
    }

    /**
     * @return the commit policy obtained from the builder
     */
    @VisibleForTesting
    CommitPolicy<K, V> policy() {
        return this.policy;
    }

    /**
     * Builds the future handed back to callers of {@code subscribe}. A separate future is used
     * so that {@link #close()} has run before the caller observes completion.
     *
     * <p>The callback is attached with {@code whenComplete} rather than {@code thenAccept} so
     * that an exceptional completion of the fetcher's future still closes this consumer and is
     * propagated to the returned future instead of leaving it incomplete forever.
     *
     * @return a future mirroring the outcome of the fetcher's unsubscribe future
     */
    private CompletableFuture<UnsubscribedStatus> setupUnsubscribedFuture() {
        assert (!this.fetcher.unsubscribeStatusFuture().isDone());
        CompletableFuture<UnsubscribedStatus> ret = new CompletableFuture<>();
        this.fetcher.unsubscribeStatusFuture().whenComplete((status, error) -> {
            try {
                this.close();
            }
            finally {
                if (error != null) {
                    ret.completeExceptionally(error);
                } else {
                    ret.complete(status);
                }
            }
        });
        return ret;
    }

    /**
     * Verifies that this consumer has neither been subscribed nor closed yet.
     *
     * @throws IllegalStateException if the consumer is not in its initial state
     */
    private void ensureInInit() {
        if (this.subscribed() || this.closed()) {
            throw new IllegalStateException("consumer is closed or have subscribed to some topics or pattern");
        }
    }

    /**
     * Derives the fetcher thread name from the first topic of the collection. An ellipsis is
     * appended when the topic had to be truncated or when there are further topics.
     *
     * @param topics the non-empty topic collection being subscribed to
     * @return the name for the fetcher thread
     */
    private String fetcherThreadName(Collection<String> topics) {
        String firstTopic = topics.iterator().next();
        return fetcherThreadName(firstTopic, topics.size() > 1);
    }

    /**
     * Derives the fetcher thread name from the textual form of the pattern. An ellipsis is
     * appended when the pattern had to be truncated.
     *
     * @param pattern the pattern being subscribed to
     * @return the name for the fetcher thread
     */
    private String fetcherThreadName(Pattern pattern) {
        return fetcherThreadName(pattern.toString(), false);
    }

    /**
     * Builds a fetcher thread name from at most {@link #MAX_THREAD_NAME_POSTFIX_LENGTH}
     * characters of {@code source}.
     *
     * @param source      the topic name or pattern text to embed in the thread name
     * @param hasMoreItems whether there is additional content not represented by {@code source}
     * @return the prefixed and, if needed, ellipsized thread name
     */
    private static String fetcherThreadName(String source, boolean hasMoreItems) {
        boolean truncated = source.length() > MAX_THREAD_NAME_POSTFIX_LENGTH;
        String postfix = truncated ? source.substring(0, MAX_THREAD_NAME_POSTFIX_LENGTH) : source;
        if (hasMoreItems || truncated) {
            postfix = postfix + ELLIPSIS;
        }
        return FETCHER_THREAD_NAME_PREFIX + postfix;
    }

    /**
     * Lifecycle states of the consumer. The numeric codes are ordered so that a larger code
     * means a later stage of the lifecycle.
     */
    static enum State {
        INIT(0),
        SUBSCRIBED(1),
        CLOSED(2);

        private final int code;

        private State(int code) {
            this.code = code;
        }

        /**
         * @return the numeric code of this state
         */
        int code() {
            return this.code;
        }
    }
}

