package org.apache.gobblin.source.extractor.extract;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.config.client.ConfigClient;
import org.apache.gobblin.config.client.ConfigClientCache;
import org.apache.gobblin.config.client.api.ConfigStoreFactoryDoesNotExistsException;
import org.apache.gobblin.config.client.api.VersionStabilityPolicy;
import org.apache.gobblin.config.store.api.ConfigStoreCreationException;
import org.apache.gobblin.config.store.api.VersionDoesNotExistException;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.source.extractor.partition.Partition;
import org.apache.gobblin.source.extractor.partition.Partitioner;
import org.apache.gobblin.source.extractor.utils.Utils;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.DatasetFilterUtils;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.dataset.DatasetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/**
 * Base source that turns a set of source entities (for example tables) into work units.
 *
 * <p>The entities come from the configuration, filtered by {@value #ENTITY_BLACKLIST} and
 * {@value #ENTITY_WHITELIST}. Entities that appear only in the previous run's work unit states are
 * also processed. For each entity, the watermark range is partitioned with a {@link Partitioner},
 * and one {@link WorkUnit} is emitted per partition. Incomplete work units from the previous run are
 * appended for retry. If the total exceeds {@code mr.job.max.mappers} (default 100), the work units
 * are packed round-robin into that many {@link MultiWorkUnit}s.
 *
 * @param <S> type parameter passed through to {@link AbstractSource} (presumably the schema type — confirm)
 * @param <D> type parameter passed through to {@link AbstractSource} (presumably the record type — confirm)
 */
public abstract class QueryBasedSource<S, D> extends AbstractSource<S, D> {
    /** Property holding blacklist patterns; matching source entity names are skipped. */
    public static final String ENTITY_BLACKLIST = "entity.blacklist";
    /** Property holding whitelist patterns applied to source entity names. */
    public static final String ENTITY_WHITELIST = "entity.whitelist";
    /** When true, per-table properties are read from the config store instead of the job state. */
    public static final String SOURCE_OBTAIN_TABLE_PROPS_FROM_CONFIG_STORE = "source.obtain_table_props_from_config_store";
    public static final boolean DEFAULT_SOURCE_OBTAIN_TABLE_PROPS_FROM_CONFIG_STORE = false;
    /** Path segment under the config store URI where per-table configs live. */
    private static final String QUERY_BASED_SOURCE = "query_based_source";
    /** Property written on every generated work unit with {@link #CURRENT_WORK_UNIT_STATE_VERSION}. */
    public static final String WORK_UNIT_STATE_VERSION_KEY = "source.querybased.workUnitState.version";
    /**
     * Lineage info obtained from the source state's broker at the start of {@link #getWorkunits}.
     * Presumably intended for subclasses, for example in {@link #addLineageSourceInfo}.
     */
    protected Optional<LineageInfo> lineageInfo;
    private static final Logger log = LoggerFactory.getLogger(QueryBasedSource.class);
    public static final Integer CURRENT_WORK_UNIT_STATE_VERSION = 3;

    /**
     * A source entity (for example a table) together with the destination table name it is written to.
     *
     * <p>Equality and hash code consider only the source entity name.
     */
    public static final class SourceEntity {
        /** Name of the entity as known by the source. */
        private final String sourceEntityName;
        /** Destination table name; explicitly configured or derived from the source entity name. */
        private final String destTableName;

        public SourceEntity(String sourceEntityName, String destTableName) {
            this.sourceEntityName = sourceEntityName;
            this.destTableName = destTableName;
        }

        /**
         * Builds an entity whose destination table name is {@link #sanitizeEntityName} applied to the
         * given source entity name.
         */
        public static SourceEntity fromSourceEntityName(String sourceEntityName) {
            return new SourceEntity(sourceEntityName, sanitizeEntityName(sourceEntityName));
        }

