/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.batch.connectors.pulsar;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.batch.connectors.pulsar.CachedClients;
import org.apache.flink.batch.connectors.pulsar.InputLedger;
import org.apache.flink.batch.connectors.pulsar.InputSplitReader;
import org.apache.flink.batch.connectors.pulsar.PulsarInputSplit;
import org.apache.flink.batch.connectors.pulsar.SplitUtils;
import org.apache.flink.common.ConnectorConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;
import org.apache.flink.streaming.connectors.pulsar.internal.TopicRange;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.raw.RawMessage;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.buffer.ByteBufUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarInputFormat<T>
extends RichInputFormat<T, PulsarInputSplit>
implements ResultTypeQueryable<T> {
    private static final Logger log = LoggerFactory.getLogger(PulsarInputFormat.class);
    private static final long serialVersionUID = 1L;
    private ConnectorConfig connectorConfig;
    private DeserializationSchema<T> deserializer;
    private transient Configuration parameters;
    private transient InputSplitReader<T> reader;

    public PulsarInputFormat(ConnectorConfig connectorConfig, DeserializationSchema<T> deserializer) {
        this.connectorConfig = connectorConfig;
        this.deserializer = deserializer;
    }

    public void configure(Configuration parameters) {
        this.parameters = parameters;
    }

    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
        return null;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public PulsarInputSplit[] createInputSplits(int minNumSplits) throws IOException {
        ArrayList<PulsarInputSplit> pulsarSplits = new ArrayList<PulsarInputSplit>();
        ClientConfigurationData clientConf = new ClientConfigurationData();
        clientConf.setAuthPluginClassName(this.connectorConfig.getAuthPluginClassName());
        clientConf.setAuthParams(this.connectorConfig.getAuthParams());
        clientConf.setServiceUrl(this.connectorConfig.getServiceUrl());
        HashMap caseInsensitiveParams = Maps.newHashMap();
        Optional.ofNullable(this.connectorConfig.getTopic()).ifPresent(value -> caseInsensitiveParams.put("topic", value));
        Optional.ofNullable(this.connectorConfig.getTopics()).ifPresent(value -> caseInsensitiveParams.put("topics", value));
        Optional.ofNullable(this.connectorConfig.getTopicsPattern()).ifPresent(value -> caseInsensitiveParams.put("topicspattern", value));
        try (PulsarMetadataReader reader = new PulsarMetadataReader(this.connectorConfig.getAdminUrl(), clientConf, "", caseInsensitiveParams, -1, -1);){
            Set topics = reader.getTopicPartitionsAll().stream().map(TopicRange::getTopic).collect(Collectors.toSet());
            ArrayList<InputLedger> allLedgers = new ArrayList<InputLedger>();
            for (String topic : topics) {
                Collection<InputLedger> ledgers = SplitUtils.getLedgersInBetween(topic, (MessageIdImpl)MessageId.earliest, (MessageIdImpl)MessageId.latest, CachedClients.getInstance(this.connectorConfig));
                allLedgers.addAll(ledgers);
            }
            List<List<InputLedger>> ldSplits = SplitUtils.partitionToNSplits(allLedgers, this.connectorConfig.getTargetNumSplits());
            for (int i = 0; i < ldSplits.size(); ++i) {
                pulsarSplits.add(this.genSplit(i, ldSplits.get(i)));
            }
            PulsarInputSplit[] pulsarInputSplitArray = pulsarSplits.toArray(new PulsarInputSplit[0]);
            return pulsarInputSplitArray;
        }
        catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    protected PulsarInputSplit genSplit(int index, List<InputLedger> ledgers) {
        return new PulsarInputSplit(index, ledgers);
    }

    public InputSplitAssigner getInputSplitAssigner(PulsarInputSplit[] inputSplits) {
        return new DefaultInputSplitAssigner((InputSplit[])inputSplits);
    }

    public void open(PulsarInputSplit split) throws IOException {
        try {
            this.reader = new GenericSplitReader(this.connectorConfig, split.getSplitNumber(), split.getLedgersToRead());
        }
        catch (Exception e) {
            throw new IOException(String.format("Failed to open split %d to read", split.getSplitNumber()), e);
        }
    }

    public boolean reachedEnd() throws IOException {
        return !this.reader.next();
    }

    public T nextRecord(T reuse) throws IOException {
        return this.reader.get();
    }

    public void close() throws IOException {
        if (this.reader != null) {
            try {
                this.reader.close();
            }
            catch (Exception e) {
                throw new IOException();
            }
        }
    }

    public TypeInformation<T> getProducedType() {
        return this.deserializer.getProducedType();
    }

    public String toString() {
        return "PulsarInputFormat(connectorConfig=" + this.connectorConfig + ", deserializer=" + this.deserializer + ", parameters=" + this.parameters + ", reader=" + this.reader + ")";
    }

    private class GenericSplitReader
    extends InputSplitReader {
        public GenericSplitReader(ConnectorConfig connectorConfig, int partitionId, List<InputLedger> ledgersToRead) throws Exception {
            super(connectorConfig, partitionId, ledgersToRead);
        }

        @Override
        public T deserialize(RawMessage currentMessage) throws IOException {
            return PulsarInputFormat.this.deserializer.deserialize(ByteBufUtil.getBytes((ByteBuf)currentMessage.getData()));
        }
    }
}

