package org.apache.flink.connectors.hive;

import java.io.IOException;
import java.lang.Comparable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
import org.apache.flink.connectors.hive.HiveTableSource;
import org.apache.flink.connectors.hive.read.HiveSourceSplit;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.filesystem.ContinuousPartitionFetcher;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.class */
public class ContinuousHiveSplitEnumerator<T extends Comparable<T>> implements SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> {
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousHiveSplitEnumerator.class);
    private final SplitEnumeratorContext<HiveSourceSplit> enumeratorContext;
    private final LinkedHashMap<Integer, String> readersAwaitingSplit = new LinkedHashMap<>();
    private final FileSplitAssigner splitAssigner;
    private final long discoveryInterval;
    private final HiveTableSource.HiveContinuousPartitionFetcherContext<T> fetcherContext;
    private T currentReadOffset;
    private Collection<List<String>> seenPartitionsSinceOffset;
    private final PartitionMonitor<T> monitor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator$NewSplitsAndState.class */
    public static class NewSplitsAndState<T extends Comparable<T>> {
        private final T offset;
        private final Collection<List<String>> seenPartitions;
        private final Collection<HiveSourceSplit> newSplits;

        private NewSplitsAndState(Collection<HiveSourceSplit> collection, T t, Collection<List<String>> collection2) {
            this.newSplits = collection;
            this.offset = t;
            this.seenPartitions = new ArrayList(collection2);
        }
    }

    /* loaded from: input_file:org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator$PartitionMonitor.class */
    private static class PartitionMonitor<T extends Comparable<T>> implements Callable<NewSplitsAndState<T>> {
        private T currentReadOffset;
        private final Set<List<String>> seenPartitionsSinceOffset;
        private final ObjectPath tablePath;
        private final JobConf jobConf;
        private final ContinuousPartitionFetcher<Partition, T> fetcher;
        private final HiveTableSource.HiveContinuousPartitionFetcherContext<T> fetcherContext;

        private PartitionMonitor(T t, Collection<List<String>> collection, ObjectPath objectPath, JobConf jobConf, ContinuousPartitionFetcher<Partition, T> continuousPartitionFetcher, HiveTableSource.HiveContinuousPartitionFetcherContext<T> hiveContinuousPartitionFetcherContext) {
            this.currentReadOffset = t;
            this.seenPartitionsSinceOffset = new HashSet(collection);
            this.tablePath = objectPath;
            this.jobConf = jobConf;
            this.fetcher = continuousPartitionFetcher;
            this.fetcherContext = hiveContinuousPartitionFetcherContext;
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // java.util.concurrent.Callable
        public NewSplitsAndState<T> call() throws Exception {
            List<Tuple2> fetchPartitions = this.fetcher.fetchPartitions(this.fetcherContext, this.currentReadOffset);
            if (fetchPartitions.isEmpty()) {
                return new NewSplitsAndState<>(Collections.emptyList(), this.currentReadOffset, this.seenPartitionsSinceOffset);
            }
            fetchPartitions.sort(Comparator.comparing(tuple2 -> {
                return (Comparable) tuple2.f1;
            }));
            ArrayList arrayList = new ArrayList();
            T t = this.currentReadOffset;
            HashSet hashSet = new HashSet();
            for (Tuple2 tuple22 : fetchPartitions) {
                Partition partition = (Partition) tuple22.f0;
                List<String> values = partition.getValues();
                if (this.seenPartitionsSinceOffset.add(values)) {
                    Comparable comparable = (Comparable) tuple22.f1;
                    if (comparable.compareTo(this.currentReadOffset) > 0) {
                        hashSet.add(values);
                    }
                    if (comparable.compareTo(t) > 0) {
                        t = comparable;
                    }
                    ContinuousHiveSplitEnumerator.LOG.info("Found new partition {} of table {}, generating splits for it", values, this.tablePath.getFullName());
                    arrayList.addAll(HiveSourceFileEnumerator.createInputSplits(0, Collections.singletonList(this.fetcherContext.toHiveTablePartition(partition)), this.jobConf));
                }
            }
            this.currentReadOffset = t;
            if (!hashSet.isEmpty()) {
                this.seenPartitionsSinceOffset.clear();
                this.seenPartitionsSinceOffset.addAll(hashSet);
            }
            return new NewSplitsAndState<>(arrayList, this.currentReadOffset, this.seenPartitionsSinceOffset);
        }
    }

    public ContinuousHiveSplitEnumerator(SplitEnumeratorContext<HiveSourceSplit> splitEnumeratorContext, T t, Collection<List<String>> collection, FileSplitAssigner fileSplitAssigner, long j, JobConf jobConf, ObjectPath objectPath, ContinuousPartitionFetcher<Partition, T> continuousPartitionFetcher, HiveTableSource.HiveContinuousPartitionFetcherContext<T> hiveContinuousPartitionFetcherContext) {
        this.enumeratorContext = splitEnumeratorContext;
        this.currentReadOffset = t;
        this.seenPartitionsSinceOffset = new ArrayList(collection);
        this.splitAssigner = fileSplitAssigner;
        this.discoveryInterval = j;
        this.fetcherContext = hiveContinuousPartitionFetcherContext;
        this.monitor = new PartitionMonitor<>(t, collection, objectPath, jobConf, continuousPartitionFetcher, hiveContinuousPartitionFetcherContext);
    }

    public void start() {
        try {
            this.fetcherContext.open();
            this.enumeratorContext.callAsync(this.monitor, this::handleNewSplits, this.discoveryInterval, this.discoveryInterval);
        } catch (Exception e) {
            throw new FlinkHiveException("Failed to start continuous split enumerator", e);
        }
    }

    public void handleSplitRequest(int i, @Nullable String str) {
        this.readersAwaitingSplit.put(Integer.valueOf(i), str);
        assignSplits();
    }

    public void handleSourceEvent(int i, SourceEvent sourceEvent) {
        LOG.error("Received unrecognized event: {}", sourceEvent);
    }

    public void addSplitsBack(List<HiveSourceSplit> list, int i) {
        LOG.debug("Continuous Hive Source Enumerator adds splits back: {}", list);
        this.splitAssigner.addSplits(new ArrayList(list));
    }

    public void addReader(int i) {
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public PendingSplitsCheckpoint<HiveSourceSplit> m647snapshotState() throws Exception {
        return new ContinuousHivePendingSplitsCheckpoint(this.splitAssigner.remainingSplits(), this.currentReadOffset, this.seenPartitionsSinceOffset);
    }

    public void close() throws IOException {
        try {
            this.fetcherContext.close();
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    private void handleNewSplits(NewSplitsAndState<T> newSplitsAndState, Throwable th) {
        if (th != null) {
            throw new FlinkHiveException("Failed to enumerate files", th);
        }
        this.currentReadOffset = (T) ((NewSplitsAndState) newSplitsAndState).offset;
        this.seenPartitionsSinceOffset = ((NewSplitsAndState) newSplitsAndState).seenPartitions;
        this.splitAssigner.addSplits(new ArrayList(((NewSplitsAndState) newSplitsAndState).newSplits));
        assignSplits();
    }

    private void assignSplits() {
        Iterator<Map.Entry<Integer, String>> it = this.readersAwaitingSplit.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, String> next = it.next();
            String value = next.getValue();
            int intValue = next.getKey().intValue();
            Optional next2 = this.splitAssigner.getNext(value);
            if (!next2.isPresent()) {
                return;
            }
            this.enumeratorContext.assignSplit((HiveSourceSplit) next2.get(), intValue);
            it.remove();
        }
    }
}
