/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connectors.hive;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
import org.apache.flink.connectors.hive.ContinuousHivePendingSplitsCheckpoint;
import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.connectors.hive.HiveSourceFileEnumerator;
import org.apache.flink.connectors.hive.HiveTableSource;
import org.apache.flink.connectors.hive.read.HiveContinuousPartitionContext;
import org.apache.flink.connectors.hive.read.HiveSourceSplit;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.filesystem.ContinuousPartitionFetcher;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ContinuousHiveSplitEnumerator<T extends Comparable<T>>
implements SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> {
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousHiveSplitEnumerator.class);
    private final SplitEnumeratorContext<HiveSourceSplit> enumeratorContext;
    private final LinkedHashMap<Integer, String> readersAwaitingSplit;
    private final FileSplitAssigner splitAssigner;
    private final long discoveryInterval;
    private final HiveTableSource.HiveContinuousPartitionFetcherContext<T> fetcherContext;
    private T currentReadOffset;
    private Collection<List<String>> seenPartitionsSinceOffset;
    private final PartitionMonitor<T> monitor;

    public ContinuousHiveSplitEnumerator(SplitEnumeratorContext<HiveSourceSplit> enumeratorContext, T currentReadOffset, Collection<List<String>> seenPartitionsSinceOffset, FileSplitAssigner splitAssigner, long discoveryInterval, JobConf jobConf, ObjectPath tablePath, ContinuousPartitionFetcher<Partition, T> fetcher, HiveTableSource.HiveContinuousPartitionFetcherContext<T> fetcherContext) {
        this.enumeratorContext = enumeratorContext;
        this.currentReadOffset = currentReadOffset;
        this.seenPartitionsSinceOffset = new ArrayList<List<String>>(seenPartitionsSinceOffset);
        this.splitAssigner = splitAssigner;
        this.discoveryInterval = discoveryInterval;
        this.fetcherContext = fetcherContext;
        this.readersAwaitingSplit = new LinkedHashMap();
        this.monitor = new PartitionMonitor<T>(currentReadOffset, seenPartitionsSinceOffset, tablePath, jobConf, fetcher, fetcherContext);
    }

    public void start() {
        try {
            this.fetcherContext.open();
            this.enumeratorContext.callAsync(this.monitor, this::handleNewSplits, this.discoveryInterval, this.discoveryInterval);
        }
        catch (Exception e) {
            throw new FlinkHiveException("Failed to start continuous split enumerator", e);
        }
    }

    public void handleSplitRequest(int subtaskId, @Nullable String hostName) {
        this.readersAwaitingSplit.put(subtaskId, hostName);
        this.assignSplits();
    }

    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        LOG.error("Received unrecognized event: {}", (Object)sourceEvent);
    }

    public void addSplitsBack(List<HiveSourceSplit> splits, int subtaskId) {
        LOG.debug("Continuous Hive Source Enumerator adds splits back: {}", splits);
        this.splitAssigner.addSplits(new ArrayList<HiveSourceSplit>(splits));
    }

    public void addReader(int subtaskId) {
    }

    public PendingSplitsCheckpoint<HiveSourceSplit> snapshotState(long checkpointId) throws Exception {
        Collection remainingSplits = this.splitAssigner.remainingSplits();
        return new ContinuousHivePendingSplitsCheckpoint(remainingSplits, (Comparable<?>)this.currentReadOffset, this.seenPartitionsSinceOffset);
    }

    public void close() throws IOException {
        try {
            this.fetcherContext.close();
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }

    private void handleNewSplits(NewSplitsAndState<T> newSplitsAndState, Throwable error) {
        if (error != null) {
            throw new FlinkHiveException("Failed to enumerate files", error);
        }
        this.currentReadOffset = ((NewSplitsAndState)newSplitsAndState).offset;
        this.seenPartitionsSinceOffset = ((NewSplitsAndState)newSplitsAndState).seenPartitions;
        this.splitAssigner.addSplits(new ArrayList(((NewSplitsAndState)newSplitsAndState).newSplits));
        this.assignSplits();
    }

    private void assignSplits() {
        Iterator<Map.Entry<Integer, String>> awaitingReader = this.readersAwaitingSplit.entrySet().iterator();
        while (awaitingReader.hasNext()) {
            Map.Entry<Integer, String> nextAwaiting = awaitingReader.next();
            String hostname = nextAwaiting.getValue();
            int awaitingSubtask = nextAwaiting.getKey();
            Optional nextSplit = this.splitAssigner.getNext(hostname);
            if (!nextSplit.isPresent()) break;
            this.enumeratorContext.assignSplit((SourceSplit)((HiveSourceSplit)((Object)nextSplit.get())), awaitingSubtask);
            awaitingReader.remove();
        }
    }

    static class NewSplitsAndState<T extends Comparable<T>> {
        private final T offset;
        private final Collection<List<String>> seenPartitions;
        private final Collection<HiveSourceSplit> newSplits;

        private NewSplitsAndState(Collection<HiveSourceSplit> newSplits, T offset, Collection<List<String>> seenPartitions) {
            this.newSplits = newSplits;
            this.offset = offset;
            this.seenPartitions = new ArrayList<List<String>>(seenPartitions);
        }

        @VisibleForTesting
        Collection<List<String>> getSeenPartitions() {
            return this.seenPartitions;
        }

        /* synthetic */ NewSplitsAndState(Collection x0, Comparable x1, Collection x2, 1 x3) {
            this(x0, x1, x2);
        }
    }

    static class PartitionMonitor<T extends Comparable<T>>
    implements Callable<NewSplitsAndState<T>> {
        private T currentReadOffset;
        private final Set<List<String>> seenPartitionsSinceOffset;
        private final ObjectPath tablePath;
        private final JobConf jobConf;
        private final ContinuousPartitionFetcher<Partition, T> fetcher;
        private final HiveContinuousPartitionContext<Partition, T> fetcherContext;

        PartitionMonitor(T currentReadOffset, Collection<List<String>> seenPartitionsSinceOffset, ObjectPath tablePath, JobConf jobConf, ContinuousPartitionFetcher<Partition, T> fetcher, HiveContinuousPartitionContext<Partition, T> fetcherContext) {
            this.currentReadOffset = currentReadOffset;
            this.seenPartitionsSinceOffset = new HashSet<List<String>>(seenPartitionsSinceOffset);
            this.tablePath = tablePath;
            this.jobConf = jobConf;
            this.fetcher = fetcher;
            this.fetcherContext = fetcherContext;
        }

        @Override
        public NewSplitsAndState<T> call() throws Exception {
            List partitions = this.fetcher.fetchPartitions(this.fetcherContext, this.currentReadOffset);
            if (partitions.isEmpty()) {
                return new NewSplitsAndState(Collections.emptyList(), (Comparable)this.currentReadOffset, this.seenPartitionsSinceOffset, null);
            }
            partitions.sort(Comparator.comparing(o -> (Comparable)o.f1));
            ArrayList<HiveSourceSplit> newSplits = new ArrayList<HiveSourceSplit>();
            Object maxOffset = this.currentReadOffset;
            HashSet<List<String>> nextSeen = new HashSet<List<String>>();
            for (Tuple2 tuple2 : partitions) {
                Partition partition = (Partition)tuple2.f0;
                List<String> partSpec = partition.getValues();
                if (!this.seenPartitionsSinceOffset.add(partSpec)) continue;
                Comparable offset = (Comparable)tuple2.f1;
                if (offset.compareTo(this.currentReadOffset) >= 0) {
                    nextSeen.add(partSpec);
                }
                if (offset.compareTo(maxOffset) >= 0) {
                    maxOffset = offset;
                }
                LOG.info("Found new partition {} of table {}, generating splits for it", partSpec, (Object)this.tablePath.getFullName());
                newSplits.addAll(HiveSourceFileEnumerator.createInputSplits(0, Collections.singletonList(this.fetcherContext.toHiveTablePartition(partition)), this.jobConf));
            }
            this.currentReadOffset = maxOffset;
            if (!nextSeen.isEmpty()) {
                this.seenPartitionsSinceOffset.clear();
                this.seenPartitionsSinceOffset.addAll(nextSeen);
            }
            return new NewSplitsAndState(newSplits, (Comparable)this.currentReadOffset, this.seenPartitionsSinceOffset, null);
        }
    }
}

