/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.sql.presto;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.netty.buffer.ByteBuf;
import io.prestosql.spi.ErrorCodeSupplier;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.StandardErrorCode;
import io.prestosql.spi.connector.RecordCursor;
import io.prestosql.spi.type.BigintType;
import io.prestosql.spi.type.DateTimeEncoding;
import io.prestosql.spi.type.DateType;
import io.prestosql.spi.type.IntegerType;
import io.prestosql.spi.type.RealType;
import io.prestosql.spi.type.SmallintType;
import io.prestosql.spi.type.TimeType;
import io.prestosql.spi.type.TimestampType;
import io.prestosql.spi.type.TimestampWithTimeZoneType;
import io.prestosql.spi.type.TinyintType;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.VarbinaryType;
import io.prestosql.spi.type.VarcharType;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.impl.ReadOnlyCursorImpl;
import org.apache.pulsar.common.api.raw.MessageParser;
import org.apache.pulsar.common.api.raw.RawMessage;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.sql.presto.KeyValueSchemaHandler;
import org.apache.pulsar.sql.presto.PulsarColumnHandle;
import org.apache.pulsar.sql.presto.PulsarConnectorCache;
import org.apache.pulsar.sql.presto.PulsarConnectorConfig;
import org.apache.pulsar.sql.presto.PulsarConnectorMetricsTracker;
import org.apache.pulsar.sql.presto.PulsarInternalColumn;
import org.apache.pulsar.sql.presto.PulsarSchemaHandlers;
import org.apache.pulsar.sql.presto.PulsarSplit;
import org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider;
import org.apache.pulsar.sql.presto.SchemaHandler;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.SpscArrayQueue;

/**
 * {@link RecordCursor} over one {@link PulsarSplit}.
 *
 * <p>Data flows through two bounded queues:
 * <ol>
 *   <li>{@link ReadEntries} issues asynchronous reads on the read-only cursor and puts the
 *       returned entries on {@code entryQueue};</li>
 *   <li>{@link DeserializeEntries} runs on its own thread, drains {@code entryQueue}, parses each
 *       entry into messages and puts them on {@code messageQueue};</li>
 *   <li>{@link #advanceNextPosition()} polls {@code messageQueue} and deserializes one message per
 *       call with the configured {@link SchemaHandler}.</li>
 * </ol>
 *
 * <p>Counters shared between the calling thread and the deserialize thread are
 * {@link AtomicLong}s so that concurrent updates are not lost.
 */
public class PulsarRecordCursor implements RecordCursor {
    private static final Logger log = Logger.get(PulsarRecordCursor.class);

    private List<PulsarColumnHandle> columnHandles;
    private PulsarSplit pulsarSplit;
    private PulsarConnectorConfig pulsarConnectorConfig;
    private ReadOnlyCursor cursor;
    private SpscArrayQueue<RawMessage> messageQueue;
    private SpscArrayQueue<Entry> entryQueue;
    private Object currentRecord;
    private RawMessage currentMessage;
    private Map<String, PulsarInternalColumn> internalColumnMap = PulsarInternalColumn.getInternalFieldsMap();
    private SchemaHandler schemaHandler;
    private int maxBatchSize;
    // Written by the deserialize thread, read through getCompletedBytes().
    private final AtomicLong completedBytes = new AtomicLong(0L);
    private ReadEntries readEntries;
    private DeserializeEntries deserializeEntries;
    private TopicName topicName;
    private PulsarConnectorMetricsTracker metricsTracker;
    private boolean readOffloaded;
    // Captured at construction for every constructor, so close() always reports a real duration.
    private final long startTime = System.nanoTime();
    private final long splitSize;
    // Incremented by the deserialize thread (one per entry) and by ReadEntries.run() when
    // offloaded entries are skipped; atomic because those run on different threads.
    private final AtomicLong entriesProcessed = new AtomicLong(0L);
    private int partition = -1;

