package org.apache.flink.connectors.hive.read;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connectors.hive.ConsumeOrder;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.HiveTableSource;
import org.apache.flink.connectors.hive.JobConfWrapper;
import org.apache.flink.connectors.hive.read.PartitionDiscovery;
import org.apache.flink.connectors.hive.util.HiveConfUtils;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.filesystem.DefaultPartTimeExtractor;
import org.apache.flink.table.filesystem.PartitionTimeExtractor;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.mapred.JobConf;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A source function that periodically discovers new partitions of a Hive table and forwards their
 * input splits downstream as {@link TimestampedHiveInputSplit}s.
 *
 * <p>Every {@code interval} milliseconds, the function asks a {@link DirectoryMonitorDiscovery} for
 * partitions relative to {@code currentReadTime}. It emits splits for every partition it has not
 * forwarded yet, then advances {@code currentReadTime} to the largest partition timestamp seen.
 * Partitions newer than the previous read time are remembered, so a partition reported again by a
 * later pass (presumably one at exactly the read time; depends on the fetcher) is not emitted twice.
 *
 * <p>Both the read time and the set of remembered partitions are checkpointed.
 */
public class HiveContinuousMonitoringFunction extends RichSourceFunction<TimestampedHiveInputSplit>
        implements CheckpointedFunction {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(HiveContinuousMonitoringFunction.class);

    /** Parallelism hint passed to {@code HiveTableInputFormat.createInputSplits}; always at least 1. */
    private final int readerParallelism;
    /** Pause between two discovery passes, in milliseconds (passed to {@link Thread#sleep(long)}). */
    private final long interval;
    private final HiveShim hiveShim;
    private final JobConfWrapper conf;
    private final ObjectPath tablePath;
    private final List<String> partitionKeys;
    private final String[] fieldNames;
    private final DataType[] fieldTypes;
    /** Selects how a partition's timestamp is derived (file-provided time vs. partition values). */
    private final ConsumeOrder consumeOrder;
    /** Initial read time, parsed by {@code DefaultPartTimeExtractor.toMills}, used when nothing is restored. */
    private final String consumeOffset;
    private final String extractorKind;
    private final String extractorClass;
    private final String extractorPattern;

    private volatile boolean isRunning = true;
    /** Timestamp of the newest partition forwarded so far; set to {@code Long.MAX_VALUE} on cancel. */
    private volatile long currentReadTime = 0;

    private transient PartitionDiscovery.Context context;
    private transient PartitionDiscovery fetcher;
    private transient Object checkpointLock;
    private transient ListState<Long> currReadTimeState;
    private transient ListState<List<List<String>>> distinctPartsState;
    private transient IMetaStoreClient client;
    private transient Properties tableProps;
    private transient String defaultPartitionName;
    /** Partition values already forwarded that a subsequent discovery pass may report again. */
    private transient Set<List<String>> distinctPartitions;

    /**
     * Creates the monitoring function.
     *
     * @param hiveShim shim used to create the metastore client and read table metadata
     * @param jobConf Hadoop job configuration; wrapped so it can be serialized with the function
     * @param tablePath database and table name to monitor
     * @param catalogTable catalog table providing partition keys and the field schema
     * @param readerParallelism parallelism hint for split creation; values below 1 are raised to 1
     * @param consumeOrder how partition timestamps are determined
     * @param consumeOffset initial read time used when no state is restored
     * @param extractorKind kind of {@link PartitionTimeExtractor} to create
     * @param extractorClass class name for a custom {@link PartitionTimeExtractor}
     * @param extractorPattern pattern for the {@link PartitionTimeExtractor}
     * @param interval pause between discovery passes, in milliseconds
     */
    public HiveContinuousMonitoringFunction(
            HiveShim hiveShim,
            JobConf jobConf,
            ObjectPath tablePath,
            CatalogTable catalogTable,
            int readerParallelism,
            ConsumeOrder consumeOrder,
            String consumeOffset,
            String extractorKind,
            String extractorClass,
            String extractorPattern,
            long interval) {
        this.hiveShim = hiveShim;
        this.conf = new JobConfWrapper(jobConf);
        this.tablePath = tablePath;
        this.partitionKeys = catalogTable.getPartitionKeys();
        this.fieldNames = catalogTable.getSchema().getFieldNames();
        this.fieldTypes = catalogTable.getSchema().getFieldDataTypes();
        this.consumeOrder = consumeOrder;
        this.consumeOffset = consumeOffset;
        this.extractorKind = extractorKind;
        this.extractorClass = extractorClass;
        this.extractorPattern = extractorPattern;
        this.interval = interval;
        this.readerParallelism = Math.max(readerParallelism, 1);
    }

    /**
     * Registers the operator state, connects to the metastore and builds the discovery context.
     * On restore, reloads the read time and the remembered partitions; otherwise starts from
     * {@code consumeOffset}.
     */
    @Override
    public void initializeState(FunctionInitializationContext initContext) throws Exception {
        this.currReadTimeState = initContext.getOperatorStateStore().getListState(
                new ListStateDescriptor<Long>("current-read-time-state", LongSerializer.INSTANCE));
        this.distinctPartsState = initContext.getOperatorStateStore().getListState(
                new ListStateDescriptor<List<List<String>>>(
                        "distinct-partitions-state",
                        new ListSerializer<List<String>>(new ListSerializer<String>(StringSerializer.INSTANCE))));

        this.client = this.hiveShim.getHiveMetastoreClient(HiveConfUtils.create(this.conf.conf()));
        final Table table = this.client.getTable(this.tablePath.getDatabaseName(), this.tablePath.getObjectName());
        this.tableProps = HiveReflectionUtils.getTableMetadata(this.hiveShim, table);
        this.defaultPartitionName = this.conf.conf().get(
                HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
                HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);

        final PartitionTimeExtractor extractor = PartitionTimeExtractor.create(
                getRuntimeContext().getUserCodeClassLoader(),
                this.extractorKind,
                this.extractorClass,
                this.extractorPattern);
        this.fetcher = new DirectoryMonitorDiscovery();
        final FileSystem fileSystem = new Path(table.getSd().getLocation()).getFileSystem(this.conf.conf());
        this.context = createDiscoveryContext(table, fileSystem, extractor);

        this.distinctPartitions = new HashSet<>();
        if (initContext.isRestored()) {
            LOG.info("Restoring state for the {}.", getClass().getSimpleName());
            this.currentReadTime = this.currReadTimeState.get().iterator().next();
            this.distinctPartitions.addAll(this.distinctPartsState.get().iterator().next());
        } else {
            LOG.info("No state to restore for the {}.", getClass().getSimpleName());
            this.currentReadTime = DefaultPartTimeExtractor.toMills(this.consumeOffset);
        }
    }

    /** Builds the context through which the {@link PartitionDiscovery} accesses the table. */
    private PartitionDiscovery.Context createDiscoveryContext(
            final Table table, final FileSystem fileSystem, final PartitionTimeExtractor extractor) {
        return new PartitionDiscovery.Context() {
            @Override
            public List<String> partitionKeys() {
                return HiveContinuousMonitoringFunction.this.partitionKeys;
            }

            @Override
            public Optional<Partition> getPartition(List<String> partValues) throws TException {
                try {
                    return Optional.of(HiveContinuousMonitoringFunction.this.client.getPartition(
                            HiveContinuousMonitoringFunction.this.tablePath.getDatabaseName(),
                            HiveContinuousMonitoringFunction.this.tablePath.getObjectName(),
                            partValues));
                } catch (NoSuchObjectException e) {
                    // The partition is not registered in the metastore: report it as absent.
                    return Optional.empty();
                }
            }

            @Override
            public FileSystem fileSystem() {
                return fileSystem;
            }

            @Override
            public Path tableLocation() {
                return new Path(table.getSd().getLocation());
            }

            @Override
            public long extractTimestamp(
                    List<String> partKeys, List<String> partValues, Supplier<Long> fileTime) {
                ConsumeOrder order = HiveContinuousMonitoringFunction.this.consumeOrder;
                switch (order) {
                    case CREATE_TIME_ORDER:
                        // Use the time supplied by the discovery (presumably a file/dir time — depends on the fetcher).
                        return fileTime.get();
                    case PARTITION_TIME_ORDER:
                        return DefaultPartTimeExtractor.toMills(extractor.extract(partKeys, partValues));
                    default:
                        throw new UnsupportedOperationException("Unsupported consumer order: " + order);
                }
            }
        };
    }

    /** Runs discovery passes under the checkpoint lock, sleeping {@code interval} ms between them. */
    @Override
    public void run(SourceFunction.SourceContext<TimestampedHiveInputSplit> sourceContext) throws Exception {
        this.checkpointLock = sourceContext.getCheckpointLock();
        while (this.isRunning) {
            synchronized (this.checkpointLock) {
                // Re-check under the lock: cancel() clears isRunning while holding it and closes the
                // metastore client right after, so no pass may start once cancellation happened.
                if (!this.isRunning) {
                    break;
                }
                monitorAndForwardSplits(sourceContext);
            }
            Thread.sleep(this.interval);
        }
    }

    /**
     * Fetches partitions relative to the current read time, emits splits for unseen ones in
     * timestamp order, and advances the read time. Must be called while holding the checkpoint lock.
     */
    private void monitorAndForwardSplits(SourceFunction.SourceContext<TimestampedHiveInputSplit> sourceContext)
            throws Exception {
        assert Thread.holdsLock(this.checkpointLock);

        List<Tuple2<Partition, Long>> partitions = this.fetcher.fetchPartitions(this.context, this.currentReadTime);
        if (partitions.isEmpty()) {
            return;
        }

        // Long.compare instead of casting the difference to int: millisecond timestamps more than
        // ~24.8 days apart overflow int, which mis-orders partitions and breaks the Comparator contract.
        partitions.sort((left, right) -> Long.compare(left.f1, right.f1));

        long maxTimestamp = Long.MIN_VALUE;
        Set<List<String>> nextDistinctParts = new HashSet<>();
        for (Tuple2<Partition, Long> entry : partitions) {
            Partition partition = entry.f0;
            List<String> partSpec = partition.getValues();
            if (this.distinctPartitions.contains(partSpec)) {
                continue;
            }
            this.distinctPartitions.add(partSpec);

            long timestamp = entry.f1;
            if (timestamp > this.currentReadTime) {
                nextDistinctParts.add(partSpec);
            }
            maxTimestamp = Math.max(maxTimestamp, timestamp);

            LOG.info("Found new partition {} of table {}, forwarding splits to downstream readers",
                    partSpec, this.tablePath.getFullName());
            for (HiveTableInputSplit split : HiveTableInputFormat.createInputSplits(
                    this.readerParallelism,
                    Collections.singletonList(toHiveTablePartition(partition)),
                    this.conf.conf())) {
                sourceContext.collect(new TimestampedHiveInputSplit(timestamp, split));
            }
        }

        if (maxTimestamp > this.currentReadTime) {
            this.currentReadTime = maxTimestamp;
            // Only partitions newer than the old read time can be reported again by later passes.
            this.distinctPartitions.clear();
            this.distinctPartitions.addAll(nextDistinctParts);
        }
    }

    /** Converts a metastore partition into the connector's {@link HiveTablePartition}. */
    private HiveTablePartition toHiveTablePartition(Partition partition) {
        return HiveTableSource.toHiveTablePartition(
                this.partitionKeys,
                this.fieldNames,
                this.fieldTypes,
                this.hiveShim,
                this.tableProps,
                this.defaultPartitionName,
                partition);
    }

    /** Stores the current read time and the remembered partition values. */
    @Override
    public void snapshotState(FunctionSnapshotContext snapshotContext) throws Exception {
        Preconditions.checkState(this.currReadTimeState != null,
                "The " + getClass().getSimpleName() + " state has not been properly initialized.");

        this.currReadTimeState.clear();
        this.currReadTimeState.add(this.currentReadTime);

        this.distinctPartsState.clear();
        this.distinctPartsState.add(new ArrayList<>(this.distinctPartitions));

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} checkpointed {}.", getClass().getSimpleName(), this.currentReadTime);
        }
    }

    /** Stops the function and releases the metastore client (delegates to {@link #cancel()}). */
    @Override
    public void close() {
        cancel();
    }

    /**
     * Stops the monitoring loop and closes the metastore client. Safe to call before {@link #run}
     * has obtained the checkpoint lock.
     */
    @Override
    public void cancel() {
        if (this.checkpointLock != null) {
            synchronized (this.checkpointLock) {
                this.currentReadTime = Long.MAX_VALUE;
                this.isRunning = false;
            }
        } else {
            // run() has not started yet, so there is no lock to coordinate with.
            this.currentReadTime = Long.MAX_VALUE;
            this.isRunning = false;
        }

        if (this.client != null) {
            this.client.close();
            this.client = null;
        }
    }
}