        /**
         * Reads an entity from {@code source.entity} / {@code extract.table.name}.
         *
         * <p>If {@code source.entity} is set, it is the source name, and the destination defaults to
         * its sanitized form unless {@code extract.table.name} overrides it. Otherwise
         * {@code extract.table.name} is used for both names.
         *
         * @return the entity, or {@link Optional#absent()} when neither property is set
         */
        public static Optional<SourceEntity> fromState(State state) {
            if (state.contains("source.entity")) {
                String sourceEntityName = state.getProp("source.entity");
                String destTableName = state.getProp("extract.table.name", sanitizeEntityName(sourceEntityName));
                return Optional.of(new SourceEntity(sourceEntityName, destTableName));
            }
            if (state.contains("extract.table.name")) {
                String tableName = state.getProp("extract.table.name");
                return Optional.of(new SourceEntity(tableName, tableName));
            }
            return Optional.absent();
        }

        /**
         * Escapes special characters in an entity name via {@code Utils.escapeSpecialCharacters}.
         * Presumably this replaces {@code $} and {@code &} with {@code _} — confirm how the
         * {@code "$,&"} list is parsed.
         */
        static String sanitizeEntityName(String entityName) {
            return Utils.escapeSpecialCharacters(entityName, "$,&", "_");
        }

        /** Identifier of this entity as a dataset; same as {@link #getSourceEntityName()}. */
        public String getDatasetName() {
            return this.sourceEntityName;
        }

        public String getSourceEntityName() {
            return this.sourceEntityName;
        }

        public String getDestTableName() {
            return this.destTableName;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return Objects.equals(getDatasetName(), ((SourceEntity) obj).getDatasetName());
        }

        @Override
        public int hashCode() {
            // Equal to the classic 31 * 1 + hash(name) formula.
            return 31 + Objects.hashCode(getDatasetName());
        }

        @Override
        public String toString() {
            return "QueryBasedSource.SourceEntity(sourceEntityName=" + getSourceEntityName() + ", destTableName=" + getDestTableName() + ")";
        }
    }

    /**
     * Generates the work units for this run.
     *
     * <p>Entities that only appear in the previous run's state ("carry-over" entities) get
     * {@code source.querybased.end.value} set to their previous watermark. That property is set on a
     * private copy of the state, so the {@code sourceState} passed in is never modified by it.
     *
     * @param sourceState the job-level source state
     * @return the generated work units plus retried ones, packed into at most
     *     {@code mr.job.max.mappers} entries
     */
    @Override
    public List<WorkUnit> getWorkunits(SourceState sourceState) {
        initLogger(sourceState);
        this.lineageInfo = LineageInfo.getLineageInfo(sourceState.getBroker());

        List<WorkUnit> workUnits = Lists.newArrayList();
        Set<SourceEntity> entities = getFilteredSourceEntities(sourceState);
        Map<SourceEntity, State> tableSpecificProps = shouldObtainTablePropsFromConfigStore(sourceState)
            ? getTableSpecificPropsFromConfigStore(entities, sourceState)
            : getTableSpecificPropsFromState(entities, sourceState);
        Map<SourceEntity, Long> prevWatermarksByTable = getPreviousWatermarksForAllTables(sourceState);

        // Sets.union iterates the configured entities first, then the carry-over entities.
        for (SourceEntity sourceEntity : Sets.union(entities, prevWatermarksByTable.keySet())) {
            boolean isCarryOver = !entities.contains(sourceEntity);
            log.info("Source entity to be processed: {}, carry-over from previous state: {} ", sourceEntity, isCarryOver);
            SourceState combinedState = getCombinedState(sourceState, tableSpecificProps.get(sourceEntity));
            long previousWatermark = prevWatermarksByTable.containsKey(sourceEntity)
                ? prevWatermarksByTable.get(sourceEntity)
                : -1L;
            if (isCarryOver) {
                // getCombinedState hands back the caller's state when there are no table props. Copy it
                // so the per-entity end value does not leak into the job-level state.
                if (combinedState == sourceState) {
                    combinedState = copySourceState(sourceState);
                }
                combinedState.setProp("source.querybased.end.value", previousWatermark);
            }
            workUnits.addAll(generateWorkUnits(sourceEntity, combinedState, previousWatermark));
        }
        log.info("Total number of workunits for the current run: " + workUnits.size());

        List<WorkUnit> retryWorkUnits = getPreviousWorkUnitsForRetry(sourceState);
        log.info("Total number of incomplete tasks from the previous run: " + retryWorkUnits.size());
        workUnits.addAll(retryWorkUnits);
        return pack(workUnits, sourceState.getPropAsInt("mr.job.max.mappers", 100));
    }