    /**
     * Creates a cursor that obtains its managed-ledger factory, managed-ledger config and stats
     * provider from the shared {@link PulsarConnectorCache}.
     *
     * @param columnHandles columns to expose, in field-index order
     * @param pulsarSplit split to read
     * @param pulsarConnectorConfig connector configuration
     * @throws RuntimeException if the connector cache or the read-only cursor cannot be obtained
     */
    public PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit, PulsarConnectorConfig pulsarConnectorConfig) {
        this.splitSize = pulsarSplit.getSplitSize();
        PulsarConnectorCache pulsarConnectorCache;
        try {
            pulsarConnectorCache = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig);
        } catch (Exception e) {
            log.error(e, "Failed to initialize Pulsar connector cache");
            this.close();
            throw new RuntimeException(e);
        }

        OffloadPolicies offloadPolicies = pulsarSplit.getOffloadPolicies();
        if (offloadPolicies != null) {
            offloadPolicies.setOffloadersDirectory(pulsarConnectorConfig.getOffloadersDirectory());
            offloadPolicies.setManagedLedgerOffloadMaxThreads(Integer.valueOf(pulsarConnectorConfig.getManagedLedgerOffloadMaxThreads()));
        }

        ManagedLedgerFactory managedLedgerFactory = pulsarConnectorCache.getManagedLedgerFactory();
        TopicName splitTopic = TopicName.get("persistent", NamespaceName.get(pulsarSplit.getSchemaName()), pulsarSplit.getTableName());
        ManagedLedgerConfig managedLedgerConfig = pulsarConnectorCache.getManagedLedgerConfig(splitTopic.getNamespaceObject(), offloadPolicies, pulsarConnectorConfig);
        PulsarConnectorMetricsTracker tracker = new PulsarConnectorMetricsTracker(pulsarConnectorCache.getStatsProvider());
        this.initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory, managedLedgerConfig, tracker);
    }

    /**
     * Creates a cursor with explicitly supplied collaborators instead of the shared connector cache.
     *
     * @throws RuntimeException if the read-only cursor cannot be opened
     */
    PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit, PulsarConnectorConfig pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig, PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
        this.splitSize = pulsarSplit.getSplitSize();
        this.initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory, managedLedgerConfig, pulsarConnectorMetricsTracker);
    }

    /**
     * Populates the cursor state shared by both constructors and opens the read-only cursor at the
     * split's start position. On failure the cursor is closed before the exception is rethrown.
     */
    private void initialize(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit, PulsarConnectorConfig pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig, PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
        this.columnHandles = columnHandles;
        this.pulsarSplit = pulsarSplit;
        this.partition = TopicName.getPartitionIndex(pulsarSplit.getTableName());
        this.pulsarConnectorConfig = pulsarConnectorConfig;
        this.maxBatchSize = pulsarConnectorConfig.getMaxEntryReadBatchSize();
        this.messageQueue = new SpscArrayQueue<>(pulsarConnectorConfig.getMaxSplitMessageQueueSize());
        this.entryQueue = new SpscArrayQueue<>(pulsarConnectorConfig.getMaxSplitEntryQueueSize());
        this.topicName = TopicName.get("persistent", NamespaceName.get(pulsarSplit.getSchemaName()), pulsarSplit.getTableName());
        this.metricsTracker = pulsarConnectorMetricsTracker;
        this.readOffloaded = pulsarConnectorConfig.getManagedLedgerOffloadDriver() != null;
        this.schemaHandler = PulsarSchemaHandlers.newPulsarSchemaHandler(this.topicName, this.pulsarConnectorConfig, pulsarSplit.getSchemaInfo(), columnHandles, PulsarSqlSchemaInfoProvider.Type.NONE);
        log.info("Initializing split with parameters: %s", pulsarSplit);
        try {
            this.cursor = this.getCursor(this.topicName, pulsarSplit.getStartPosition(), managedLedgerFactory, managedLedgerConfig);
        } catch (InterruptedException | ManagedLedgerException e) {
            log.error(e, "Failed to get read only cursor");
            this.close();
            throw new RuntimeException(e);
        }
    }

    /** Opens a read-only cursor on the topic's managed ledger at {@code startPosition}. */
    private ReadOnlyCursor getCursor(TopicName topicName, Position startPosition, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig) throws ManagedLedgerException, InterruptedException {
        return managedLedgerFactory.openReadOnlyCursor(topicName.getPersistenceNamingEncoding(), startPosition, managedLedgerConfig);
    }

    /** Returns the total readable bytes of all entries handled so far by the deserialize thread. */
    @Override
    public long getCompletedBytes() {
        return this.completedBytes.get();
    }

    /** Read time is not tracked by this cursor; always returns {@code 0}. */
    @Override
    public long getReadTimeNanos() {
        return 0L;
    }

    /**
     * Returns the type of the column at {@code field}.
     *
     * @throws IllegalArgumentException if {@code field} is not smaller than the number of columns
     */
    @Override
    public Type getType(int field) {
        Preconditions.checkArgument(field < this.columnHandles.size(), "Invalid field index");
        return this.columnHandles.get(field).getType();
    }

    /**
     * Moves to the next message of the split and deserializes it.
     *
     * <p>The first call starts the deserialize thread and issues the first read. The previously
     * current message, if any, is released. The method then polls the message queue, sleeping 1 ms
     * between attempts, until a message arrives or {@link ReadEntries#hasFinished()} reports that
     * the split is exhausted.
     *
     * @return {@code true} if a new current record is available, {@code false} at end of split
     * @throws RuntimeException wrapping {@link InterruptedException} if the calling thread is
     *         interrupted while waiting; the thread's interrupt status is restored first
     */
    @Override
    public boolean advanceNextPosition() {
        if (this.readEntries == null) {
            this.deserializeEntries = new DeserializeEntries();
            this.deserializeEntries.start();
            this.readEntries = new ReadEntries();
            this.readEntries.run();
        }
        if (this.currentMessage != null) {
            this.currentMessage.release();
            this.currentMessage = null;
        }

        while (true) {
            if (this.readEntries.hasFinished()) {
                return false;
            }
            // Only ask for more entries while the message queue still has room.
            if (this.messageQueue.capacity() - this.messageQueue.size() > 0) {
                this.readEntries.run();
            }
            this.currentMessage = this.messageQueue.poll();
            if (this.currentMessage != null) {
                break;
            }
            try {
                Thread.sleep(1L);
                this.metricsTracker.register_MESSAGE_QUEUE_DEQUEUE_WAIT_TIME(1L);
            } catch (InterruptedException e) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        this.metricsTracker.start_RECORD_DESERIALIZE_TIME();
        if (this.schemaHandler instanceof KeyValueSchemaHandler) {
            // Key/value schemas need the (optional) key bytes in addition to the payload.
            ByteBuf keyByteBuf = null;
            if (this.currentMessage.getKeyBytes().isPresent()) {
                keyByteBuf = this.currentMessage.getKeyBytes().get();
            }
            this.currentRecord = this.schemaHandler.deserialize(keyByteBuf, this.currentMessage.getData(), this.currentMessage.getSchemaVersion());
        } else {
            this.currentRecord = this.schemaHandler.deserialize(this.currentMessage.getData(), this.currentMessage.getSchemaVersion());
        }
        this.metricsTracker.incr_NUM_RECORD_DESERIALIZED();
        this.metricsTracker.end_RECORD_DESERIALIZE_TIME();
        return true;
    }

    /**
     * Returns the raw value of the column at {@code fieldIndex} for the current record.
     *
     * <p>Internal columns are served from the current message, except the partition column, which
     * returns the partition index derived from the split's table name. All other columns are
     * extracted from the deserialized record by the schema handler.
     *
     * @return the value, or {@code null} if there is no current record
     */
    @VisibleForTesting
    Object getRecord(int fieldIndex) {
        if (this.currentRecord == null) {
            return null;
        }
        PulsarColumnHandle pulsarColumnHandle = this.columnHandles.get(fieldIndex);
        if (!pulsarColumnHandle.isInternal()) {
            return this.schemaHandler.extractField(fieldIndex, this.currentRecord);
        }
        PulsarInternalColumn pulsarInternalColumn = this.internalColumnMap.get(pulsarColumnHandle.getName());
        if (pulsarInternalColumn instanceof PulsarInternalColumn.PartitionColumn) {
            return Integer.valueOf(this.partition);
        }
        return pulsarInternalColumn.getData(this.currentMessage);
    }

    /** Returns the boolean value of {@code field}; the column's Java type must be {@code boolean}. */
    @Override
    public boolean getBoolean(int field) {
        this.checkFieldType(field, Boolean.TYPE);
        return (Boolean) this.getRecord(field);
    }

    /**
     * Returns the value of {@code field} encoded as a {@code long} according to its column type.
     *
     * @throws PrestoException with {@code NOT_SUPPORTED} for column types not handled here
     */
    @Override
    public long getLong(int field) {
        this.checkFieldType(field, Long.TYPE);
        Object record = this.getRecord(field);
        Type type = this.getType(field);

        if (type.equals(BigintType.BIGINT) || type.equals(DateType.DATE) || type.equals(TimeType.TIME)) {
            return ((Number) record).longValue();
        }
        if (type.equals(IntegerType.INTEGER)) {
            return ((Number) record).intValue();
        }
        if (type.equals(RealType.REAL)) {
            // REAL travels as the IEEE-754 bit pattern of the float.
            return Float.floatToIntBits(((Number) record).floatValue());
        }
        if (type.equals(SmallintType.SMALLINT)) {
            return ((Number) record).shortValue();
        }
        if (type.equals(TimestampType.TIMESTAMP)) {
            if (record instanceof String) {
                return Long.parseLong((String) record);
            }
            return ((Number) record).longValue();
        }
        if (type.equals(TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE)) {
            return DateTimeEncoding.packDateTimeWithZone(((Number) record).longValue(), 0);
        }
        if (type.equals(TinyintType.TINYINT)) {
            return Byte.parseByte(record.toString());
        }
        throw new PrestoException(StandardErrorCode.NOT_SUPPORTED, "Unsupported type " + this.getType(field));
    }

    /** Returns the double value of {@code field}; the column's Java type must be {@code double}. */
    @Override
    public double getDouble(int field) {
        this.checkFieldType(field, Double.TYPE);
        return (Double) this.getRecord(field);
    }

    /**
     * Returns the value of {@code field} as a {@link Slice}: UTF-8 text for {@code VARCHAR}, raw
     * bytes for {@code VARBINARY}.
     *
     * @throws PrestoException with {@code NOT_SUPPORTED} for any other column type
     */
    @Override
    public Slice getSlice(int field) {
        this.checkFieldType(field, Slice.class);
        Object record = this.getRecord(field);
        Type type = this.getType(field);
        if (type == VarcharType.VARCHAR) {
            return Slices.utf8Slice(record.toString());
        }
        if (type == VarbinaryType.VARBINARY) {
            return Slices.wrappedBuffer(this.toBytes(record));
        }
        throw new PrestoException(StandardErrorCode.NOT_SUPPORTED, "Unsupported type " + type);
    }

    /**
     * Converts a binary field value to a byte array.
     *
     * @param record a {@code byte[]}, {@link ByteBuffer} or {@link ByteBuf}; {@code null} is
     *        passed through as {@code null}
     * @throws PrestoException with {@code NOT_SUPPORTED} for any other runtime type
     */
    private byte[] toBytes(Object record) {
        if (record == null) {
            return null;
        }
        if (record instanceof byte[]) {
            return (byte[]) record;
        }
        if (record instanceof ByteBuffer) {
            ByteBuffer byteBuffer = (ByteBuffer) record;
            if (byteBuffer.hasArray()) {
                // NOTE(review): returns the whole backing array, ignoring arrayOffset/position/limit;
                // presumably callers only pass buffers that wrap exactly the payload - confirm.
                return byteBuffer.array();
            }
            // NOTE(review): treats position() as the payload length, i.e. assumes a freshly
            // written, not yet flipped buffer - confirm against the schema handlers.
            byte[] bytes = new byte[byteBuffer.position()];
            byteBuffer.flip();
            byteBuffer.get(bytes);
            return bytes;
        }
        if (record instanceof ByteBuf) {
            ByteBuf byteBuf = (ByteBuf) record;
            if (byteBuf.hasArray()) {
                return byteBuf.array();
            }
            byte[] bytes = new byte[byteBuf.readableBytes()];
            // Absolute read: leaves readerIndex untouched so the same buffer can be read again.
            byteBuf.getBytes(byteBuf.readerIndex(), bytes);
            return bytes;
        }
        throw new PrestoException(StandardErrorCode.NOT_SUPPORTED, "Unsupported type " + record.getClass().getName());
    }

    /** Object-typed columns are not supported by this cursor. */
    @Override
    public Object getObject(int field) {
        throw new UnsupportedOperationException();
    }

    /** Returns {@code true} if the value of {@code field} for the current record is {@code null}. */
    @Override
    public boolean isNull(int field) {
        return this.getRecord(field) == null;
    }

    /**
     * Releases everything held by this cursor. Safe to call on a partially initialized instance:
     * every resource is null-checked.
     *
     * <p>The deserialize thread is asked to stop before the queues are drained, so it is not left
     * running as a producer/consumer while buffered messages and entries are being released.
     */
    @Override
    public void close() {
        log.info("Closing cursor record");
        if (this.deserializeEntries != null) {
            this.deserializeEntries.interrupt();
        }
        if (this.currentMessage != null) {
            this.currentMessage.release();
            // Avoid a second release if close() is invoked again.
            this.currentMessage = null;
        }
        if (this.messageQueue != null) {
            this.messageQueue.drain(RawMessage::release);
        }
        if (this.entryQueue != null) {
            this.entryQueue.drain(Entry::release);
        }
        if (this.cursor != null) {
            try {
                this.cursor.close();
            } catch (Exception e) {
                log.error(e);
            }
        }
        if (this.metricsTracker != null) {
            this.metricsTracker.register_TOTAL_EXECUTION_TIME(System.nanoTime() - this.startTime);
            this.metricsTracker.close();
        }
    }

    /**
     * Verifies that the Java type of column {@code field} is exactly {@code expected}.
     *
     * @throws IllegalArgumentException on mismatch
     */
    private void checkFieldType(int field, Class<?> expected) {
        Class<?> actual = this.getType(field).getJavaType();
        Preconditions.checkArgument(actual == expected, "Expected field %s to be type %s but is %s", field, expected, actual);
    }

    @VisibleForTesting
    SchemaHandler getSchemaHandler() {
        return this.schemaHandler;
    }

    /**
     * Issues asynchronous entry reads on the cursor and receives their results.
     *
     * <p>{@code outstandingReadsRequests} starts at 1, is decremented when a read is issued and
     * incremented again when that read completes or fails, so at most one read is in flight.
     */
    @VisibleForTesting
    class ReadEntries implements AsyncCallbacks.ReadEntriesCallback {
        private boolean isDone = false;
        private final AtomicLong outstandingReadsRequests = new AtomicLong(1L);

        ReadEntries() {
        }

        /**
         * Issues the next read if none is in flight. Marks the reader done once the cursor has no
         * more entries or its read position has reached the split's end position. When the current
         * ledger is offloaded and no offload driver is configured, the rest of that ledger is
         * skipped and counted as processed instead of being read.
         */
        public void run() {
            if (this.outstandingReadsRequests.get() <= 0L) {
                return;
            }
            if (!cursor.hasMoreEntries() || ((PositionImpl) cursor.getReadPosition()).compareTo(pulsarSplit.getEndPosition()) >= 0) {
                this.isDone = true;
                return;
            }

            // Never request more entries than the entry queue can currently hold.
            int batchSize = Math.min(maxBatchSize, entryQueue.capacity() - entryQueue.size());
            if (batchSize <= 0) {
                metricsTracker.incr_READ_ATTEMPTS_FAIL();
                return;
            }

            ReadOnlyCursorImpl readOnlyCursorImpl = (ReadOnlyCursorImpl) cursor;
            if (!readOffloaded && readOnlyCursorImpl.getCurrentLedgerInfo().hasOffloadContext()) {
                log.warn("Ledger %s is offloaded for topic %s. Ignoring it because offloader is not configured", readOnlyCursorImpl.getCurrentLedgerInfo().getLedgerId(), pulsarSplit.getTableName());
                long numEntries = readOnlyCursorImpl.getCurrentLedgerInfo().getEntries();
                long entriesToSkip = numEntries - ((PositionImpl) cursor.getReadPosition()).getEntryId() + 1L;
                cursor.skipEntries(Math.toIntExact(entriesToSkip));
                entriesProcessed.addAndGet(entriesToSkip);
            } else {
                this.outstandingReadsRequests.decrementAndGet();
                // The issue time travels as the callback context for latency accounting.
                cursor.asyncReadEntries(batchSize, this, System.nanoTime());
            }
            metricsTracker.incr_READ_ATTEMPTS_SUCCESS();
        }

        /** Puts the entries of a completed read on the entry queue and re-arms the reader. */
        @Override
        public void readEntriesComplete(final List<Entry> entries, Object ctx) {
            entryQueue.fill(entries.iterator()::next, entries.size());
            this.outstandingReadsRequests.incrementAndGet();
            metricsTracker.register_READ_LATENCY_PER_BATCH_SUCCESS(System.nanoTime() - (Long) ctx);
            metricsTracker.incr_NUM_ENTRIES_PER_BATCH_SUCCESS(entries.size());
        }

        /**
         * Returns {@code true} once the split is exhausted: the message queue is empty, the reader
         * is done, no read is in flight, and at least {@code splitSize} entries were processed.
         */
        public boolean hasFinished() {
            return messageQueue.isEmpty()
                    && this.isDone
                    && this.outstandingReadsRequests.get() >= 1L
                    && splitSize <= entriesProcessed.get();
        }

        /** Records a failed read and re-arms the reader so a later {@link #run()} can retry. */
        @Override
        public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
            log.debug(exception, "Failed to read entries from topic %s", topicName.toString());
            this.outstandingReadsRequests.incrementAndGet();
            metricsTracker.register_READ_LATENCY_PER_BATCH_FAIL(System.nanoTime() - (Long) ctx);
            metricsTracker.incr_NUM_ENTRIES_PER_BATCH_FAIL(maxBatchSize);
        }
    }

    /**
     * Background worker that drains the entry queue, parses every entry into messages and puts
     * them on the message queue.
     */
    @VisibleForTesting
    class DeserializeEntries implements Runnable {
        // volatile: written by interrupt() on the closing thread, read by the worker loop.
        protected volatile boolean isRunning = false;
        private final Thread thread;

        public DeserializeEntries() {
            this.thread = new Thread(this, "derserialize-thread-split-" + pulsarSplit.getSplitId());
        }

        /** Asks the worker to stop and interrupts it in case it is sleeping. */
        public void interrupt() {
            this.isRunning = false;
            this.thread.interrupt();
        }

        public void start() {
            this.thread.start();
        }

        /** Worker loop: drain the entry queue; sleep 1 ms whenever it was empty. */
        @Override
        public void run() {
            this.isRunning = true;
            while (this.isRunning) {
                int read = entryQueue.drain(this::processEntry);
                if (read > 0) {
                    continue;
                }
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }

        /**
         * Accounts for one entry, parses it into messages unless it lies at or beyond the split's
         * end position, and always counts and releases it afterwards.
         */
        private void processEntry(Entry entry) {
            try {
                long bytes = entry.getDataBuffer().readableBytes();
                completedBytes.addAndGet(bytes);
                metricsTracker.register_BYTES_READ(bytes);

                // Entries at or past the end position belong to another split; skip parsing.
                if (((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) >= 0) {
                    return;
                }

                metricsTracker.start_ENTRY_DESERIALIZE_TIME();
                try {
                    MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer(), this::enqueueMessage, pulsarConnectorConfig.getMaxMessageSize());
                } catch (IOException e) {
                    // NOTE(review): this propagates out of the worker loop and ends the thread;
                    // the polling side then never sees the remaining entries counted - confirm
                    // whether the failure should be surfaced to advanceNextPosition() instead.
                    log.error(e, "Failed to parse message from pulsar topic %s", topicName.toString());
                    throw new RuntimeException(e);
                }
                metricsTracker.end_ENTRY_DESERIALIZE_TIME();
                metricsTracker.end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
            } finally {
                entriesProcessed.incrementAndGet();
                entry.release();
            }
        }

        /**
         * Offers {@code message} to the message queue, retrying every 1 ms while the queue is full.
         * If the thread is interrupted while waiting, the message is released (it was never handed
         * to the consumer) and the interrupt status is restored so the worker loop can exit.
         */
        private void enqueueMessage(RawMessage message) {
            try {
                metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
                while (!messageQueue.offer(message)) {
                    Thread.sleep(1L);
                }
                metricsTracker.end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
                metricsTracker.incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
            } catch (InterruptedException e) {
                message.release();
                Thread.currentThread().interrupt();
            }
        }
    }
}

