/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TableViewConfigurationData;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.shade.client.api.v2.Reader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TableView} implementation backed by a {@link Reader} that replays a topic into an
 * in-memory key/value map and keeps that map updated as new messages arrive.
 *
 * <p>Messages without a key are ignored. A keyed message with an empty payload
 * ({@code msg.size() == 0}) removes the key from the view; any other keyed message stores its
 * value under the key. Registered listeners are invoked for every applied update.
 *
 * <p>The map itself is a {@link ConcurrentHashMap}; the listener list is guarded by
 * {@link #listenersMutex}, which is also held while an update is applied and dispatched.
 *
 * @param <T> type of the values held by the view
 */
public class TableViewImpl<T> implements TableView<T> {
    private static final Logger log = LoggerFactory.getLogger(TableViewImpl.class);
    private final TableViewConfigurationData conf;
    /** Live key/value state; mutated only by {@link #handleMessage(Message)}. */
    private final ConcurrentMap<String, T> data;
    /** Read-only wrapper over {@link #data} handed out by the collection-view accessors. */
    private final Map<String, T> immutableData;
    private final CompletableFuture<Reader<T>> reader;
    /** Update listeners; every access happens while holding {@link #listenersMutex}. */
    private final List<BiConsumer<String, T>> listeners;
    private final ReentrantLock listenersMutex;
    private final boolean isPersistentTopic;
    /** Optional strategy deciding whether an incoming value replaces the current one. */
    private final TopicCompactionStrategy<T> compactionStrategy;

    /**
     * Builds the view and asynchronously starts creating the underlying reader, positioned at
     * {@link MessageId#earliest}. No messages are consumed until {@link #start()} is called.
     *
     * @param client client used to create the reader
     * @param schema schema used to decode message values
     * @param conf   table view configuration (topic, subscription name, crypto settings, ...)
     */
    TableViewImpl(PulsarClientImpl client, Schema<T> schema, TableViewConfigurationData conf) {
        this.conf = conf;
        this.isPersistentTopic = conf.getTopicName().startsWith(TopicDomain.persistent.toString());
        this.data = new ConcurrentHashMap<String, T>();
        this.immutableData = Collections.unmodifiableMap(this.data);
        this.listeners = new ArrayList<BiConsumer<String, T>>();
        this.listenersMutex = new ReentrantLock();
        this.compactionStrategy = TopicCompactionStrategy.load("table-view", conf.getTopicCompactionStrategyClassName());
        ReaderBuilder<T> readerBuilder = client.newReader(schema)
                .topic(conf.getTopicName())
                .startMessageId(MessageId.earliest)
                .autoUpdatePartitions(true)
                .autoUpdatePartitionsInterval((int) conf.getAutoUpdatePartitionsSeconds(), TimeUnit.SECONDS)
                .poolMessages(true)
                .subscriptionName(conf.getSubscriptionName());
        // Compacted reads are requested only for persistent topics.
        if (this.isPersistentTopic) {
            readerBuilder.readCompacted(true);
        }
        CryptoKeyReader cryptoKeyReader = conf.getCryptoKeyReader();
        if (cryptoKeyReader != null) {
            readerBuilder.cryptoKeyReader(cryptoKeyReader);
        }
        readerBuilder.cryptoFailureAction(conf.getCryptoFailureAction());
        this.reader = readerBuilder.createAsync();
    }

    /**
     * Starts feeding the view from the reader.
     *
     * <p>For a persistent topic the returned future completes once every message currently
     * available has been replayed; tailing then continues in the background. For any other topic
     * tailing starts immediately and the future completes as soon as the reader is available.
     *
     * @return future completed with this view, or completed exceptionally if the reader could not
     *         be created or the initial replay failed
     */
    CompletableFuture<TableView<T>> start() {
        return this.reader.thenCompose(createdReader -> {
            if (!this.isPersistentTopic) {
                this.readTailMessages(createdReader);
                return CompletableFuture.completedFuture(createdReader);
            }
            return this.readAllExistingMessages(createdReader);
        }).thenApply(ignored -> this);
    }

    /** {@inheritDoc} */
    @Override
    public int size() {
        return this.data.size();
    }

    /** {@inheritDoc} */
    @Override
    public boolean isEmpty() {
        return this.data.isEmpty();
    }

    /** {@inheritDoc} */
    @Override
    public boolean containsKey(String key) {
        return this.data.containsKey(key);
    }

    /** {@inheritDoc} */
    @Override
    public T get(String key) {
        return this.data.get(key);
    }

    /**
     * {@inheritDoc}
     *
     * <p>The returned set is a read-only view over the live map.
     */
    @Override
    public Set<Map.Entry<String, T>> entrySet() {
        return this.immutableData.entrySet();
    }

    /**
     * {@inheritDoc}
     *
     * <p>The returned set is a read-only view over the live map.
     */
    @Override
    public Set<String> keySet() {
        return this.immutableData.keySet();
    }

    /**
     * {@inheritDoc}
     *
     * <p>The returned collection is a read-only view over the live map.
     */
    @Override
    public Collection<T> values() {
        return this.immutableData.values();
    }

    /** {@inheritDoc} */
    @Override
    public void forEach(BiConsumer<String, T> action) {
        this.data.forEach(action);
    }

    /**
     * Registers {@code action} to be invoked for every update applied after this call.
     *
     * @param action callback receiving the key and the new value ({@code null} on removal)
     */
    @Override
    public void listen(BiConsumer<String, T> action) {
        // Acquire before entering the try block so that the finally clause can never attempt to
        // unlock a mutex this thread does not hold.
        this.listenersMutex.lock();
        try {
            this.listeners.add(action);
        } finally {
            this.listenersMutex.unlock();
        }
    }

    /**
     * Invokes {@code action} for every entry currently in the view and then registers it as a
     * listener. Both steps run under the listeners mutex, which {@link #handleMessage(Message)}
     * also holds while applying an update, so no update is applied between the two steps.
     *
     * @param action callback receiving the key and the value ({@code null} on removal)
     */
    @Override
    public void forEachAndListen(BiConsumer<String, T> action) {
        this.listenersMutex.lock();
        try {
            this.forEach(action);
            this.listeners.add(action);
        } finally {
            this.listenersMutex.unlock();
        }
    }

    /**
     * Closes the underlying reader once it has been created.
     *
     * @return future tracking the reader's close operation
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        return this.reader.thenCompose(Reader::closeAsync);
    }

    /**
     * Blocking variant of {@link #closeAsync()}.
     *
     * @throws PulsarClientException if closing the reader fails
     */
    @Override
    public void close() throws PulsarClientException {
        try {
            this.closeAsync().get();
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
        }
    }

    /**
     * Applies a single message to the view and notifies listeners.
     *
     * <p>The message is always released in the outer {@code finally} block, whether or not it was
     * applied (the reader is created with {@code poolMessages(true)}).
     *
     * @param msg message read from the topic
     */
    private void handleMessage(Message<T> msg) {
        try {
            if (!msg.hasKey()) {
                return;
            }
            String key = msg.getKey();
            // An empty payload is treated as a delete marker for the key.
            T cur = msg.size() > 0 ? msg.getValue() : null;
            if (log.isDebugEnabled()) {
                log.debug("Applying message from topic {}. key={} value={}", this.conf.getTopicName(), key, cur);
            }
            if (this.compactionStrategy != null) {
                T prev = this.data.get(key);
                if (this.compactionStrategy.shouldKeepLeft(prev, cur)) {
                    log.info("Skipped the message from topic {}. key={} value={} prev={}",
                            this.conf.getTopicName(), key, cur, prev);
                    this.compactionStrategy.handleSkippedMessage(key, cur);
                    return;
                }
            }
            this.listenersMutex.lock();
            try {
                if (cur == null) {
                    this.data.remove(key);
                } else {
                    this.data.put(key, cur);
                }
                for (BiConsumer<String, T> listener : this.listeners) {
                    try {
                        listener.accept(key, cur);
                    } catch (Throwable t) {
                        // A misbehaving listener must not stop the remaining listeners or the read loop.
                        log.error("Table view listener raised an exception", t);
                    }
                }
            } finally {
                this.listenersMutex.unlock();
            }
        } finally {
            msg.release();
        }
    }

    /**
     * Replays every message currently available on the reader.
     *
     * @param reader reader to drain
     * @return future completed with {@code reader} once the replay is done, or completed
     *         exceptionally if the replay fails
     */
    private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) {
        long startTime = System.nanoTime();
        AtomicLong messagesRead = new AtomicLong();
        CompletableFuture<Reader<T>> future = new CompletableFuture<Reader<T>>();
        this.readAllExistingMessages(reader, future, startTime, messagesRead);
        return future;
    }

    /**
     * One step of the replay loop: checks whether a message is available, applies it and
     * re-schedules itself; when nothing is left, completes {@code future} and switches to tailing.
     *
     * @param reader       reader to drain
     * @param future       future to complete when the replay ends (normally or exceptionally)
     * @param startTime    {@link System#nanoTime()} value captured when the replay began
     * @param messagesRead running count of replayed messages
     */
    private void readAllExistingMessages(Reader<T> reader, CompletableFuture<Reader<T>> future, long startTime, AtomicLong messagesRead) {
        reader.hasMessageAvailableAsync().thenAccept(hasMessage -> {
            if (hasMessage) {
                reader.readNextAsync().thenAccept(msg -> {
                    messagesRead.incrementAndGet();
                    this.handleMessage(msg);
                    this.readAllExistingMessages(reader, future, startTime, messagesRead);
                }).exceptionally(ex -> {
                    if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) {
                        log.error("Reader {} was closed while reading existing messages.", reader.getTopic(), ex);
                    } else {
                        log.warn("Reader {} was interrupted while reading existing messages. ", reader.getTopic(), ex);
                    }
                    future.completeExceptionally(ex);
                    return null;
                });
            } else {
                long endTime = System.nanoTime();
                long durationMillis = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
                log.info("Started table view for topic {} - Replayed {} messages in {} seconds",
                        reader.getTopic(), messagesRead, (double) durationMillis / 1000.0);
                future.complete(reader);
                this.readTailMessages(reader);
            }
        }).exceptionally(ex -> {
            // Without this stage a failed availability check would leave 'future' (and therefore
            // start()) pending forever. If 'future' is already completed this call is a no-op.
            log.warn("Reader {} failed to check for available messages while reading existing messages.",
                    reader.getTopic(), ex);
            future.completeExceptionally(ex);
            return null;
        });
    }

    /**
     * Continuously reads and applies new messages. The loop stops when the failure cause is
     * {@link PulsarClientException.AlreadyClosedException}; any other failure is logged and the
     * read is retried.
     *
     * @param reader reader to tail
     */
    private void readTailMessages(Reader<T> reader) {
        reader.readNextAsync().thenAccept(msg -> {
            this.handleMessage(msg);
            this.readTailMessages(reader);
        }).exceptionally(ex -> {
            if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) {
                log.error("Reader {} was closed while reading tail messages.", reader.getTopic(), ex);
            } else {
                // NOTE(review): the retry is immediate, with no back-off; a persistent failure
                // would spin here - confirm whether a delay is wanted.
                log.warn("Reader {} was interrupted while reading tail messages. Retrying..", reader.getTopic(), ex);
                this.readTailMessages(reader);
            }
            return null;
        });
    }
}

