/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connectors.hive.read;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connectors.hive.ConsumeOrder;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.HiveTableSource;
import org.apache.flink.connectors.hive.JobConfWrapper;
import org.apache.flink.connectors.hive.read.DirectoryMonitorDiscovery;
import org.apache.flink.connectors.hive.read.HiveTableInputFormat;
import org.apache.flink.connectors.hive.read.HiveTableInputSplit;
import org.apache.flink.connectors.hive.read.PartitionDiscovery;
import org.apache.flink.connectors.hive.read.TimestampedHiveInputSplit;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.filesystem.DefaultPartTimeExtractor;
import org.apache.flink.table.filesystem.PartitionTimeExtractor;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.mapred.JobConf;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source function that periodically discovers partitions of a Hive table and forwards the
 * input splits of every newly found partition downstream as {@link TimestampedHiveInputSplit}s.
 *
 * <p>Each monitoring round runs while holding the checkpoint lock obtained from the source
 * context. The function checkpoints two pieces of state: the current read time and the set of
 * partition specs that have already been forwarded and must not be emitted again.
 */
public class HiveContinuousMonitoringFunction
        extends RichSourceFunction<TimestampedHiveInputSplit>
        implements CheckpointedFunction {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(HiveContinuousMonitoringFunction.class);

    /** Number of splits requested per partition; always at least 1. */
    private final int readerParallelism;
    /** Pause between two monitoring rounds, passed directly to {@link Thread#sleep(long)}. */
    private final long interval;
    private final HiveShim hiveShim;
    private final JobConfWrapper conf;
    private final ObjectPath tablePath;
    private final List<String> partitionKeys;
    private final String[] fieldNames;
    private final DataType[] fieldTypes;
    private final ConsumeOrder consumeOrder;
    private final String consumeOffset;
    private final String extractorKind;
    private final String extractorClass;
    private final String extractorPattern;

    private volatile boolean isRunning = true;
    /** Read time handed to the partition fetcher as the lower bound of the next round. */
    private volatile long currentReadTime;

    private transient PartitionDiscovery.Context context;
    private transient PartitionDiscovery fetcher;
    private transient Object checkpointLock;
    private transient ListState<Long> currReadTimeState;
    private transient ListState<List<List<String>>> distinctPartsState;
    private transient IMetaStoreClient client;
    private transient Properties tableProps;
    private transient String defaultPartitionName;
    /** Partition specs that were already forwarded and must be skipped if fetched again. */
    private transient Set<List<String>> distinctPartitions;

    /**
     * Creates the monitoring function.
     *
     * @param hiveShim shim used to create the metastore client and to read table metadata
     * @param conf job configuration; wrapped in a {@link JobConfWrapper}
     * @param tablePath database and object name of the monitored table
     * @param catalogTable supplies the partition keys, field names and field data types
     * @param readerParallelism number of splits to request per partition; values below 1 are
     *     raised to 1
     * @param consumeOrder selects how a partition's timestamp is derived
     * @param consumeOffset initial read time, converted via {@code DefaultPartTimeExtractor.toMills}
     *     when there is no state to restore
     * @param extractorKind passed to {@code PartitionTimeExtractor.create}
     * @param extractorClass passed to {@code PartitionTimeExtractor.create}
     * @param extractorPattern passed to {@code PartitionTimeExtractor.create}
     * @param interval pause between monitoring rounds, used as the argument of
     *     {@link Thread#sleep(long)}
     */
    public HiveContinuousMonitoringFunction(HiveShim hiveShim, JobConf conf, ObjectPath tablePath, CatalogTable catalogTable, int readerParallelism, ConsumeOrder consumeOrder, String consumeOffset, String extractorKind, String extractorClass, String extractorPattern, long interval) {
        this.hiveShim = hiveShim;
        this.conf = new JobConfWrapper(conf);
        this.tablePath = tablePath;
        this.partitionKeys = catalogTable.getPartitionKeys();
        this.fieldNames = catalogTable.getSchema().getFieldNames();
        this.fieldTypes = catalogTable.getSchema().getFieldDataTypes();
        this.consumeOrder = consumeOrder;
        this.extractorKind = extractorKind;
        this.extractorClass = extractorClass;
        this.extractorPattern = extractorPattern;
        this.consumeOffset = consumeOffset;
        this.interval = interval;
        this.readerParallelism = Math.max(readerParallelism, 1);
        this.currentReadTime = 0L;
    }

    /**
     * Registers the two operator list states, opens the metastore client, builds the partition
     * discovery context and either restores the checkpointed progress or starts from the
     * configured consume offset.
     *
     * @param context gives access to the operator state store and the restore flag
     * @throws Exception if the metastore, the file system or the state backend fails
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        this.currReadTimeState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("current-read-time-state", LongSerializer.INSTANCE));
        this.distinctPartsState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>(
                        "distinct-partitions-state",
                        new ListSerializer<>(new ListSerializer<>(StringSerializer.INSTANCE))));

        this.client = this.hiveShim.getHiveMetastoreClient(new HiveConf(this.conf.conf(), HiveConf.class));
        final Table hiveTable = this.client.getTable(this.tablePath.getDatabaseName(), this.tablePath.getObjectName());
        this.tableProps = HiveReflectionUtils.getTableMetadata(this.hiveShim, hiveTable);
        this.defaultPartitionName = this.conf.conf().get(
                HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
                HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);

        final PartitionTimeExtractor extractor = PartitionTimeExtractor.create(
                this.getRuntimeContext().getUserCodeClassLoader(),
                this.extractorKind,
                this.extractorClass,
                this.extractorPattern);
        this.fetcher = new DirectoryMonitorDiscovery();

        Path location = new Path(hiveTable.getSd().getLocation());
        final FileSystem fs = location.getFileSystem(this.conf.conf());

        this.context = new PartitionDiscovery.Context() {

            @Override
            public List<String> partitionKeys() {
                return HiveContinuousMonitoringFunction.this.partitionKeys;
            }

            @Override
            public Optional<Partition> getPartition(List<String> partValues) throws TException {
                try {
                    return Optional.of(HiveContinuousMonitoringFunction.this.client.getPartition(
                            HiveContinuousMonitoringFunction.this.tablePath.getDatabaseName(),
                            HiveContinuousMonitoringFunction.this.tablePath.getObjectName(),
                            partValues));
                } catch (NoSuchObjectException e) {
                    // The metastore does not know this partition; report it as absent.
                    return Optional.empty();
                }
            }

            @Override
            public FileSystem fileSystem() {
                return fs;
            }

            @Override
            public Path tableLocation() {
                return new Path(hiveTable.getSd().getLocation());
            }

            @Override
            public long extractTimestamp(List<String> partKeys, List<String> partValues, Supplier<Long> fileTime) {
                switch (HiveContinuousMonitoringFunction.this.consumeOrder) {
                    case CREATE_TIME_ORDER: {
                        return fileTime.get();
                    }
                    case PARTITION_TIME_ORDER: {
                        return DefaultPartTimeExtractor.toMills(extractor.extract(partKeys, partValues));
                    }
                }
                throw new UnsupportedOperationException("Unsupported consumer order: " + HiveContinuousMonitoringFunction.this.consumeOrder);
            }
        };

        this.distinctPartitions = new HashSet<>();
        if (context.isRestored()) {
            LOG.info("Restoring state for the {}.", this.getClass().getSimpleName());
            // Each state holds the single element written by snapshotState().
            this.currentReadTime = this.currReadTimeState.get().iterator().next();
            this.distinctPartitions.addAll(this.distinctPartsState.get().iterator().next());
        } else {
            LOG.info("No state to restore for the {}.", this.getClass().getSimpleName());
            this.currentReadTime = DefaultPartTimeExtractor.toMills(this.consumeOffset);
        }
    }

    /**
     * Runs monitoring rounds until {@link #cancel()} clears the running flag. Each round is
     * executed under the checkpoint lock; the sleep between rounds happens outside of it.
     *
     * @param context source context used to obtain the checkpoint lock and to emit splits
     * @throws Exception if a monitoring round fails or the sleep is interrupted
     */
    @Override
    public void run(SourceFunction.SourceContext<TimestampedHiveInputSplit> context) throws Exception {
        this.checkpointLock = context.getCheckpointLock();
        while (this.isRunning) {
            synchronized (this.checkpointLock) {
                this.monitorAndForwardSplits(context);
            }
            Thread.sleep(this.interval);
        }
    }

    /**
     * Fetches partitions starting at the current read time, emits the splits of every partition
     * that was not forwarded before and advances the read time to the largest timestamp emitted.
     *
     * <p>Must be called while holding the checkpoint lock.
     */
    private void monitorAndForwardSplits(SourceFunction.SourceContext<TimestampedHiveInputSplit> context) throws Exception {
        assert (Thread.holdsLock(this.checkpointLock));

        List<Tuple2<Partition, Long>> partitions = this.fetcher.fetchPartitions(this.context, this.currentReadTime);
        if (partitions.isEmpty()) {
            return;
        }

        // Order by timestamp. Long.compare is required here: narrowing the difference of two
        // long timestamps to int can overflow and flip the sign, which yields a wrong order
        // and violates the comparator contract.
        partitions.sort((o1, o2) -> Long.compare(o1.f1, o2.f1));

        long maxTimestamp = Long.MIN_VALUE;
        Set<List<String>> nextDistinctParts = new HashSet<>();
        for (Tuple2<Partition, Long> tuple2 : partitions) {
            Partition partition = tuple2.f0;
            List<String> partSpec = partition.getValues();
            if (this.distinctPartitions.contains(partSpec)) {
                // Already forwarded in an earlier round (or earlier in this one).
                continue;
            }
            this.distinctPartitions.add(partSpec);

            long timestamp = tuple2.f1;
            if (timestamp > this.currentReadTime) {
                // Candidate for the de-duplication set that replaces the current one
                // once the read time advances.
                nextDistinctParts.add(partSpec);
            }
            if (timestamp > maxTimestamp) {
                maxTimestamp = timestamp;
            }

            LOG.info("Found new partition {} of table {}, forwarding splits to downstream readers", partSpec, this.tablePath.getFullName());
            HiveTableInputSplit[] splits = HiveTableInputFormat.createInputSplits(
                    this.readerParallelism,
                    Collections.singletonList(this.toHiveTablePartition(partition)),
                    this.conf.conf());
            for (HiveTableInputSplit split : splits) {
                context.collect(new TimestampedHiveInputSplit(timestamp, split));
            }
        }

        if (maxTimestamp > this.currentReadTime) {
            this.currentReadTime = maxTimestamp;
            // The read time moved forward: keep only the specs collected for the new read time.
            this.distinctPartitions.clear();
            this.distinctPartitions.addAll(nextDistinctParts);
        }
    }

    /** Converts a metastore partition via {@code HiveTableSource.toHiveTablePartition}. */
    private HiveTablePartition toHiveTablePartition(Partition p) {
        return HiveTableSource.toHiveTablePartition(this.partitionKeys, this.fieldNames, this.fieldTypes, this.hiveShim, this.tableProps, this.defaultPartitionName, p);
    }

    /**
     * Replaces the content of both list states with the current read time and a copy of the
     * set of already forwarded partition specs.
     *
     * @throws IllegalStateException (via {@code Preconditions.checkState}) if the state was
     *     never initialized
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        Preconditions.checkState(
                this.currReadTimeState != null,
                "The " + this.getClass().getSimpleName() + " state has not been properly initialized.");

        this.currReadTimeState.clear();
        this.currReadTimeState.add(this.currentReadTime);

        this.distinctPartsState.clear();
        this.distinctPartsState.add(new ArrayList<>(this.distinctPartitions));

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} checkpointed {}.", this.getClass().getSimpleName(), this.currentReadTime);
        }
    }

    /** Delegates to {@link #cancel()}. */
    @Override
    public void close() {
        this.cancel();
    }

    /**
     * Stops the monitoring loop, sets the read time to {@link Long#MAX_VALUE} and closes the
     * metastore client if one is open.
     */
    @Override
    public void cancel() {
        // checkpointLock is only assigned in run(), so it is null if cancel() arrives earlier.
        if (this.checkpointLock != null) {
            synchronized (this.checkpointLock) {
                this.currentReadTime = Long.MAX_VALUE;
                this.isRunning = false;
            }
        } else {
            this.currentReadTime = Long.MAX_VALUE;
            this.isRunning = false;
        }
        if (this.client != null) {
            this.client.close();
            this.client = null;
        }
    }
}

