package org.apache.gobblin.compliance.purger;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.gobblin.compliance.ComplianceConfigurationKeys;
import org.apache.gobblin.compliance.ComplianceEvents;
import org.apache.gobblin.compliance.HivePartitionDataset;
import org.apache.gobblin.compliance.HivePartitionFinder;
import org.apache.gobblin.compliance.utils.DatasetUtils;
import org.apache.gobblin.compliance.utils.ProxyUtils;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.dataset.DatasetsFinder;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/compliance/purger/HivePurgerSource.class */
/**
 * A {@link Source} that produces one purge {@link WorkUnit} per Hive partition accepted by a {@link PurgePolicy}.
 *
 * <p>Each call to {@link #getWorkunits(SourceState)}:
 * <ol>
 *   <li>re-creates work units for partitions whose previous work unit was not {@code COMMITTED}, as long as
 *       their execution-attempt count is below {@code maxWorkUnitExecutionAttempts} and a low watermark from a
 *       previous run exists;</li>
 *   <li>walks the discovered partitions in {@code datasetURN()} order and creates a work unit for every partition
 *       the policy accepts, stopping once {@code maxWorkUnits} new work units exist.</li>
 * </ol>
 * When the limit is hit, the URN of the next partition is stored under
 * {@link ComplianceConfigurationKeys#HIVE_PURGER_WATERMARK}. When every partition was visited, the watermark is set
 * to {@link ComplianceConfigurationKeys#NO_PREVIOUS_WATERMARK}, unless the source state already contains one.
 */
public class HivePurgerSource implements Source {
    private static final Logger log = LoggerFactory.getLogger(HivePurgerSource.class);

    /** Finder used to discover candidate partitions (class configurable; defaults to {@link HivePartitionFinder}). */
    protected DatasetsFinder datasetFinder;
    /** Upper bound on attempts for a revived work unit (default 3, see {@link #initialize(SourceState)}). */
    protected int maxWorkUnitExecutionAttempts;
    /** Upper bound on newly created work units per run (default 5, see {@link #initialize(SourceState)}). */
    protected int maxWorkUnits;
    /** Watermark read from the previous run, or {@code NO_PREVIOUS_WATERMARK}. */
    protected String lowWatermark;
    /** Job start time in epoch milliseconds, stamped onto every work unit. */
    protected String timeStamp;
    protected PurgePolicy policy;
    protected boolean shouldProxy;
    protected MetricContext metricContext;
    protected EventSubmitter eventSubmitter;
    /** 1-based count of runs within the current purge cycle. */
    protected int executionCount;
    /** Work units to emit, keyed by partition name; a later put for the same partition replaces an earlier one. */
    protected Map<String, WorkUnit> workUnitMap = new HashMap<>();
    /** Number of new (non-revived) work units created during this run. */
    protected int workUnitsCreatedCount = 0;
    /** Candidate partitions, sorted by URN in {@link #populateDatasets()}. */
    protected List<HivePartitionDataset> datasets = new ArrayList<>();

    /**
     * Reads watermark and counters from the previous run, builds metrics, finder and policy, and discovers datasets.
     *
     * @param sourceState the job's source state
     * @throws IOException if dataset discovery fails or proxy-token cancellation fails
     */
    @VisibleForTesting
    protected void initialize(SourceState sourceState) throws IOException {
        setTimeStamp();
        setLowWatermark(sourceState);
        setExecutionCount(sourceState);
        this.metricContext = Instrumented.getMetricContext(sourceState, getClass());
        this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, "gobblin.compliance").build();
        // Reads lowWatermark/executionCount and uses eventSubmitter, so it must run after the calls above.
        submitCycleCompletionEvent();
        this.maxWorkUnits = sourceState.getPropAsInt(ComplianceConfigurationKeys.MAX_WORKUNITS_KEY, 5);
        this.maxWorkUnitExecutionAttempts =
            sourceState.getPropAsInt(ComplianceConfigurationKeys.MAX_WORKUNIT_EXECUTION_ATTEMPTS_KEY, 3);

        String finderClassName = sourceState.getProp(
            ComplianceConfigurationKeys.GOBBLIN_COMPLIANCE_DATASET_FINDER_CLASS, HivePartitionFinder.class.getName());
        this.datasetFinder = GobblinConstructorUtils.invokeConstructor(DatasetsFinder.class, finderClassName, sourceState);
        populateDatasets();