    /**
     * Creates one work unit per partition of the entity's watermark range.
     *
     * <p>All work units share one {@link Extract}, built from {@code extract.namespace},
     * {@code extract.table.type} and the entity's destination table name. The extract is marked full
     * when {@code extract.is.full} is "true". Partitions are sorted with
     * {@link Partitioner#ascendingComparator} before the work units are created.
     *
     * @param sourceEntity the entity to extract
     * @param sourceState state (possibly combined with table-specific props) used for partitioning
     * @param previousWatermark the entity's watermark from the previous run, or -1 when none
     * @return the generated work units, one per partition
     */
    protected List<WorkUnit> generateWorkUnits(SourceEntity sourceEntity, SourceState sourceState, long previousWatermark) {
        List<WorkUnit> workUnits = Lists.newArrayList();
        String namespace = sourceState.getProp("extract.namespace");
        // Locale.ROOT keeps enum lookup independent of the JVM default locale.
        Extract.TableType tableType = Extract.TableType.valueOf(sourceState.getProp("extract.table.type").toUpperCase(Locale.ROOT));
        List<Partition> partitions = new Partitioner(sourceState).getPartitionList(previousWatermark);
        Collections.sort(partitions, Partitioner.ascendingComparator);

        String destTableName = sourceEntity.getDestTableName();
        log.info("Create extract output with table name is " + destTableName);
        Extract extract = createExtract(tableType, namespace, destTableName);
        if (Boolean.parseBoolean(sourceState.getProp("extract.is.full"))) {
            extract.setFullTrue(System.currentTimeMillis());
        }

        for (Partition partition : partitions) {
            WorkUnit workUnit = WorkUnit.create(extract);
            workUnit.setProp("source.entity", sourceEntity.getSourceEntityName());
            workUnit.setProp("extract.table.name", sourceEntity.getDestTableName());
            workUnit.setProp(WORK_UNIT_STATE_VERSION_KEY, CURRENT_WORK_UNIT_STATE_VERSION);
            addLineageSourceInfo(sourceState, sourceEntity, workUnit);
            partition.serialize(workUnit);
            workUnits.add(workUnit);
        }
        return workUnits;
    }

    /** Hook for subclasses to attach lineage source info to a work unit; no-op by default. */
    protected void addLineageSourceInfo(SourceState sourceState, SourceEntity sourceEntity, WorkUnit workUnit) {
    }

    /** Returns {@link #getSourceEntities} filtered by the entity blacklist/whitelist. */
    protected Set<SourceEntity> getFilteredSourceEntities(SourceState sourceState) {
        return getFilteredSourceEntitiesHelper(sourceState, getSourceEntities(sourceState));
    }

    /**
     * Keeps the entities whose source name survives the {@value #ENTITY_BLACKLIST} /
     * {@value #ENTITY_WHITELIST} patterns.
     */
    static Set<SourceEntity> getFilteredSourceEntitiesHelper(SourceState sourceState, Iterable<SourceEntity> entities) {
        List<Pattern> blacklist = DatasetFilterUtils.getPatternList(sourceState, ENTITY_BLACKLIST);
        List<Pattern> whitelist = DatasetFilterUtils.getPatternList(sourceState, ENTITY_WHITELIST);
        Set<SourceEntity> survivors = new HashSet<SourceEntity>();
        for (SourceEntity entity : entities) {
            if (DatasetFilterUtils.survived(entity.getSourceEntityName(), blacklist, whitelist)) {
                survivors.add(entity);
            }
        }
        return survivors;
    }

