/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.pulsar.internal;

import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.flink.streaming.connectors.pulsar.internal.CachedPulsarClient;
import org.apache.flink.streaming.connectors.pulsar.internal.ExceptionProxy;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarDeserializationSchema;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarTopicState;
import org.apache.flink.streaming.connectors.pulsar.internal.ReaderThread;
import org.apache.flink.streaming.connectors.pulsar.internal.SchemaUtils;
import org.apache.flink.types.Row;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ReaderThread} specialization that emits {@link Row} records.
 *
 * <p>Each Pulsar message is deserialized into a {@link Row}; when {@code useExtendField}
 * is set, the row is widened with message metadata (key bytes, topic name, message id
 * bytes, publish time, event time) before being handed to the owning fetcher.
 *
 * <p>The class is marked {@link Deprecated}; the replacement is not visible from this file.
 */
@Deprecated
public class RowReaderThread
extends ReaderThread<Row> {
    private static final Logger log = LoggerFactory.getLogger(RowReaderThread.class);

    /** Pulsar schema resolved from the {@link SchemaInfo} passed to the constructor. */
    private final Schema<?> schema;

    /** Whether emitted rows are extended with message metadata fields. */
    private final boolean useExtendField;

    /**
     * Creates a reader thread for a single topic state.
     *
     * @param owner          fetcher that receives the emitted records
     * @param state          topic state this thread reads for
     * @param clientConf     Pulsar client configuration
     * @param readerConf     extra reader configuration loaded into the reader builder
     * @param pollTimeoutMs  poll timeout forwarded to the superclass
     * @param pulsarSchema   schema info converted into the Pulsar {@link Schema} used by the reader
     * @param deserializer   converts messages into {@link Row} instances
     * @param exceptionProxy exception proxy forwarded to the superclass
     * @param useExtendField {@code true} to append message metadata to every emitted row
     */
    public RowReaderThread(PulsarFetcher owner, PulsarTopicState state, ClientConfigurationData clientConf, Map<String, Object> readerConf, int pollTimeoutMs, SchemaInfo pulsarSchema, PulsarDeserializationSchema<Row> deserializer, ExceptionProxy exceptionProxy, boolean useExtendField) {
        super(owner, state, clientConf, readerConf, deserializer, pollTimeoutMs, exceptionProxy);
        this.schema = SchemaUtils.getPulsarSchema(pulsarSchema);
        this.useExtendField = useExtendField;
    }

    /**
     * Builds the Pulsar reader for this thread's topic, starting (inclusively) at
     * {@code startMessageId} and applying {@code readerConf}. When the topic range is not
     * the full range, the reader is restricted to that range's key hash range.
     *
     * @throws PulsarClientException if the reader cannot be created
     * @throws ExecutionException    declared by the superclass contract; presumably raised
     *                               by the cached client lookup (not verifiable from this file)
     */
    @Override
    protected void createActualReader() throws PulsarClientException, ExecutionException {
        // Raw type kept on purpose: the generic type of the inherited `reader` field is not visible here.
        ReaderBuilder readerBuilder = CachedPulsarClient.getOrCreate(this.clientConf).newReader(this.schema).topic(this.topicRange.getTopic()).startMessageId(this.startMessageId).startMessageIdInclusive().loadConf(this.readerConf);
        if (!this.topicRange.isFullRange()) {
            readerBuilder.keyHashRange(new Range[]{this.topicRange.getPulsarRange()});
        }
        this.reader = readerBuilder.create();
    }

    /**
     * Returns a new row holding all fields of {@code origin} followed by the message metadata.
     *
     * <p>Metadata slots, in order after the original fields: key bytes (or {@code null} when
     * the message has no key), topic name, serialized message id, publish time, and event time
     * (or {@code null} when the event time is not positive). Timestamps are converted to
     * {@link LocalDateTime} in the JVM's default time zone.
     *
     * <p>The result row is sized with {@code PulsarOptions.META_FIELD_NAMES.size()}; this
     * method writes five metadata slots, so that list is assumed to have at least five entries.
     *
     * @param origin  deserialized row to extend
     * @param message message the row was produced from
     * @return the widened row
     */
    private Row useMetaData(Row origin, Message<?> message) {
        int originArity = origin.getArity();
        Row resultRow = new Row(originArity + PulsarOptions.META_FIELD_NAMES.size());
        for (int i = 0; i < originArity; ++i) {
            resultRow.setField(i, origin.getField(i));
        }
        int metaStartIdx = originArity;
        if (message.hasKey()) {
            resultRow.setField(metaStartIdx, message.getKeyBytes());
        } else {
            resultRow.setField(metaStartIdx, null);
        }
        resultRow.setField(metaStartIdx + 1, message.getTopicName());
        resultRow.setField(metaStartIdx + 2, message.getMessageId().toByteArray());
        resultRow.setField(metaStartIdx + 3, LocalDateTime.ofInstant(Instant.ofEpochMilli(message.getPublishTime()), ZoneId.systemDefault()));
        if (message.getEventTime() > 0L) {
            resultRow.setField(metaStartIdx + 4, LocalDateTime.ofInstant(Instant.ofEpochMilli(message.getEventTime()), ZoneId.systemDefault()));
        } else {
            resultRow.setField(metaStartIdx + 4, null);
        }
        return resultRow;
    }

    /**
     * Deserializes {@code message}, optionally appends metadata, and forwards the row to the
     * owning fetcher. Nothing is emitted when the deserializer reports end of stream.
     *
     * <p>Any failure is logged through the class logger and then rethrown unchanged.
     *
     * @param message message to convert and emit
     * @throws IOException      if deserialization fails
     * @throws RuntimeException if the first field of the resulting row is {@code null}
     */
    @Override
    protected void emitRecord(Message<?> message) throws IOException {
        // Declared outside the try so the failure log can name the message when the id was obtained.
        MessageId messageId = null;
        try {
            messageId = message.getMessageId();
            Row record = (Row)this.deserializer.deserialize(message);
            if (this.useExtendField) {
                record = this.useMetaData(record, message);
            }
            if (this.deserializer.isEndOfStream(record)) {
                return;
            }
            if (record.getField(0) == null) {
                throw new RuntimeException("record index 0 is null");
            }
            this.owner.emitRecord(record, this.state, messageId);
        }
        catch (Throwable e) {
            // Report through SLF4J instead of stderr; the trailing throwable is logged with its stack trace.
            log.error("Failed to emit record for message id {}", messageId, e);
            throw e;
        }
    }
}

