package org.apache.gobblin.data.management.conversion.hive.source;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.google.gson.Gson;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.data.management.conversion.hive.avro.AvroSchemaManager;
import org.apache.gobblin.data.management.conversion.hive.avro.SchemaNotFoundException;
import org.apache.gobblin.data.management.conversion.hive.events.EventConstants;
import org.apache.gobblin.data.management.conversion.hive.events.EventWorkunitUtils;
import org.apache.gobblin.data.management.conversion.hive.extractor.HiveBaseExtractorFactory;
import org.apache.gobblin.data.management.conversion.hive.extractor.HiveConvertExtractorFactory;
import org.apache.gobblin.data.management.conversion.hive.provider.HiveUnitUpdateProvider;
import org.apache.gobblin.data.management.conversion.hive.provider.UpdateNotFoundException;
import org.apache.gobblin.data.management.conversion.hive.provider.UpdateProviderFactory;
import org.apache.gobblin.data.management.conversion.hive.watermarker.HiveSourceWatermarker;
import org.apache.gobblin.data.management.conversion.hive.watermarker.HiveSourceWatermarkerFactory;
import org.apache.gobblin.data.management.conversion.hive.watermarker.PartitionLevelWatermarker;
import org.apache.gobblin.data.management.copy.hive.HiveDataset;
import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
import org.apache.gobblin.data.management.copy.hive.HiveUtils;
import org.apache.gobblin.data.management.copy.hive.PartitionFilterGenerator;
import org.apache.gobblin.data.management.copy.hive.filter.LookbackPartitionFilterGenerator;
import org.apache.gobblin.dataset.IterableDatasetFinder;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.WatermarkInterval;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.AutoReturnableObject;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.io.GsonInterfaceAdapter;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
import org.apache.log4j.Level;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Gobblin {@link Source} that turns Hive tables and partitions into {@link HiveWorkUnit}s.
 *
 * <p>Datasets are discovered through the {@link HiveDatasetFinder} implementation named by
 * {@value #HIVE_SOURCE_DATASET_FINDER_CLASS_KEY}. A partitioned table gets one workunit per
 * eligible partition, unless {@value #HIVE_SOURCE_CREATE_WORKUNITS_FOR_PARTITIONS} is false. Every
 * other table gets at most one workunit.
 *
 * <p>A table or partition is eligible when all of the following hold:
 * <ul>
 *   <li>its update time, as reported by the configured {@link HiveUnitUpdateProvider}, is within
 *       the lookback window ({@value #HIVE_SOURCE_MAXIMUM_LOOKBACK_DAYS_KEY});</li>
 *   <li>that update time is newer than the previous high watermark from the configured
 *       {@link HiveSourceWatermarker};</li>
 *   <li>its data location does not contain any token listed in
 *       {@value #HIVE_SOURCE_IGNORE_DATA_PATH_IDENTIFIER_KEY}.</li>
 * </ul>
 * Partitions whose create time is older than the lookback window are skipped outright.
 */
@Alpha
public class HiveSource implements Source {
    /**
     * When true, schema URLs are attached to workunits even if the table's serde is not
     * {@link AvroSerDe}. (The misspelled name is public API and is kept for compatibility.)
     */
    public static final String DISABLE_AVRO_CHAECK = "hive.source.disable.avro.check";
    public static final boolean DEFAULT_DISABLE_AVRO_CHAECK = false;
    public static final String HIVE_SOURCE_MAXIMUM_LOOKBACK_DAYS_KEY = "hive.source.maximum.lookbackDays";
    public static final int DEFAULT_HIVE_SOURCE_MAXIMUM_LOOKBACK_DAYS = 3;
    public static final String HIVE_SOURCE_DATASET_FINDER_CLASS_KEY = "hive.dataset.finder.class";
    public static final String HIVE_SOURCE_DATASET_FINDER_PARTITION_FILTER_KEY = "hive.dataset.finder.partitionfilter.class";
    /** Partition parameter consulted for a create time when the metastore create time is not set. */
    public static final String DISTCP_REGISTRATION_GENERATION_TIME_KEY = "registrationGenerationTimeMillis";
    public static final String HIVE_SOURCE_WATERMARKER_FACTORY_CLASS_KEY = "hive.source.watermarker.factoryClass";
    public static final String HIVE_SOURCE_EXTRACTOR_TYPE = "hive.source.extractorType";
    public static final String HIVE_SOURCE_CREATE_WORKUNITS_FOR_PARTITIONS = "hive.source.createWorkunitsForPartitions";
    public static final boolean DEFAULT_HIVE_SOURCE_CREATE_WORKUNITS_FOR_PARTITIONS = true;
    public static final String HIVE_SOURCE_FS_URI = "hive.source.fs.uri";
    /** Comma-separated, case-insensitive tokens; data locations containing any of them are skipped. */
    public static final String HIVE_SOURCE_IGNORE_DATA_PATH_IDENTIFIER_KEY = "hive.source.ignoreDataPathIdentifier";
    public static final String DEFAULT_HIVE_SOURCE_IGNORE_DATA_PATH_IDENTIFIER = "";
    protected MetricContext metricContext;
    protected EventSubmitter eventSubmitter;
    protected AvroSchemaManager avroSchemaManager;
    protected HiveUnitUpdateProvider updateProvider;
    protected HiveSourceWatermarker watermarker;
    protected IterableDatasetFinder<HiveDataset> datasetFinder;
    protected List<WorkUnit> workunits;
    protected PartitionFilterGenerator partitionFilterGenerator;
    /** Epoch millis; anything updated (or, for partitions, created) before this is ignored. */
    protected long maxLookBackTime;
    /** Epoch millis at which {@link #getWorkunits(SourceState)} started; used for SLA event metadata. */
    protected long beginGetWorkunitsTime;
    protected List<String> ignoreDataPathIdentifierList;
    protected final ClassAliasResolver<HiveBaseExtractorFactory> classAliasResolver = new ClassAliasResolver<>(HiveBaseExtractorFactory.class);
    private static final Logger log = LoggerFactory.getLogger(HiveSource.class);
    public static final String DEFAULT_HIVE_SOURCE_DATASET_FINDER_CLASS = HiveDatasetFinder.class.getName();
    public static final String DEFAULT_HIVE_SOURCE_DATASET_FINDER_PARTITION_FILTER_CLASS = LookbackPartitionFilterGenerator.class.getName();
    public static final String DEFAULT_HIVE_SOURCE_WATERMARKER_FACTORY_CLASS = PartitionLevelWatermarker.Factory.class.getName();
    public static final String DEFAULT_HIVE_SOURCE_EXTRACTOR_TYPE = HiveConvertExtractorFactory.class.getName();
    public static final Gson GENERICS_AWARE_GSON = GsonInterfaceAdapter.getGson(Object.class);
    public static final Splitter COMMA_BASED_SPLITTER = Splitter.on(",").omitEmptyStrings().trimResults();

    /**
     * Discovers Hive datasets and builds the workunits for this run.
     *
     * <p>After every dataset is processed, the watermarker's {@code onGetWorkunitsEnd} hook receives
     * the workunit list. The list-size difference across that call is logged as "watermark
     * workunits".
     *
     * @param sourceState job configuration
     * @return the workunit list built for this run (the same instance held in {@link #workunits})
     * @throws RuntimeException wrapping any {@link IOException} raised while discovering datasets or
     *     creating workunits
     */
    @Override
    public List<WorkUnit> getWorkunits(SourceState sourceState) {
        try {
            this.beginGetWorkunitsTime = System.currentTimeMillis();
            initialize(sourceState);
            EventSubmitter.submit(Optional.of(this.eventSubmitter), EventConstants.CONVERSION_FIND_HIVE_TABLES_EVENT);

            boolean disableAvroCheck = sourceState.getPropAsBoolean(DISABLE_AVRO_CHAECK, DEFAULT_DISABLE_AVRO_CHAECK);
            boolean createWorkunitsForPartitions = sourceState.getPropAsBoolean(
                HIVE_SOURCE_CREATE_WORKUNITS_FOR_PARTITIONS, DEFAULT_HIVE_SOURCE_CREATE_WORKUNITS_FOR_PARTITIONS);

            Iterator<HiveDataset> datasetsIterator = this.datasetFinder.getDatasetsIterator();
            while (datasetsIterator.hasNext()) {
                HiveDataset hiveDataset = datasetsIterator.next();
                // try-with-resources hands the metastore client back to its pool even when workunit
                // creation throws; the previous manual close() only ran on the success path.
                try (AutoReturnableObject<IMetaStoreClient> client = hiveDataset.getClientPool().getClient()) {
                    log.debug("Processing dataset: {}", hiveDataset);
                    if (hiveDataset.getTable().isPartitioned() && createWorkunitsForPartitions) {
                        createWorkunitsForPartitionedTable(hiveDataset, client, disableAvroCheck);
                    } else {
                        createWorkunitForNonPartitionedTable(hiveDataset, disableAvroCheck);
                    }
                }
            }

            int realWorkunitCount = this.workunits.size();
            this.watermarker.onGetWorkunitsEnd(this.workunits);
            log.info("Created {} real workunits and {} watermark workunits",
                realWorkunitCount, this.workunits.size() - realWorkunitCount);
            return this.workunits;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds every collaborator this source needs from the job configuration, and resets
     * {@link #workunits} to an empty list.
     *
     * <p>Also submits the {@code CONVERSION_SETUP_EVENT} and lowers several Hive log4j loggers to
     * WARN.
     *
     * @param sourceState job configuration
     * @throws IOException if the source file system or a collaborator cannot be created
     */
    @VisibleForTesting
    public void initialize(SourceState sourceState) throws IOException {
        this.updateProvider = UpdateProviderFactory.create((State) sourceState);
        this.metricContext = Instrumented.getMetricContext(sourceState, HiveSource.class);
        this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, EventConstants.CONVERSION_NAMESPACE).build();
        this.avroSchemaManager = new AvroSchemaManager(getSourceFs(sourceState), sourceState);
        this.workunits = Lists.newArrayList();

        HiveSourceWatermarkerFactory watermarkerFactory = GobblinConstructorUtils.invokeConstructor(
            HiveSourceWatermarkerFactory.class,
            sourceState.getProp(HIVE_SOURCE_WATERMARKER_FACTORY_CLASS_KEY, DEFAULT_HIVE_SOURCE_WATERMARKER_FACTORY_CLASS));
        this.watermarker = watermarkerFactory.createFromState(sourceState);

        EventSubmitter.submit(Optional.of(this.eventSubmitter), EventConstants.CONVERSION_SETUP_EVENT);

        this.datasetFinder = GobblinConstructorUtils.invokeConstructor(
            HiveDatasetFinder.class,
            sourceState.getProp(HIVE_SOURCE_DATASET_FINDER_CLASS_KEY, DEFAULT_HIVE_SOURCE_DATASET_FINDER_CLASS),
            getSourceFs(sourceState), sourceState.getProperties(), this.eventSubmitter);

        int lookbackDays = sourceState.getPropAsInt(HIVE_SOURCE_MAXIMUM_LOOKBACK_DAYS_KEY, DEFAULT_HIVE_SOURCE_MAXIMUM_LOOKBACK_DAYS);
        this.maxLookBackTime = new DateTime().minusDays(lookbackDays).getMillis();

        this.ignoreDataPathIdentifierList = COMMA_BASED_SPLITTER.splitToList(
            sourceState.getProp(HIVE_SOURCE_IGNORE_DATA_PATH_IDENTIFIER_KEY, DEFAULT_HIVE_SOURCE_IGNORE_DATA_PATH_IDENTIFIER));

        this.partitionFilterGenerator = GobblinConstructorUtils.invokeConstructor(
            PartitionFilterGenerator.class,
            sourceState.getProp(HIVE_SOURCE_DATASET_FINDER_PARTITION_FILTER_KEY, DEFAULT_HIVE_SOURCE_DATASET_FINDER_PARTITION_FILTER_CLASS),
            sourceState.getProperties());

        silenceHiveLoggers();
    }

    /**
     * @deprecated use {@link #createWorkunitForNonPartitionedTable(HiveDataset, boolean)}.
     */
    @Deprecated
    protected void createWorkunitForNonPartitionedTable(HiveDataset hiveDataset) throws IOException {
        createWorkunitForNonPartitionedTable(hiveDataset, false);
    }

    /**
     * Creates at most one workunit covering the whole table of {@code hiveDataset}, and adds it to
     * {@link #workunits}.
     *
     * <p>Missing schemas or update times are logged and the table is skipped; they are not
     * propagated.
     *
     * @param hiveDataset dataset whose table is processed
     * @param disableAvroCheck when true, schema URLs are attached even for non-Avro tables
     * @throws IOException on other I/O failures while building the workunit
     */
    protected void createWorkunitForNonPartitionedTable(HiveDataset hiveDataset, boolean disableAvroCheck) throws IOException {
        Table table = hiveDataset.getTable();
        try {
            long tableProcessTime = new DateTime().getMillis();
            long updateTime = this.updateProvider.getUpdateTime(table);
            this.watermarker.onTableProcessBegin(table, tableProcessTime);
            LongWatermark lowWatermark = this.watermarker.getPreviousHighWatermark(table);

            if (!shouldCreateWorkUnit(table.getPath())) {
                log.info("Not creating workunit for table {} as partition path {} contains data path tokens to ignore {}",
                    table.getCompleteName(), table.getPath(), this.ignoreDataPathIdentifierList);
                return;
            }

            if (!shouldCreateWorkunit(table, lowWatermark)) {
                log.info("Not creating workunit for table {} as updateTime {} and createTime {} is not greater than low watermark {}",
                    table.getCompleteName(), updateTime, table.getTTable().getCreateTime(), lowWatermark.getValue());
                return;
            }

            log.info("Creating workunit for table {} as updateTime {} or createTime {} is greater than low watermark {}",
                table.getCompleteName(), updateTime, table.getTTable().getCreateTime(), lowWatermark.getValue());
            HiveWorkUnit workUnit = workUnitForTable(hiveDataset, disableAvroCheck);
            workUnit.setWatermarkInterval(
                new WatermarkInterval(lowWatermark, this.watermarker.getExpectedHighWatermark(table, tableProcessTime)));
            EventWorkunitUtils.setTableSlaEventMetadata(workUnit, table, updateTime, lowWatermark.getValue(), this.beginGetWorkunitsTime);
            this.workunits.add(workUnit);
            log.debug("Workunit added for table: {}", workUnit);
        } catch (SchemaNotFoundException e) {
            log.error("Not Creating workunit for {} as schema was not found. {}", table.getCompleteName(), e.getMessage(), e);
        } catch (UpdateNotFoundException e) {
            log.error("Not Creating workunit for {} as update time was not found. {}", table.getCompleteName(), e.getMessage(), e);
        }
    }

    /**
     * @deprecated use {@link #workUnitForTable(HiveDataset, boolean)}.
     */
    @Deprecated
    protected HiveWorkUnit workUnitForTable(HiveDataset hiveDataset) throws IOException {
        return workUnitForTable(hiveDataset, false);
    }

    /**
     * Builds a table-level workunit.
     *
     * <p>The table schema URL is set when {@code disableAvroCheck} is true or the table uses
     * {@link AvroSerDe}.
     */
    protected HiveWorkUnit workUnitForTable(HiveDataset hiveDataset, boolean disableAvroCheck) throws IOException {
        HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(hiveDataset);
        if (disableAvroCheck || isAvro(hiveDataset.getTable())) {
            hiveWorkUnit.setTableSchemaUrl(this.avroSchemaManager.getSchemaUrl(hiveDataset.getTable()));
        }
        return hiveWorkUnit;
    }

    /**
     * @deprecated use {@link #createWorkunitsForPartitionedTable(HiveDataset, AutoReturnableObject, boolean)}.
     */
    @Deprecated
    protected void createWorkunitsForPartitionedTable(HiveDataset hiveDataset, AutoReturnableObject<IMetaStoreClient> autoReturnableObject) throws IOException {
        createWorkunitsForPartitionedTable(hiveDataset, autoReturnableObject, false);
    }

    /**
     * Creates one workunit per eligible partition of {@code hiveDataset}'s table, and adds them to
     * {@link #workunits}.
     *
     * <p>Partitions are listed through {@code client}, filtered by {@link #partitionFilterGenerator}.
     * Partitions created before the lookback window are skipped. Missing update times, missing
     * schemas and {@link UncheckedExecutionException}s are logged per partition and do not stop
     * the loop.
     *
     * @param hiveDataset dataset whose table is processed
     * @param autoReturnableObject pooled metastore client; the caller owns and closes it
     * @param disableAvroCheck when true, schema URLs are attached even for non-Avro tables
     * @throws IOException if partitions cannot be listed or another I/O failure occurs
     */
    protected void createWorkunitsForPartitionedTable(HiveDataset hiveDataset, AutoReturnableObject<IMetaStoreClient> autoReturnableObject, boolean disableAvroCheck) throws IOException {
        Table table = hiveDataset.getTable();
        long tableProcessTime = new DateTime().getMillis();
        this.watermarker.onTableProcessBegin(table, tableProcessTime);

        Optional<String> partitionFilter = Optional.fromNullable(this.partitionFilterGenerator.getFilter(hiveDataset));
        for (Partition partition : HiveUtils.getPartitions(autoReturnableObject.get(), table, partitionFilter)) {
            if (isOlderThanLookback(partition)) {
                continue;
            }
            LongWatermark lowWatermark = this.watermarker.getPreviousHighWatermark(partition);
            try {
                if (!shouldCreateWorkUnit(new Path(partition.getLocation()))) {
                    log.info("Not creating workunit for partition {} as partition path {} contains data path tokens to ignore {}",
                        partition.getCompleteName(), partition.getLocation(), this.ignoreDataPathIdentifierList);
                    continue;
                }

                long updateTime = this.updateProvider.getUpdateTime(partition);
                if (!shouldCreateWorkunit(partition, lowWatermark)) {
                    log.info("Not creating workunit for partition {} as updateTime {} is lesser than low watermark {}",
                        partition.getCompleteName(), updateTime, lowWatermark.getValue());
                    continue;
                }

                log.debug("Processing partition: {}", partition);
                long partitionProcessTime = new DateTime().getMillis();
                this.watermarker.onPartitionProcessBegin(partition, partitionProcessTime, updateTime);
                LongWatermark expectedHighWatermark =
                    this.watermarker.getExpectedHighWatermark(partition, tableProcessTime, partitionProcessTime);

                HiveWorkUnit workUnit = workUnitForPartition(hiveDataset, partition, disableAvroCheck);
                workUnit.setWatermarkInterval(new WatermarkInterval(lowWatermark, expectedHighWatermark));
                EventWorkunitUtils.setPartitionSlaEventMetadata(workUnit, table, partition, updateTime,
                    lowWatermark.getValue(), this.beginGetWorkunitsTime);
                this.workunits.add(workUnit);
                log.info("Creating workunit for partition {} as updateTime {} is greater than low watermark {}",
                    partition.getCompleteName(), updateTime, lowWatermark.getValue());
            } catch (UpdateNotFoundException e) {
                log.error("Not creating workunit for {} as update time was not found. {}", partition.getCompleteName(), e.getMessage(), e);
            } catch (UncheckedExecutionException e) {
                log.error("Not creating workunit for {} because an unchecked exception occurred. {}", partition.getCompleteName(), e.getMessage(), e);
            } catch (SchemaNotFoundException e) {
                log.error("Not creating workunit for {} as schema was not found. {}", partition.getCompleteName(), e.getMessage(), e);
            }
        }
    }

    /**
     * @deprecated use {@link #workUnitForPartition(HiveDataset, Partition, boolean)}.
     */
    @Deprecated
    protected HiveWorkUnit workUnitForPartition(HiveDataset hiveDataset, Partition partition) throws IOException {
        return workUnitForPartition(hiveDataset, partition, false);
    }

    /**
     * Builds a partition-level workunit.
     *
     * <p>The table and partition schema URLs are set when {@code disableAvroCheck} is true or the
     * table uses {@link AvroSerDe}.
     */
    protected HiveWorkUnit workUnitForPartition(HiveDataset hiveDataset, Partition partition, boolean disableAvroCheck) throws IOException {
        HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(hiveDataset, partition);
        if (disableAvroCheck || isAvro(hiveDataset.getTable())) {
            hiveWorkUnit.setTableSchemaUrl(this.avroSchemaManager.getSchemaUrl(hiveDataset.getTable()));
            hiveWorkUnit.setPartitionSchemaUrl(this.avroSchemaManager.getSchemaUrl(partition));
        }
        return hiveWorkUnit;
    }

    /**
     * Returns false when {@code path} contains, case-insensitively, any token from
     * {@link #ignoreDataPathIdentifierList}; returns true otherwise, including when the list is
     * null or empty.
     *
     * <p>Case folding uses {@link Locale#ROOT}, so the result does not depend on the JVM's default
     * locale.
     */
    protected boolean shouldCreateWorkUnit(Path path) {
        if (this.ignoreDataPathIdentifierList == null || this.ignoreDataPathIdentifierList.isEmpty()) {
            return true;
        }
        String normalizedPath = path.toString().toLowerCase(Locale.ROOT);
        for (String ignoreToken : this.ignoreDataPathIdentifierList) {
            if (normalizedPath.contains(ignoreToken.toLowerCase(Locale.ROOT))) {
                return false;
            }
        }
        return true;
    }

    /** Partition variant of {@link #shouldCreateWorkunit(long, long, LongWatermark)}. */
    protected boolean shouldCreateWorkunit(Partition partition, LongWatermark longWatermark) throws UpdateNotFoundException {
        return shouldCreateWorkunit(getCreateTime(partition), this.updateProvider.getUpdateTime(partition), longWatermark);
    }

    /** Table variant of {@link #shouldCreateWorkunit(long, long, LongWatermark)}. */
    protected boolean shouldCreateWorkunit(Table table, LongWatermark longWatermark) throws UpdateNotFoundException {
        return shouldCreateWorkunit(getCreateTime(table), this.updateProvider.getUpdateTime(table), longWatermark);
    }

    /**
     * Decides whether a unit changed since the last run.
     *
     * <p>Returns true iff {@code updateTime} is not before {@link #maxLookBackTime} and is strictly
     * after {@code lowWatermark}. {@code createTime} is currently not consulted; it is part of the
     * signature so overrides can use it.
     *
     * @param createTime create time in epoch millis (unused here)
     * @param updateTime update time in epoch millis
     * @param lowWatermark previous high watermark
     */
    protected boolean shouldCreateWorkunit(long createTime, long updateTime, LongWatermark lowWatermark) {
        DateTime updated = new DateTime(updateTime);
        return !updated.isBefore(this.maxLookBackTime) && updated.isAfter(lowWatermark.getValue());
    }

    /** Returns true when the partition's create time (see {@link #getCreateTime(Partition)}) is before {@link #maxLookBackTime}. */
    @VisibleForTesting
    public boolean isOlderThanLookback(Partition partition) {
        return new DateTime(getCreateTime(partition)).isBefore(this.maxLookBackTime);
    }

    /**
     * Returns the partition's create time in epoch millis.
     *
     * <p>The metastore create time (stored in seconds) is used when positive. Otherwise the
     * {@value #DISTCP_REGISTRATION_GENERATION_TIME_KEY} partition parameter is used. If neither is
     * available, or the parameter is not a valid long, 0 is returned and a warning is logged.
     */
    @VisibleForTesting
    public static long getCreateTime(Partition partition) {
        int createTimeSeconds = partition.getTPartition().getCreateTime();
        if (createTimeSeconds > 0) {
            return TimeUnit.SECONDS.toMillis(createTimeSeconds);
        }
        if (partition.getTPartition().isSetParameters()
            && partition.getTPartition().getParameters().containsKey(DISTCP_REGISTRATION_GENERATION_TIME_KEY)) {
            log.debug("Did not find createTime in Hive partition, used distcp registration generation time.");
            String registrationTime = partition.getTPartition().getParameters().get(DISTCP_REGISTRATION_GENERATION_TIME_KEY);
            try {
                return Long.parseLong(registrationTime);
            } catch (NumberFormatException e) {
                // A corrupt parameter on one partition must not abort workunit creation for the whole job.
                log.warn("Could not parse {} value '{}' for partition {}",
                    DISTCP_REGISTRATION_GENERATION_TIME_KEY, registrationTime, partition.getCompleteName());
            }
        }
        log.warn("Could not find create time for partition {}. Will return createTime as 0", partition.getCompleteName());
        return 0L;
    }

    /** Returns the table's metastore create time (stored in seconds) converted to epoch millis. */
    protected static long getCreateTime(Table table) {
        return TimeUnit.SECONDS.toMillis(table.getTTable().getCreateTime());
    }

    /**
     * Instantiates the {@link HiveBaseExtractorFactory} named (class or alias) by
     * {@value #HIVE_SOURCE_EXTRACTOR_TYPE}, and asks it for an extractor.
     *
     * @throws IOException wrapping any failure to resolve, instantiate or invoke the factory
     */
    @Override
    public Extractor getExtractor(WorkUnitState workUnitState) throws IOException {
        String extractorType = workUnitState.getProp(HIVE_SOURCE_EXTRACTOR_TYPE, DEFAULT_HIVE_SOURCE_EXTRACTOR_TYPE);
        try {
            HiveBaseExtractorFactory extractorFactory =
                this.classAliasResolver.resolveClass(extractorType).getDeclaredConstructor().newInstance();
            return extractorFactory.createExtractor(workUnitState, getSourceFs(workUnitState));
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    /** No resources to release. */
    @Override
    public void shutdown(SourceState sourceState) {
    }

    /**
     * Returns the file system at {@value #HIVE_SOURCE_FS_URI} if that key is set, and the default
     * file system of the Hadoop configuration derived from {@code state} otherwise.
     */
    public static FileSystem getSourceFs(State state) throws IOException {
        if (state.contains(HIVE_SOURCE_FS_URI)) {
            return FileSystem.get(URI.create(state.getProp(HIVE_SOURCE_FS_URI)), HadoopUtils.getConfFromState(state));
        }
        return FileSystem.get(HadoopUtils.getConfFromState(state));
    }

    /** Raises the log4j level of several Hive loggers to WARN to reduce log noise. */
    private void silenceHiveLoggers() {
        for (String loggerName : ImmutableList.of("org.apache.hadoop.hive", "org.apache.hive", "hive.ql.parse")) {
            org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(loggerName);
            if (logger != null) {
                logger.setLevel(Level.WARN);
            }
        }
    }

    /** Returns true when the table's serialization library is {@link AvroSerDe}. */
    private boolean isAvro(Table table) {
        return AvroSerDe.class.getName().equals(table.getSd().getSerdeInfo().getSerializationLib());
    }
}