    /**
     * Looks up dataset-specific properties in {@code sourceState} for each entity, keyed by
     * {@link SourceEntity#getDatasetName()}.
     *
     * @return the properties for each entity that {@code DatasetUtils.getDatasetSpecificProps}
     *     returned an entry for
     */
    public static Map<SourceEntity, State> getTableSpecificPropsFromState(Iterable<SourceEntity> iterable, SourceState sourceState) {
        Map<String, SourceEntity> entitiesByDatasetName = new HashMap<String, SourceEntity>();
        for (SourceEntity entity : iterable) {
            entitiesByDatasetName.put(entity.getDatasetName(), entity);
        }
        Map<String, State> propsByDatasetName = DatasetUtils.getDatasetSpecificProps(entitiesByDatasetName.keySet(), sourceState);
        Map<SourceEntity, State> propsByEntity = new HashMap<SourceEntity, State>();
        for (Map.Entry<String, State> entry : propsByDatasetName.entrySet()) {
            propsByEntity.put(entitiesByDatasetName.get(entry.getKey()), entry.getValue());
        }
        return propsByEntity;
    }

    /** Returns the configured source entities; subclasses may override. */
    protected Set<SourceEntity> getSourceEntities(State state) {
        return getSourceEntitiesHelper(state);
    }

    /**
     * Reads the configured entities.
     *
     * <p>{@code source.entities} takes precedence. Otherwise a single entity is built from
     * {@code source.entity} / {@code extract.table.name}.
     *
     * @throws IllegalStateException if none of those three properties is set
     */
    static Set<SourceEntity> getSourceEntitiesHelper(State state) {
        if (state.contains("source.entities")) {
            log.info("Using entity names in source.entities");
            Set<SourceEntity> entities = new HashSet<SourceEntity>();
            for (String entityName : state.getPropAsList("source.entities")) {
                entities.add(SourceEntity.fromSourceEntityName(entityName));
            }
            return entities;
        }
        if (!state.contains("source.entity") && !state.contains("extract.table.name")) {
            throw new IllegalStateException(String.format("One of the following properties must be specified: %s, %s, %s.",
                "source.entities", "source.entity", "extract.table.name"));
        }
        // Present: one of the two keys that fromState reads was checked above.
        SourceEntity entity = SourceEntity.fromState(state).get();
        log.info("Using entity name in " + entity);
        return ImmutableSet.of(entity);
    }

    private static boolean shouldObtainTablePropsFromConfigStore(SourceState sourceState) {
        return sourceState.getPropAsBoolean(SOURCE_OBTAIN_TABLE_PROPS_FROM_CONFIG_STORE, DEFAULT_SOURCE_OBTAIN_TABLE_PROPS_FROM_CONFIG_STORE);
    }

    /**
     * Loads each entity's config from {@code <store uri>/query_based_source/<dataset name>}.
     *
     * @throws NullPointerException if {@code gobblin.config.management.store.uri} is not set
     * @throws RuntimeException wrapping any config store lookup failure
     */
    private static Map<SourceEntity, State> getTableSpecificPropsFromConfigStore(Collection<SourceEntity> entities, State state) {
        ConfigClient client = ConfigClientCache.getClient(VersionStabilityPolicy.STRONG_LOCAL_STABILITY);
        String storeUri = state.getProp("gobblin.config.management.store.uri");
        Preconditions.checkNotNull(storeUri, "%s must be set when %s is enabled",
            "gobblin.config.management.store.uri", SOURCE_OBTAIN_TABLE_PROPS_FROM_CONFIG_STORE);
        Map<SourceEntity, State> propsByEntity = Maps.newHashMap();
        for (SourceEntity entity : entities) {
            try {
                propsByEntity.put(entity, ConfigUtils.configToState(client.getConfig(
                    PathUtils.combinePaths(new String[]{storeUri, QUERY_BASED_SOURCE, entity.getDatasetName()}).toUri())));
            } catch (VersionDoesNotExistException | ConfigStoreFactoryDoesNotExistsException | ConfigStoreCreationException e) {
                throw new RuntimeException("Unable to get table config for " + entity, e);
            }
        }
        return propsByEntity;
    }