        // The policy is constructed with the low watermark so it can decide which partitions are still due.
        String policyClassName =
            sourceState.getProp(ComplianceConfigurationKeys.PURGE_POLICY_CLASS, HivePurgerPolicy.class.getName());
        this.policy = GobblinConstructorUtils.invokeConstructor(PurgePolicy.class, policyClassName, this.lowWatermark);

        this.shouldProxy =
            sourceState.getPropAsBoolean(ComplianceConfigurationKeys.GOBBLIN_COMPLIANCE_SHOULD_PROXY, false);
        if (this.shouldProxy) {
            try {
                ProxyUtils.cancelTokens(new State(sourceState));
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new IOException(e);
            } catch (TException e) {
                throw new IOException(e);
            }
        }
    }

    /**
     * Initializes the source and returns the work units for this run.
     *
     * @param sourceState the job's source state; the job watermark is written back into it
     * @return a fresh list of revived and newly created work units
     * @throws RuntimeException wrapping any {@link IOException} raised during initialization or creation
     */
    @Override
    public List<WorkUnit> getWorkunits(SourceState sourceState) {
        try {
            initialize(sourceState);
            createWorkUnits(sourceState);
        } catch (IOException e) {
            // getWorkunits cannot declare checked exceptions; wrap while preserving the cause.
            throw new RuntimeException(e);
        }
        return new ArrayList<>(this.workUnitMap.values());
    }

    /**
     * Creates a work unit for the partition with the given name, if it is among the discovered datasets.
     *
     * @param partitionName     URN of the partition to look up in {@link #datasets}
     * @param executionAttempts attempt number to record on the work unit
     * @return the new work unit, or absent when the partition was not discovered in this run
     */
    protected Optional<WorkUnit> createNewWorkUnit(String partitionName, int executionAttempts) {
        Optional<HivePartitionDataset> dataset = DatasetUtils.findDataset(partitionName, this.datasets);
        if (!dataset.isPresent()) {
            return Optional.absent();
        }
        return Optional.fromNullable(createNewWorkUnit(dataset.get(), executionAttempts));
    }

    /** Creates a first-attempt work unit for {@code hivePartitionDataset}. */
    protected WorkUnit createNewWorkUnit(HivePartitionDataset hivePartitionDataset) {
        return createNewWorkUnit(hivePartitionDataset, 1);
    }

    /**
     * Builds a work unit describing {@code hivePartitionDataset} and emits a WORKUNIT_GENERATED event.
     *
     * <p>Row count and size properties default to {@code -1} when the dataset does not carry them.
     *
     * @param hivePartitionDataset the partition to purge
     * @param executionAttempts    attempt number to record on the work unit
     * @return the populated work unit
     */
    protected WorkUnit createNewWorkUnit(HivePartitionDataset hivePartitionDataset, int executionAttempts) {
        WorkUnit workUnit = WorkUnit.createEmpty();
        workUnit.setProp(ComplianceConfigurationKeys.PARTITION_NAME, hivePartitionDataset.datasetURN());
        workUnit.setProp(ComplianceConfigurationKeys.EXECUTION_ATTEMPTS, executionAttempts);
        workUnit.setProp(ComplianceConfigurationKeys.TIMESTAMP, this.timeStamp);
        workUnit.setProp(ComplianceConfigurationKeys.GOBBLIN_COMPLIANCE_SHOULD_PROXY, this.shouldProxy);
        workUnit.setProp(ComplianceConfigurationKeys.EXECUTION_COUNT, this.executionCount);
        workUnit.setProp(ComplianceConfigurationKeys.NUM_ROWS,
            DatasetUtils.getProperty(hivePartitionDataset, ComplianceConfigurationKeys.NUM_ROWS, -1L));
        workUnit.setProp(ComplianceConfigurationKeys.RAW_DATA_SIZE,
            DatasetUtils.getProperty(hivePartitionDataset, ComplianceConfigurationKeys.RAW_DATA_SIZE, -1L));
        workUnit.setProp(ComplianceConfigurationKeys.TOTAL_SIZE,
            DatasetUtils.getProperty(hivePartitionDataset, ComplianceConfigurationKeys.TOTAL_SIZE, -1L));
        submitWorkUnitGeneratedEvent(hivePartitionDataset.datasetURN(), executionAttempts);
        return workUnit;
    }

    /** Emits a WORKUNIT_GENERATED event carrying the partition name and attempt number. */
    protected void submitWorkUnitGeneratedEvent(String partitionName, int executionAttempts) {
        Map<String, String> metadata = new HashMap<>();
        metadata.put(ComplianceConfigurationKeys.EXECUTION_ATTEMPTS, Integer.toString(executionAttempts));
        metadata.put(ComplianceConfigurationKeys.PARTITION_NAME, partitionName);
        this.eventSubmitter.submit(ComplianceEvents.Purger.WORKUNIT_GENERATED, metadata);
    }

    /**
     * Fills {@link #workUnitMap} with revived and new work units and records the job watermark.
     *
     * @param sourceState the job's source state; receives {@code HIVE_PURGER_WATERMARK}
     * @throws IOException declared for subclasses; not thrown by this implementation
     */
    protected void createWorkUnits(SourceState sourceState) throws IOException {
        createWorkunitsFromPreviousState(sourceState);
        if (this.datasets.isEmpty()) {
            return;
        }
        for (HivePartitionDataset dataset : this.datasets) {
            if (workUnitsExceeded()) {
                log.info("Workunits exceeded");
                // The next run reads this back as its low watermark (see setLowWatermark).
                setJobWatermark(sourceState, dataset.datasetURN());
                return;
            }
            if (!this.policy.shouldPurge(dataset)) {
                continue;
            }
            // NOTE(review): if this partition was already revived by createWorkunitsFromPreviousState, the put below
            // replaces that work unit with a fresh attempt-1 one. Confirm the policy never accepts such partitions.
            WorkUnit workUnit = createNewWorkUnit(dataset);
            String partitionName = workUnit.getProp(ComplianceConfigurationKeys.PARTITION_NAME);
            log.info("Created new work unit with partition {}", partitionName);
            this.workUnitMap.put(partitionName, workUnit);
            this.workUnitsCreatedCount++;
        }
        // Every partition was visited: mark the cycle complete unless a watermark is already present.
        if (!sourceState.contains(ComplianceConfigurationKeys.HIVE_PURGER_WATERMARK)) {
            setJobWatermark(sourceState, ComplianceConfigurationKeys.NO_PREVIOUS_WATERMARK);
        }
    }

    /** Returns whether the number of newly created work units has reached {@link #maxWorkUnits}. */
    protected boolean workUnitsExceeded() {
        return this.workUnitsCreatedCount >= this.maxWorkUnits;
    }

    /**
     * Discovers datasets with {@link #datasetFinder} and sorts them by URN.
     *
     * @throws IOException if the finder fails
     */
    protected void populateDatasets() throws IOException {
        this.datasets = this.datasetFinder.findDatasets();
        sortHiveDatasets(this.datasets);
    }

    /**
     * Sorts {@code list} in place by {@code datasetURN()} using natural {@link String} ordering.
     *
     * @param list datasets to sort; modified in place
     * @return the same {@code list}, for chaining
     */
    protected List<HivePartitionDataset> sortHiveDatasets(List<HivePartitionDataset> list) {
        Collections.sort(list, new Comparator<HivePartitionDataset>() {
            @Override
            public int compare(HivePartitionDataset left, HivePartitionDataset right) {
                return left.datasetURN().compareTo(right.datasetURN());
            }
        });
        return list;
    }

    /**
     * Re-creates work units for partitions whose previous work unit did not reach {@code COMMITTED}.
     *
     * <p>Skipped entirely when there is no low watermark or no previous work unit state. A partition is revived with
     * attempt {@code n + 1} only if its previous attempt {@code n} is below {@link #maxWorkUnitExecutionAttempts}
     * and the partition was discovered again in this run.
     *
     * <p>NOTE(review): revived work units do not increment {@link #workUnitsCreatedCount}, so they are emitted in
     * addition to the {@link #maxWorkUnits} new ones.
     *
     * @param sourceState the job's source state
     * @throws IllegalArgumentException if a previous work unit lacks the partition-name property
     */
    protected void createWorkunitsFromPreviousState(SourceState sourceState) {
        if (this.lowWatermark.equalsIgnoreCase(ComplianceConfigurationKeys.NO_PREVIOUS_WATERMARK)
            || Iterables.isEmpty(sourceState.getPreviousWorkUnitStates())) {
            return;
        }
        for (WorkUnitState previousState : sourceState.getPreviousWorkUnitStates()) {
            if (previousState.getWorkingState() == WorkUnitState.WorkingState.COMMITTED) {
                continue;
            }
            WorkUnit previousWorkUnit = previousState.getWorkunit();
            Preconditions.checkArgument(previousWorkUnit.contains(ComplianceConfigurationKeys.PARTITION_NAME),
                "Older WorkUnit doesn't contain property partition name.");
            int previousAttempts = previousWorkUnit.getPropAsInt(ComplianceConfigurationKeys.EXECUTION_ATTEMPTS, 1);
            if (previousAttempts >= this.maxWorkUnitExecutionAttempts) {
                continue;
            }
            Optional<WorkUnit> revived = createNewWorkUnit(
                previousWorkUnit.getProp(ComplianceConfigurationKeys.PARTITION_NAME), previousAttempts + 1);
            if (revived.isPresent()) {
                WorkUnit workUnit = revived.get();
                String partitionName = workUnit.getProp(ComplianceConfigurationKeys.PARTITION_NAME);
                log.info("Revived old Work Unit for partition {} having execution attempt {}", partitionName,
                    workUnit.getProp(ComplianceConfigurationKeys.EXECUTION_ATTEMPTS));
                this.workUnitMap.put(partitionName, workUnit);
            }
        }
    }

    /** Records the current wall-clock time, in epoch milliseconds, as this run's timestamp. */
    protected void setTimeStamp() {
        this.timeStamp = Long.toString(System.currentTimeMillis());
    }

    @Override
    public Extractor getExtractor(WorkUnitState workUnitState) throws IOException {
        return new HivePurgerExtractor(workUnitState);
    }

    @Override
    public void shutdown(SourceState sourceState) {
        // Nothing to release.
    }

    /** Reads {@code HIVE_PURGER_WATERMARK} from the previous run into {@link #lowWatermark}. */
    protected void setLowWatermark(SourceState sourceState) {
        this.lowWatermark =
            getWatermarkFromPreviousWorkUnits(sourceState, ComplianceConfigurationKeys.HIVE_PURGER_WATERMARK);
        log.info("Setting low watermark for the job: {}", this.lowWatermark);
    }

    /**
     * Sets {@link #executionCount} to the previous run's count plus one, or to 1 when the previous count is
     * missing or not an integer.
     */
    protected void setExecutionCount(SourceState sourceState) {
        String previousCount =
            getWatermarkFromPreviousWorkUnits(sourceState, ComplianceConfigurationKeys.EXECUTION_COUNT);
        if (previousCount.equalsIgnoreCase(ComplianceConfigurationKeys.NO_PREVIOUS_WATERMARK)) {
            this.executionCount = 1;
            log.info("No executionCount is found. Setting it to {}", this.executionCount);
            return;
        }
        try {
            this.executionCount = Integer.parseInt(previousCount) + 1;
        } catch (NumberFormatException e) {
            log.warn("Unable to convert executionCount {} to int : {}", previousCount, e.getMessage());
            this.executionCount = 1;
        }
    }

    /**
     * Emits CYCLE_COMPLETED with the number of runs in the finished cycle, then restarts the count at 1.
     *
     * <p>Fires only when there is no low watermark (a new cycle begins) and the execution count is above 1.
     */
    protected void submitCycleCompletionEvent() {
        if (!this.lowWatermark.equalsIgnoreCase(ComplianceConfigurationKeys.NO_PREVIOUS_WATERMARK)
            || this.executionCount <= 1) {
            return;
        }
        Map<String, String> metadata = new HashMap<>();
        metadata.put(ComplianceConfigurationKeys.TOTAL_EXECUTIONS, Integer.toString(this.executionCount - 1));
        this.eventSubmitter.submit(ComplianceEvents.Purger.CYCLE_COMPLETED, metadata);
        this.executionCount = 1;
    }

    protected String getLowWatermark() {
        return this.lowWatermark;
    }

    /** Stores {@code watermark} under {@code HIVE_PURGER_WATERMARK} in {@code sourceState}. */
    protected void setJobWatermark(SourceState sourceState, String watermark) {
        sourceState.setProp(ComplianceConfigurationKeys.HIVE_PURGER_WATERMARK, watermark);
        log.info("Setting job watermark for the job: {}", watermark);
    }

    /**
     * Returns the value of {@code key} from the first previous work unit state, or {@code NO_PREVIOUS_WATERMARK}
     * when there are no previous states or the key is absent.
     *
     * <p>Only the first previous state is consulted; this presumably relies on all of them sharing the same
     * job-level value. TODO confirm.
     */
    protected static String getWatermarkFromPreviousWorkUnits(SourceState sourceState, String key) {
        if (sourceState.getPreviousWorkUnitStates().isEmpty()) {
            return ComplianceConfigurationKeys.NO_PREVIOUS_WATERMARK;
        }
        return sourceState.getPreviousWorkUnitStates().get(0)
            .getProp(key, ComplianceConfigurationKeys.NO_PREVIOUS_WATERMARK);
    }
}
