/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.batch.connectors.pulsar;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.batch.connectors.pulsar.CachedClients;
import org.apache.flink.batch.connectors.pulsar.InputLedger;
import org.apache.flink.common.ConnectorConfig;
import org.apache.pulsar.common.api.raw.MessageParser;
import org.apache.pulsar.common.api.raw.RawMessage;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.api.BookKeeper;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.pulsar.shade.org.jctools.queues.MessagePassingQueue;
import org.apache.pulsar.shade.org.jctools.queues.SpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads the messages of one input split directly from the ledgers that back a Pulsar topic.
 *
 * <p>The reader is a three stage pipeline connected by two bounded single-producer /
 * single-consumer queues:
 * <ol>
 *   <li>a single-threaded executor opens each ledger, reads the requested entry range and
 *       pushes the entries into {@code entryQueue};</li>
 *   <li>the {@link DeserializeEntries} thread drains {@code entryQueue}, parses every entry
 *       into raw messages and pushes them into {@code messageQueue};</li>
 *   <li>the thread calling {@link #next()} polls {@code messageQueue}.</li>
 * </ol>
 *
 * <p>{@link #next()}, {@link #get()} and {@link #close()} are expected to be called from one
 * thread only, because that thread is the single consumer of {@code messageQueue}.
 *
 * @param <T> type produced by {@link #deserialize(RawMessage)}
 */
public abstract class InputSplitReader<T> {
    private static final Logger log = LoggerFactory.getLogger(InputSplitReader.class);
    /** Back-off, in milliseconds, between retries while a queue is empty or full. */
    private static final long BACKOFF_MS = 1L;
    /** Upper bound, in milliseconds, that {@link #close()} waits for each worker to stop. */
    private static final long SHUTDOWN_TIMEOUT_MS = 5000L;

    private final ConnectorConfig connectorConfig;
    private final int partitionId;
    private final List<InputLedger> ledgersToRead;
    private final CachedClients cachedClients;
    private final BookKeeper bkClient;
    private final ManagedLedgerConfig mlConfig;
    private final LedgerOffloader offloader;
    /** Single worker thread that performs the ledger reads; the only producer of {@code entryQueue}. */
    private final ExecutorService executor;
    private final SpscArrayQueue<RawMessage> messageQueue;
    private final SpscArrayQueue<Entry> entryQueue;
    private Thread deserializerThread;
    private RawMessage currentMessage;
    /** Number of ledger reads that were scheduled and have not completed (successfully or not). */
    private final AtomicLong outstandingLedgerReads = new AtomicLong(0L);
    private final int ledgerSize;
    private int currentLedgerIdx = 0;
    /**
     * Sum of {@code InputLedger.ledgerSize()} over all ledgers. It is compared against the number
     * of processed entries, so it is presumably an entry count - TODO confirm against InputLedger.
     */
    private final long partitionSize;
    /** Entries fully handled by the deserializer thread; written there and read by {@link #next()}. */
    private final AtomicLong entriesProcessed = new AtomicLong(0L);
    private final Map<Long, TopicName> ledger2Topic = new ConcurrentHashMap<>();
    /** First failure reported by a ledger read, or {@code null}; surfaced to the caller by {@link #next()}. */
    private volatile Throwable readFailure;

    /**
     * Creates a reader for the given ledgers. No I/O thread is started until the first call to
     * {@link #next()}.
     *
     * @param connectorConfig connector configuration (queue sizes, maximum message size, clients)
     * @param partitionId     id of the split, used in the deserializer thread name and in errors
     * @param ledgersToRead   ledgers that make up this split, read in list order
     * @throws Exception if the cached clients cannot be obtained
     */
    public InputSplitReader(ConnectorConfig connectorConfig, int partitionId, List<InputLedger> ledgersToRead) throws Exception {
        this.connectorConfig = connectorConfig;
        this.partitionId = partitionId;
        this.ledgersToRead = ledgersToRead;
        this.cachedClients = CachedClients.getInstance(connectorConfig);
        this.bkClient = this.cachedClients.getManagedLedgerFactory().getBookKeeper();
        this.mlConfig = this.cachedClients.getManagedLedgerConfig();
        this.offloader = this.mlConfig.getLedgerOffloader();
        this.executor = Executors.newSingleThreadExecutor();
        this.messageQueue = new SpscArrayQueue<>(connectorConfig.getMaxSplitMessageQueueSize());
        this.entryQueue = new SpscArrayQueue<>(connectorConfig.getMaxSplitEntryQueueSize());
        this.ledgerSize = ledgersToRead.size();
        this.partitionSize = ledgersToRead.stream().mapToLong(InputLedger::ledgerSize).sum();
    }

    /**
     * Advances to the next message of the split, releasing the previously returned one.
     *
     * <p>The first call starts the deserializer thread and schedules the read of the first
     * ledger. Further ledgers are scheduled one at a time, each after the previous read has
     * completed.
     *
     * @return {@code true} if a message is available through {@link #get()}, {@code false} once
     *         all entries have been processed and the message queue is empty
     * @throws IOException      if reading one of the ledgers failed
     * @throws RuntimeException wrapping {@link InterruptedException} if the calling thread is
     *                          interrupted while waiting; the interrupt flag is restored
     */
    public boolean next() throws IOException {
        if (this.deserializerThread == null) {
            this.deserializerThread = new DeserializeEntries();
            this.deserializerThread.start();
            // An empty split has nothing to schedule; the loop below then ends immediately.
            if (this.currentLedgerIdx < this.ledgerSize) {
                this.getEntries(this.ledgersToRead.get(this.currentLedgerIdx));
                ++this.currentLedgerIdx;
            }
        }
        if (this.currentMessage != null) {
            this.currentMessage.release();
            this.currentMessage = null;
        }
        // The processed counter must be read BEFORE the queue is inspected: the deserializer
        // bumps the counter only after it has offered all messages of an entry, so seeing the
        // final count guarantees that the following emptiness check sees those messages too.
        while (this.entriesProcessed.get() < this.partitionSize || !this.messageQueue.isEmpty()) {
            this.failIfReadFailed();
            if (this.currentLedgerIdx < this.ledgerSize && this.outstandingLedgerReads.get() == 0L) {
                this.getEntries(this.ledgersToRead.get(this.currentLedgerIdx));
                ++this.currentLedgerIdx;
            }
            this.currentMessage = this.messageQueue.poll();
            if (this.currentMessage != null) {
                return true;
            }
            try {
                Thread.sleep(BACKOFF_MS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        return false;
    }

    /**
     * Converts a raw Pulsar message into the record type of this reader.
     *
     * @param var1 the message to convert; {@link #get()} passes the current message, which is
     *             {@code null} when {@link #next()} has not returned {@code true}
     * @return the converted record
     * @throws IOException if the message cannot be converted
     */
    public abstract T deserialize(RawMessage var1) throws IOException;

    /**
     * Returns the record for the message selected by the last successful {@link #next()} call.
     *
     * @return the result of {@link #deserialize(RawMessage)} for the current message
     * @throws IOException if deserialization fails
     */
    public T get() throws IOException {
        return this.deserialize(this.currentMessage);
    }

    /**
     * Stops both worker threads and releases every buffered message and entry. Calling it more
     * than once is harmless.
     *
     * <p>The workers are stopped before the queues are drained: the queues support a single
     * producer and a single consumer only, so draining them while the workers are still active
     * could corrupt them or leak buffers that are enqueued after the drain.
     *
     * @throws Exception declared for interface compatibility
     */
    public void close() throws Exception {
        this.executor.shutdownNow();
        if (this.deserializerThread != null) {
            this.deserializerThread.interrupt();
        }
        boolean interrupted = false;
        try {
            if (!this.executor.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                log.warn("Ledger read executor of split {} did not stop within {} ms", this.partitionId, SHUTDOWN_TIMEOUT_MS);
            }
            if (this.deserializerThread != null) {
                this.deserializerThread.join(SHUTDOWN_TIMEOUT_MS);
            }
        }
        catch (InterruptedException e) {
            // Finish the clean-up first, then restore the flag for the caller.
            interrupted = true;
        }
        if (this.currentMessage != null) {
            this.currentMessage.release();
            this.currentMessage = null;
        }
        this.messageQueue.drain(RawMessage::release);
        this.entryQueue.drain(Entry::release);
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Schedules the asynchronous read of one ledger's entry range into {@code entryQueue}.
     *
     * <p>The read runs on the single-threaded executor. Entries are offered one by one and the
     * executor thread backs off while the queue is full, so no entry is dropped when the range
     * is larger than the queue capacity. The read handle is closed once the range has been
     * copied.
     *
     * @param info ledger and entry range to read
     * @return a future that completes with {@code null} when all entries have been enqueued, or
     *         exceptionally when opening or reading the ledger failed
     */
    CompletableFuture<Object> getEntries(InputLedger info) {
        this.outstandingLedgerReads.incrementAndGet();
        // thenApplyAsync, not thenComposeAsync: the function has no follow-up stage to return,
        // and a compose function that returns null completes the stage with an NPE.
        CompletableFuture<Object> read = this.getLedgerHandle(info).thenApplyAsync(readHandle -> {
            try (ReadHandle handle = readHandle;
                 LedgerEntries entries = handle.read(info.getStartEntryId(), info.getEndEntryId())) {
                for (LedgerEntry ledgerEntry : entries) {
                    // EntryImpl.create is expected to retain the payload so the entry stays
                    // valid after LedgerEntries is closed - same assumption as before.
                    this.enqueueEntry(EntryImpl.create(ledgerEntry));
                }
            }
            catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                throw new CompletionException(e);
            }
            return null;
        }, this.executor);
        return read.whenComplete((t, throwable) -> {
            if (throwable != null) {
                log.error("Get entry failed due to {}", throwable.getMessage(), throwable);
                if (this.readFailure == null) {
                    this.readFailure = throwable;
                }
            } else {
                log.info("Finished extracting entries for ledger {}", info);
            }
            // Always decrement, otherwise a failed read blocks the scheduling of later ledgers.
            this.outstandingLedgerReads.decrementAndGet();
        });
    }

    /**
     * Opens a read handle for the given ledger and remembers which topic the ledger belongs to.
     *
     * @param ledger the ledger to open
     * @return a future for the handle; obtained from the offloader when the ledger carries a
     *         UUID, from BookKeeper (without recovery) otherwise
     */
    CompletableFuture<ReadHandle> getLedgerHandle(InputLedger ledger) {
        this.ledger2Topic.put(ledger.getLedgerId(), TopicName.get(ledger.getTopic()));
        if (ledger.getUuid() != null) {
            return this.offloader.readOffloaded(ledger.getLedgerId(), ledger.getUuid(), ledger.getOffloaderDrvierMeta());
        }
        return this.bkClient.newOpenLedgerOp()
                .withRecovery(false)
                .withLedgerId(ledger.getLedgerId())
                .withDigestType(this.mlConfig.getDigestType())
                .withPassword(this.mlConfig.getPassword())
                .execute();
    }

    /** Throws an {@link IOException} if any ledger read has failed so far. */
    private void failIfReadFailed() throws IOException {
        Throwable failure = this.readFailure;
        if (failure != null) {
            throw new IOException("Failed to read ledger entries for split " + this.partitionId, failure);
        }
    }

    /**
     * Offers an entry to {@code entryQueue}, backing off while the queue is full. The entry is
     * released if the thread is interrupted before the queue accepts it.
     */
    private void enqueueEntry(Entry entry) throws InterruptedException {
        try {
            while (!this.entryQueue.offer(entry)) {
                Thread.sleep(BACKOFF_MS);
            }
        }
        catch (InterruptedException e) {
            entry.release();
            throw e;
        }
    }

    /**
     * Background thread that turns queued ledger entries into raw messages. It is the single
     * consumer of {@code entryQueue} and the single producer of {@code messageQueue}.
     */
    class DeserializeEntries
    extends Thread {
        /** Cleared by {@link #interrupt()}; read by the run loop on the deserializer thread. */
        protected volatile boolean isRunning;

        DeserializeEntries() {
            super("derserialize-thread-split-" + InputSplitReader.this.partitionId);
            this.isRunning = false;
        }

        /**
         * Requests termination: clears the running flag and delivers a real interrupt so that a
         * sleeping or backing-off thread wakes up immediately.
         */
        @Override
        public void interrupt() {
            this.isRunning = false;
            super.interrupt();
        }

        @Override
        public void run() {
            this.isRunning = true;
            // The interrupt flag is checked as well, because interrupt() may have cleared
            // isRunning before the assignment above was executed.
            while (this.isRunning && !this.isInterrupted()) {
                int read = InputSplitReader.this.entryQueue.drain(this::deserializeEntry);
                if (read > 0) continue;
                try {
                    Thread.sleep(BACKOFF_MS);
                }
                catch (InterruptedException e) {
                    return;
                }
            }
        }

        /**
         * Parses one entry into messages, then counts and releases the entry. Parse failures
         * are logged and the entry is still counted as processed.
         */
        private void deserializeEntry(Entry entry) {
            TopicName tp = InputSplitReader.this.ledger2Topic.getOrDefault(entry.getLedgerId(), TopicName.get("DUMMY"));
            try {
                MessageParser.parseMessage(tp, entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer(),
                        message -> this.enqueueMessage(message),
                        (int)InputSplitReader.this.connectorConfig.getMaxMessageSize());
            }
            catch (IOException e) {
                log.error("Failed to parse message from pulsar topic {}", tp, e);
            }
            finally {
                InputSplitReader.this.entriesProcessed.incrementAndGet();
                entry.release();
            }
        }

        /**
         * Offers a message to {@code messageQueue}, backing off while the queue is full. When
         * interrupted, the message is released instead of leaked and the interrupt flag is
         * restored so the run loop terminates.
         */
        private void enqueueMessage(RawMessage message) {
            try {
                while (!InputSplitReader.this.messageQueue.offer(message)) {
                    Thread.sleep(BACKOFF_MS);
                }
            }
            catch (InterruptedException e) {
                message.release();
                Thread.currentThread().interrupt();
            }
        }
    }
}