    /**
     * Returns {@code sourceState} itself when {@code tableProps} is null. Otherwise returns a copy
     * with {@code tableProps} added on top.
     */
    private static SourceState getCombinedState(SourceState sourceState, State tableProps) {
        if (tableProps == null) {
            return sourceState;
        }
        SourceState combined = copySourceState(sourceState);
        combined.addAll(tableProps);
        return combined;
    }

    /** Copies the properties, previous dataset states and previous work unit states of {@code sourceState}. */
    private static SourceState copySourceState(SourceState sourceState) {
        return new SourceState(sourceState, sourceState.getPreviousDatasetStatesByUrns(), sourceState.getPreviousWorkUnitStates());
    }

    /**
     * Returns {@code workUnits} unchanged when it has at most {@code maxMappers} entries. Otherwise
     * distributes them round-robin into {@code maxMappers} {@link MultiWorkUnit}s.
     *
     * @throws IllegalArgumentException if {@code maxMappers} is not positive
     */
    private static List<WorkUnit> pack(List<WorkUnit> workUnits, int maxMappers) {
        Preconditions.checkArgument(maxMappers > 0, "mr.job.max.mappers must be positive, got %s", maxMappers);
        if (workUnits.size() <= maxMappers) {
            return workUnits;
        }
        List<MultiWorkUnit> bins = Lists.newArrayListWithCapacity(maxMappers);
        for (int i = 0; i < maxMappers; i++) {
            bins.add(MultiWorkUnit.createEmpty());
        }
        for (int i = 0; i < workUnits.size(); i++) {
            bins.get(i % maxMappers).addWorkUnit(workUnits.get(i));
        }
        return new ArrayList<WorkUnit>(bins);
    }

    @Override
    public void shutdown(SourceState sourceState) {
    }

    /**
     * Computes the watermark to resume from for each entity seen in the previous run's work unit states.
     *
     * <p>For each entity, the minimum low watermark and the maximum actual high watermark are taken
     * over its previous work unit states. A missing value counts as -1. The result is:
     * <ul>
     *   <li>the minimum low watermark, if the commit policy is {@code COMMIT_ON_FULL_SUCCESS} and any
     *       of the entity's work units is neither SUCCESSFUL nor COMMITTED;</li>
     *   <li>otherwise the minimum low watermark, if any of its work units reported
     *       {@code qualitychecker.rows.expected <= 0} and
     *       {@code source.querybased.resetEmptyPartitionWatermark} (default true) is set;</li>
     *   <li>otherwise the maximum actual high watermark.</li>
     * </ul>
     */
    static Map<SourceEntity, Long> getPreviousWatermarksForAllTables(SourceState sourceState) {
        Map<SourceEntity, Long> prevLowWatermarksByTable = Maps.newHashMap();
        Map<SourceEntity, Long> prevActualHighWatermarksByTable = Maps.newHashMap();
        Set<SourceEntity> tablesWithFailedTasks = Sets.newHashSet();
        Set<SourceEntity> tablesWithNoUpdatesOnPreviousRun = Sets.newHashSet();
        boolean commitOnFullSuccess = JobCommitPolicy.getCommitPolicy(sourceState) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;

        for (WorkUnitState previousWus : sourceState.getPreviousWorkUnitStates()) {
            Optional<SourceEntity> maybeTable = SourceEntity.fromState(previousWus);
            if (!maybeTable.isPresent()) {
                log.warn("Missing source entity for WorkUnit state: " + previousWus);
                continue;
            }
            SourceEntity table = maybeTable.get();

            long lowWatermark = getPreviousLowWatermark(previousWus);
            Long knownLow = prevLowWatermarksByTable.get(table);
            prevLowWatermarksByTable.put(table, knownLow == null ? lowWatermark : Math.min(knownLow, lowWatermark));

            long highWatermark = getPreviousActualHighWatermark(previousWus);
            Long knownHigh = prevActualHighWatermarksByTable.get(table);
            prevActualHighWatermarksByTable.put(table, knownHigh == null ? highWatermark : Math.max(knownHigh, highWatermark));

            if (commitOnFullSuccess && !isSuccessfulOrCommited(previousWus)) {
                tablesWithFailedTasks.add(table);
            }
            if (!isAnyDataProcessed(previousWus)) {
                tablesWithNoUpdatesOnPreviousRun.add(table);
            }
        }

        Map<SourceEntity, Long> result = Maps.newHashMap();
        for (Map.Entry<SourceEntity, Long> entry : prevLowWatermarksByTable.entrySet()) {
            SourceEntity table = entry.getKey();
            if (tablesWithFailedTasks.contains(table)) {
                log.info("Resetting low watermark to {} because previous run failed.", entry.getValue());
                result.put(table, entry.getValue());
            } else if (tablesWithNoUpdatesOnPreviousRun.contains(table)
                && sourceState.getPropAsBoolean("source.querybased.resetEmptyPartitionWatermark", true)) {
                log.info("Resetting low watermark to {} because previous run processed no data.", entry.getValue());
                result.put(table, entry.getValue());
            } else {
                result.put(table, prevActualHighWatermarksByTable.get(table));
            }
        }
        return result;
    }

    /**
     * Reads the low watermark of a previous work unit.
     *
     * <p>The JSON {@link LongWatermark} is tried first, then the legacy
     * {@code workunit.low.water.mark} property. Returns -1 when neither is present.
     */
    private static long getPreviousLowWatermark(WorkUnitState previousWus) {
        LongWatermark lowWatermark = previousWus.getWorkunit().getLowWatermark(LongWatermark.class);
        if (lowWatermark != null) {
            return lowWatermark.getValue();
        }
        if (previousWus.getProperties().containsKey("workunit.low.water.mark")) {
            long value = Long.parseLong(previousWus.getProperties().getProperty("workunit.low.water.mark"));
            log.warn("can not find low water mark in json format, getting value from workunit.low.water.mark low water mark " + value);
            return value;
        }
        return -1L;
    }

    /**
     * Reads the actual high watermark of a previous work unit.
     *
     * <p>The JSON {@link LongWatermark} is tried first, then the legacy
     * {@code workunit.state.runtime.high.water.mark} property. Returns -1 when neither is present.
     */
    private static long getPreviousActualHighWatermark(WorkUnitState previousWus) {
        LongWatermark actualHighWatermark = previousWus.getActualHighWatermark(LongWatermark.class);
        if (actualHighWatermark != null) {
            return actualHighWatermark.getValue();
        }
        if (previousWus.getProperties().containsKey("workunit.state.runtime.high.water.mark")) {
            long value = Long.parseLong(previousWus.getProperties().getProperty("workunit.state.runtime.high.water.mark"));
            log.warn("can not find high water mark in json format, getting value from workunit.state.runtime.high.water.mark high water mark " + value);
            return value;
        }
        return -1L;
    }

    /** True when the work unit ended in the SUCCESSFUL or COMMITTED working state. */
    private static boolean isSuccessfulOrCommited(WorkUnitState workUnitState) {
        WorkUnitState.WorkingState state = workUnitState.getWorkingState();
        return state == WorkUnitState.WorkingState.SUCCESSFUL || state == WorkUnitState.WorkingState.COMMITTED;
    }

    /** True when {@code qualitychecker.rows.expected} (default 0) is positive. */
    private static boolean isAnyDataProcessed(WorkUnitState workUnitState) {
        return workUnitState.getPropAsLong("qualitychecker.rows.expected", 0L) > 0;
    }

    /**
     * Puts {@code [<schema>_<entity>]} into the SLF4J MDC under {@code sourceInfo}.
     * NOTE(review): this class never removes the entry.
     */
    private static void initLogger(SourceState sourceState) {
        MDC.put("sourceInfo", "[" + StringUtils.stripToEmpty(sourceState.getProp("source.querybased.schema")) + "_"
            + StringUtils.stripToEmpty(sourceState.getProp("source.entity")) + "]");
    }
}
