package org.apache.gobblin.data.management.copy;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Predicates;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.UnmodifiableIterator;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.data.management.conversion.hive.source.HiveSource;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.extractor.EmptyExtractor;
import org.apache.gobblin.data.management.copy.extractor.FileAwareInputStreamExtractor;
import org.apache.gobblin.data.management.copy.prioritization.FileSetComparator;
import org.apache.gobblin.data.management.copy.publisher.CopyEventSubmitterHelper;
import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
import org.apache.gobblin.data.management.copy.watermark.CopyableFileWatermarkGenerator;
import org.apache.gobblin.data.management.copy.watermark.CopyableFileWatermarkHelper;
import org.apache.gobblin.data.management.dataset.DatasetUtils;
import org.apache.gobblin.data.management.partition.CopyableDatasetRequestor;
import org.apache.gobblin.data.management.partition.FileSet;
import org.apache.gobblin.data.management.partition.FileSetResourceEstimator;
import org.apache.gobblin.dataset.IterableDatasetFinder;
import org.apache.gobblin.dataset.IterableDatasetFinderImpl;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.WatermarkInterval;
import org.apache.gobblin.source.extractor.extract.AbstractSource;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitWeighter;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.WriterUtils;
import org.apache.gobblin.util.binpacking.FieldWeighter;
import org.apache.gobblin.util.binpacking.WorstFitDecreasingBinPacking;
import org.apache.gobblin.util.deprecation.DeprecationUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
import org.apache.gobblin.util.guid.Guid;
import org.apache.gobblin.util.guid.HasGuid;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.util.request_allocation.AllocatedRequestsIterator;
import org.apache.gobblin.util.request_allocation.GreedyAllocator;
import org.apache.gobblin.util.request_allocation.HierarchicalAllocator;
import org.apache.gobblin.util.request_allocation.HierarchicalPrioritizer;
import org.apache.gobblin.util.request_allocation.PriorityIterableBasedRequestAllocator;
import org.apache.gobblin.util.request_allocation.RequestAllocator;
import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
import org.apache.gobblin.util.request_allocation.RequestAllocatorUtils;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/data/management/copy/CopySource.class */
/**
 * Source that copies files from a source file system to a target (writer) file system.
 *
 * <p>{@link #getWorkunits(SourceState)} works in these steps:
 * <ul>
 *   <li>It discovers datasets through a dataset finder and wraps each one in a {@link CopyableDatasetRequestor}.</li>
 *   <li>A {@link RequestAllocator} selects which {@link FileSet}s fit the configured copy limits.</li>
 *   <li>Each selected file set is turned, in parallel, into one {@link WorkUnit} per {@link CopyEntity}. Work units
 *       may be split further by {@link DistcpFileSplitter}.</li>
 *   <li>The resulting work units are bin-packed with {@link WorstFitDecreasingBinPacking}.</li>
 * </ul>
 *
 * <p>{@link #getExtractor(WorkUnitState)} returns a {@link FileAwareInputStreamExtractor} for {@link CopyableFile} work
 * units and an {@link EmptyExtractor} for any other {@link CopyEntity} type.
 */
public class CopySource extends AbstractSource<String, FileAwareInputStream> {
    private static final Logger log = LoggerFactory.getLogger(CopySource.class);

    /**
     * Canonical name of {@link CopyableGlobDatasetFinder}. It is passed to
     * {@code DatasetUtils.instantiateDatasetFinder}; presumably it is the finder used when none is configured.
     */
    public static final String DEFAULT_DATASET_PROFILE_CLASS_KEY = CopyableGlobDatasetFinder.class.getCanonicalName();
    /** Work unit property holding the serialized {@link CopyEntity}. */
    public static final String SERIALIZED_COPYABLE_FILE = "gobblin.copy.serialized.copyable.file";
    /** Work unit property holding the concrete class name of the serialized {@link CopyEntity}. */
    public static final String COPY_ENTITY_CLASS = "gobblin.copy.copy.entity.class";
    /** Work unit property holding the serialized {@link CopyableDatasetMetadata}. */
    public static final String SERIALIZED_COPYABLE_DATASET = "gobblin.copy.serialized.copyable.datasets";
    /** Work unit property holding the serialized work unit {@link Guid}. */
    public static final String WORK_UNIT_GUID = "gobblin.copy.work.unit.guid";
    /** Number of threads used to generate work units from file sets. */
    public static final String MAX_CONCURRENT_LISTING_SERVICES = "gobblin.copy.max.concurrent.listing.services";
    public static final int DEFAULT_MAX_CONCURRENT_LISTING_SERVICES = 20;
    /** Replaces the deprecated key {@code gobblin.copy.prioritization.maxCopy.copyEntities}. */
    public static final String MAX_FILES_COPIED_KEY = "gobblin.copy.max.files.copied";
    /** When true, the planned copy actions are only logged and no work units are returned. */
    public static final String SIMULATE = "gobblin.copy.simulate";
    /** Maximum weight per bin when bin-packing work units (default 0). */
    public static final String MAX_SIZE_MULTI_WORKUNITS = "gobblin.copy.binPacking.maxSizePerBin";
    /** Used together with {@link #MAX_SIZE_MULTI_WORKUNITS} to derive the minimum work unit weight (default 50). */
    public static final String MAX_WORK_UNITS_PER_BIN = "gobblin.copy.binPacking.maxWorkUnitsPerBin";
    public static final String REQUESTS_EXCEEDING_AVAILABLE_RESOURCE_POOL_EVENT_NAME = "RequestsExceedingAvailableResourcePoolEvent";
    public static final String REQUESTS_DROPPED_EVENT_NAME = "RequestsDroppedEvent";
    public static final String REQUESTS_REJECTED_DUE_TO_INSUFFICIENT_EVICTION_EVENT_NAME = "RequestsRejectedDueToInsufficientEvictionEvent";
    public static final String REQUESTS_REJECTED_WITH_LOW_PRIORITY_EVENT_NAME = "RequestsRejectedWithLowPriorityEvent";
    public static final String FILESET_NAME = "fileset.name";
    public static final String FILESET_TOTAL_ENTITIES = "fileset.total.entities";
    public static final String FILESET_TOTAL_SIZE_IN_BYTES = "fileset.total.size";
    /** Work unit property read by {@link #weighter} during bin packing. */
    private static final String WORK_UNIT_WEIGHT = "gobblin.copy.workUnitWeight";
    private final WorkUnitWeighter weighter = new FieldWeighter(WORK_UNIT_WEIGHT);
    public MetricContext metricContext;
    public EventSubmitter eventSubmitter;
    protected Optional<LineageInfo> lineageInfo;

    /**
     * Generates the {@link WorkUnit}s for a single {@link FileSet} and adds them to a shared multimap keyed by that
     * file set.
     *
     * <p>The implementation class is resolved from the {@code copy.source.fileset.wu.generator.class} property.
     * Replacements are instantiated through their longest constructor, with the same arguments this class takes.
     */
    @Alias("FileSetWorkUnitGenerator")
    public static class FileSetWorkUnitGenerator implements Callable<Void> {
        protected final CopyableDatasetBase copyableDataset;
        protected final FileSet<CopyEntity> fileSet;
        protected final State state;
        protected final FileSystem targetFs;
        protected final SetMultimap<FileSet<CopyEntity>, WorkUnit> workUnitList;
        protected final Optional<CopyableFileWatermarkGenerator> watermarkGenerator;
        protected final long minWorkUnitWeight;
        protected final Optional<LineageInfo> lineageInfo;

        @ConstructorProperties({"copyableDataset", "fileSet", "state", "targetFs", "workUnitList", "watermarkGenerator", "minWorkUnitWeight", "lineageInfo"})
        public FileSetWorkUnitGenerator(CopyableDatasetBase copyableDataset, FileSet<CopyEntity> fileSet, State state,
                FileSystem targetFs, SetMultimap<FileSet<CopyEntity>, WorkUnit> workUnitList,
                Optional<CopyableFileWatermarkGenerator> watermarkGenerator, long minWorkUnitWeight,
                Optional<LineageInfo> lineageInfo) {
            this.copyableDataset = copyableDataset;
            this.fileSet = fileSet;
            this.state = state;
            this.targetFs = targetFs;
            this.workUnitList = workUnitList;
            this.watermarkGenerator = watermarkGenerator;
            this.minWorkUnitWeight = minWorkUnitWeight;
            this.lineageInfo = lineageInfo;
        }

        /**
         * Creates one work unit per {@link CopyEntity} in the file set. A {@link CopyableFile} is split with
         * {@link DistcpFileSplitter} when splitting is allowed. All resulting work units are added to the shared
         * multimap under this file set.
         *
         * @return always {@code null}
         * @throws RuntimeException wrapping any {@link IOException} raised while building work units
         */
        @Override
        public Void call() {
            try {
                // ':' is replaced because the file set name is used as the Extract's table name.
                Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, CopyConfiguration.COPY_PREFIX,
                        this.fileSet.getName().replace(':', '_'));
                List<WorkUnit> generatedWorkUnits = Lists.newArrayList();
                for (CopyEntity copyEntity : this.fileSet.getFiles()) {
                    CopyableDatasetMetadata datasetMetadata = new CopyableDatasetMetadata(this.copyableDataset);
                    CopyEntity.DatasetAndPartition datasetAndPartition = copyEntity.getDatasetAndPartition(datasetMetadata);

                    WorkUnit workUnit = new WorkUnit(extract);
                    workUnit.addAll(this.state);
                    CopySource.serializeCopyEntity(workUnit, copyEntity);
                    CopySource.serializeCopyableDataset(workUnit, datasetMetadata);
                    GobblinMetrics.addCustomTagToState(workUnit,
                            new Tag(CopyEventSubmitterHelper.DATASET_ROOT_METADATA_NAME, this.copyableDataset.datasetURN()));
                    workUnit.setProp("dataset.urn", datasetAndPartition.toString());
                    workUnit.setProp("event.sla.datasetUrn", this.copyableDataset.datasetURN());
                    workUnit.setProp("event.sla.partition", copyEntity.getFileSet());
                    CopySource.setWorkUnitWeight(workUnit, copyEntity, this.minWorkUnitWeight);
                    setWorkUnitWatermark(workUnit, this.watermarkGenerator, copyEntity);
                    // The GUID is computed last among the properties above so it reflects the serialized entity.
                    CopySource.computeAndSetWorkUnitGuid(workUnit);
                    addLineageInfo(copyEntity, workUnit);

                    if (copyEntity instanceof CopyableFile && DistcpFileSplitter.allowSplit(this.state, this.targetFs)) {
                        generatedWorkUnits.addAll(DistcpFileSplitter.splitFile((CopyableFile) copyEntity, workUnit, this.targetFs));
                    } else {
                        generatedWorkUnits.add(workUnit);
                    }
                }
                this.workUnitList.putAll(this.fileSet, generatedWorkUnits);
                return null;
            } catch (IOException e) {
                throw new RuntimeException("Failed to generate work units for dataset " + this.copyableDataset.datasetURN(), e);
            }
        }

        /**
         * Sets the watermark interval on the work unit for a {@link CopyableFile}, when the helper produces one.
         * Other entity types are left unchanged.
         */
        private void setWorkUnitWatermark(WorkUnit workUnit, Optional<CopyableFileWatermarkGenerator> generator,
                CopyEntity copyEntity) throws IOException {
            if (!(copyEntity instanceof CopyableFile)) {
                return;
            }
            Optional<WatermarkInterval> watermark =
                    CopyableFileWatermarkHelper.getCopyableFileWatermark((CopyableFile) copyEntity, generator);
            if (watermark.isPresent()) {
                workUnit.setWatermarkInterval(watermark.get());
            }
        }

        /**
         * Records the copyable file's source data as the lineage source of the work unit. This happens only when
         * lineage is enabled and the file has both source and destination data set.
         */
        private void addLineageInfo(CopyEntity copyEntity, WorkUnit workUnit) {
            if (!(copyEntity instanceof CopyableFile)) {
                return;
            }
            CopyableFile copyableFile = (CopyableFile) copyEntity;
            if (this.lineageInfo.isPresent() && copyableFile.getSourceData() != null
                    && copyableFile.getDestinationData() != null) {
                this.lineageInfo.get().setSource(copyableFile.getSourceData(), workUnit);
            }
        }
    }

    /**
     * Discovers copyable datasets, allocates their file sets, generates work units in parallel and bin-packs them.
     *
     * <p>In simulate mode ({@link #SIMULATE}), the planned actions are logged and an empty list is returned. Failures
     * for individual file sets are logged and skipped. If listing is interrupted, the thread's interrupt flag is
     * restored and an empty list is returned.
     *
     * @throws RuntimeException wrapping any {@link IOException} from file system or configuration setup
     */
    @Override
    public List<WorkUnit> getWorkunits(final SourceState sourceState) {
        this.metricContext = Instrumented.getMetricContext(sourceState, CopySource.class);
        this.lineageInfo = LineageInfo.getLineageInfo(sourceState.getBroker());
        try {
            DeprecationUtils.renameDeprecatedKeys(sourceState, "gobblin.copy.prioritization.maxCopy.copyEntities",
                    Lists.newArrayList(new String[]{MAX_FILES_COPIED_KEY}));
            FileSystem sourceFs = HadoopUtils.getSourceFileSystem(sourceState);
            final FileSystem targetFs = HadoopUtils.getWriterFileSystem(sourceState, 1, 0);
            sourceState.setProp("sourceCluster", sourceFs.getUri());
            sourceState.setProp("destinationCluster", targetFs.getUri());
            log.info("Identified source file system at {} and target file system at {}.", sourceFs.getUri(), targetFs.getUri());

            long maxSizePerBin = sourceState.getPropAsLong(MAX_SIZE_MULTI_WORKUNITS, 0L);
            long maxWorkUnitsPerBin = sourceState.getPropAsLong(MAX_WORK_UNITS_PER_BIN, 50L);
            final long minWorkUnitWeight = computeMinWorkUnitWeight(maxSizePerBin, maxWorkUnitsPerBin);
            final Optional<CopyableFileWatermarkGenerator> watermarkGenerator =
                    CopyableFileWatermarkHelper.getCopyableFileWatermarkGenerator(sourceState);
            int maxConcurrentListingServices =
                    sourceState.getPropAsInt(MAX_CONCURRENT_LISTING_SERVICES, DEFAULT_MAX_CONCURRENT_LISTING_SERVICES);
            CopyConfiguration copyConfiguration = CopyConfiguration.builder(targetFs, sourceState.getProperties()).build();
            this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, CopyConfiguration.COPY_PREFIX).build();

            IterableDatasetFinder datasetFinder = DatasetUtils.instantiateDatasetFinder(sourceState.getProperties(),
                    sourceFs, DEFAULT_DATASET_PROFILE_CLASS_KEY, this.eventSubmitter, sourceState);
            // NOTE(review): given the declared type, this check only guards against null. It is kept from the
            // decompiled original; confirm against the return type of DatasetUtils.instantiateDatasetFinder.
            IterableDatasetFinder iterableDatasetFinder = datasetFinder instanceof IterableDatasetFinder
                    ? datasetFinder
                    : new IterableDatasetFinderImpl(datasetFinder);
            UnmodifiableIterator requestors = Iterators.filter(
                    Iterators.transform(iterableDatasetFinder.getDatasetsIterator(),
                            new CopyableDatasetRequestor.Factory(targetFs, copyConfiguration, log)),
                    Predicates.notNull());

            // Shared by all generator tasks, hence synchronized.
            final SetMultimap<FileSet<CopyEntity>, WorkUnit> workUnitsMap =
                    Multimaps.synchronizedSetMultimap(HashMultimap.<FileSet<CopyEntity>, WorkUnit>create());
            RequestAllocator<FileSet<CopyEntity>> allocator =
                    createRequestAllocator(copyConfiguration, maxConcurrentListingServices);
            AllocatedRequestsIterator allocatedFileSets =
                    allocator.allocateRequests(requestors, copyConfiguration.getMaxToCopy());
            submitUnfulfilledRequestEvents(allocator);

            final String generatorClassName = sourceState.getProp("copy.source.fileset.wu.generator.class",
                    FileSetWorkUnitGenerator.class.getName());
            try {
                Iterator generators = Iterators.transform(allocatedFileSets,
                        new Function<FileSet<CopyEntity>, Callable<Void>>() {
                            @Nullable
                            @Override
                            public Callable<Void> apply(FileSet<CopyEntity> fileSet) {
                                try {
                                    return (Callable<Void>) GobblinConstructorUtils.invokeLongestConstructor(
                                            new ClassAliasResolver(FileSetWorkUnitGenerator.class).resolveClass(generatorClassName),
                                            new Object[]{fileSet.getDataset(), fileSet, sourceState, targetFs, workUnitsMap,
                                                    watermarkGenerator, Long.valueOf(minWorkUnitWeight), CopySource.this.lineageInfo});
                                } catch (Exception e) {
                                    throw new RuntimeException("Cannot create workunits generator", e);
                                }
                            }
                        });
                IteratorExecutor listingExecutor = new IteratorExecutor(generators, maxConcurrentListingServices,
                        ExecutorsUtils.newDaemonThreadFactory(Optional.of(log), Optional.of("Copy-file-listing-pool-%d")));
                for (Object future : listingExecutor.execute()) {
                    try {
                        ((Future<?>) future).get();
                    } catch (ExecutionException e) {
                        // Best effort: a failing file set is logged and skipped so the others still get copied.
                        log.error("Failed to get work units for dataset.", e.getCause());
                    }
                }
                log.info("Created {} workunits ", workUnitsMap.size());
                copyConfiguration.getCopyContext().logCacheStatistics();

                boolean simulate = sourceState.contains(SIMULATE) && sourceState.getPropAsBoolean(SIMULATE);
                if (!simulate) {
                    List<? extends WorkUnit> packed = new WorstFitDecreasingBinPacking(maxSizePerBin)
                            .pack(Lists.newArrayList(workUnitsMap.values()), this.weighter);
                    log.info("Bin packed work units. Initial work units: {}, packed work units: {}, max weight per bin: {}, max work units per bin: {}.",
                            workUnitsMap.size(), packed.size(), maxSizePerBin, maxWorkUnitsPerBin);
                    return ImmutableList.copyOf(packed);
                }

                log.info("Simulate mode enabled. Will not execute the copy.");
                for (Map.Entry<FileSet<CopyEntity>, Collection<WorkUnit>> entry : workUnitsMap.asMap().entrySet()) {
                    log.info(String.format("Actions for dataset %s file set %s.",
                            entry.getKey().getDataset().datasetURN(), entry.getKey().getName()));
                    for (WorkUnit workUnit : entry.getValue()) {
                        try {
                            log.info(deserializeCopyEntity(workUnit).explain());
                        } catch (Exception e) {
                            log.info("Cannot deserialize CopyEntity from wu : {}", workUnit.toString());
                        }
                    }
                }
                return Lists.newArrayList();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers higher up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                log.error("Retrieval of work units was interrupted. Aborting.");
                return Lists.newArrayList();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Computes the minimum weight given to every work unit. The value is the per-bin size budget divided by the
     * per-bin work unit count, but never less than 1.
     *
     * <p>A configured count of 0 would otherwise throw {@link ArithmeticException}; it yields the floor value 1
     * instead. All non-zero counts behave exactly as {@code Math.max(1, maxSizePerBin / maxWorkUnitsPerBin)}.
     */
    private static long computeMinWorkUnitWeight(long maxSizePerBin, long maxWorkUnitsPerBin) {
        if (maxWorkUnitsPerBin == 0L) {
            return 1L;
        }
        return Math.max(1L, maxSizePerBin / maxWorkUnitsPerBin);
    }

    /** Submits one tracking event named {@code eventName} for each file set in {@code fileSets}. */
    private void submitUnfulfilledRequestEventsHelper(List<FileSet<CopyEntity>> fileSets, String eventName) {
        for (FileSet<CopyEntity> fileSet : fileSets) {
            Map<String, String> metadata = ImmutableMap.<String, String>builder()
                    .put("dataset.urn", fileSet.getDataset().getUrn())
                    .put(FILESET_TOTAL_ENTITIES, Integer.toString(fileSet.getTotalEntities()))
                    .put(FILESET_TOTAL_SIZE_IN_BYTES, Long.toString(fileSet.getTotalSizeInBytes()))
                    .put(FILESET_NAME, fileSet.getName())
                    .build();
            this.metricContext.submitEvent(GobblinTrackingEvent.newBuilder()
                    .setName(eventName)
                    .setNamespace(CopySource.class.getName())
                    .setMetadata(metadata)
                    .build());
        }
    }

    /**
     * Emits events for every file set the allocator did not fulfil, grouped by rejection reason. This applies only
     * when the allocator is a {@link PriorityIterableBasedRequestAllocator}; other allocators emit nothing.
     */
    @SuppressWarnings("unchecked")
    private void submitUnfulfilledRequestEvents(RequestAllocator<FileSet<CopyEntity>> requestAllocator) {
        if (!(requestAllocator instanceof PriorityIterableBasedRequestAllocator)) {
            return;
        }
        PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>> priorityAllocator =
                (PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>>) requestAllocator;
        submitUnfulfilledRequestEventsHelper(priorityAllocator.getRequestsExceedingAvailableResourcePool(), REQUESTS_EXCEEDING_AVAILABLE_RESOURCE_POOL_EVENT_NAME);
        submitUnfulfilledRequestEventsHelper(priorityAllocator.getRequestsRejectedDueToInsufficientEviction(), REQUESTS_REJECTED_DUE_TO_INSUFFICIENT_EVICTION_EVENT_NAME);
        submitUnfulfilledRequestEventsHelper(priorityAllocator.getRequestsRejectedWithLowPriority(), REQUESTS_REJECTED_WITH_LOW_PRIORITY_EVENT_NAME);
        submitUnfulfilledRequestEventsHelper(priorityAllocator.getRequestsDropped(), REQUESTS_DROPPED_EVENT_NAME);
    }

    /**
     * Chooses the request allocator:
     * <ul>
     *   <li>a {@link GreedyAllocator} when no prioritizer is configured;</li>
     *   <li>a {@link HierarchicalAllocator} for a {@link HierarchicalPrioritizer};</li>
     *   <li>otherwise whatever {@link RequestAllocatorUtils#inferFromConfig} selects.</li>
     * </ul>
     *
     * @param maxThreads parallelism passed to {@code allowParallelization}
     */
    private RequestAllocator<FileSet<CopyEntity>> createRequestAllocator(CopyConfiguration copyConfiguration, int maxThreads) {
        Optional<FileSetComparator> prioritizer = copyConfiguration.getPrioritizer();
        RequestAllocatorConfig.Builder configBuilder = RequestAllocatorConfig.builder(new FileSetResourceEstimator())
                .allowParallelization(maxThreads)
                .storeRejectedRequests(copyConfiguration.getStoreRejectedRequestsSetting())
                .withLimitedScopeConfig(copyConfiguration.getPrioritizationConfig());
        if (!prioritizer.isPresent()) {
            return new GreedyAllocator(configBuilder.build());
        }
        configBuilder.withPrioritizer((Comparator) prioritizer.get());
        if (prioritizer.get() instanceof HierarchicalPrioritizer) {
            return new HierarchicalAllocator.Factory().createRequestAllocator(configBuilder.build());
        }
        return RequestAllocatorUtils.inferFromConfig(configBuilder.build());
    }

    /**
     * Returns a {@link FileAwareInputStreamExtractor} when the work unit carries a {@link CopyableFile}, otherwise an
     * {@link EmptyExtractor}.
     */
    @Override
    public Extractor<String, FileAwareInputStream> getExtractor(WorkUnitState workUnitState) throws IOException {
        if (!CopyableFile.class.isAssignableFrom(getCopyEntityClass(workUnitState))) {
            return new EmptyExtractor("empty");
        }
        return extractorForCopyableFile(HadoopUtils.getSourceFileSystem(workUnitState),
                (CopyableFile) deserializeCopyEntity(workUnitState), workUnitState);
    }

    /** Extension point: builds the extractor for a single {@link CopyableFile}. */
    protected Extractor<String, FileAwareInputStream> extractorForCopyableFile(FileSystem fileSystem,
            CopyableFile copyableFile, WorkUnitState workUnitState) throws IOException {
        return new FileAwareInputStreamExtractor(fileSystem, copyableFile, workUnitState);
    }

    @Override
    public void shutdown(SourceState sourceState) {
        // Nothing to release.
    }

    /**
     * Opens the (optionally throttled) file system at {@code source.filebased.fs.uri}, defaulting to
     * {@code file:///}.
     */
    @Deprecated
    protected FileSystem getSourceFileSystem(State state) throws IOException {
        return HadoopUtils.getOptionallyThrottledFileSystem(
                FileSystem.get(URI.create(state.getProp("source.filebased.fs.uri", "file:///")),
                        HadoopUtils.getConfFromState(state, Optional.of("source.filebased.encrypted"))),
                state);
    }

    /** Opens the (optionally throttled) writer file system for branch 0 of 1. */
    @Deprecated
    private static FileSystem getTargetFileSystem(State state) throws IOException {
        return HadoopUtils.getOptionallyThrottledFileSystem(WriterUtils.getWriterFS(state, 1, 0), state);
    }

    /**
     * Stores the bin-packing weight of the work unit. For a {@link CopyableFile} the weight is the origin file's
     * length; for other entities it is 0. The result is raised to at least {@code minWorkUnitWeight}.
     */
    public static void setWorkUnitWeight(WorkUnit workUnit, CopyEntity copyEntity, long minWorkUnitWeight) {
        long weight = copyEntity instanceof CopyableFile ? ((CopyableFile) copyEntity).getOrigin().getLen() : 0L;
        workUnit.setProp(WORK_UNIT_WEIGHT, Long.toString(Math.max(weight, minWorkUnitWeight)));
    }

    /**
     * Computes the work unit GUID. It is seeded from the configured {@code converter.classes} and then extended with
     * the GUID of the serialized {@link CopyEntity}.
     */
    public static void computeAndSetWorkUnitGuid(WorkUnit workUnit) throws IOException {
        // NOTE(review): this HiveSource constant looks like a decompiler-inlined default seed; confirm its value.
        String converterClasses = workUnit.contains("converter.classes")
                ? workUnit.getProp("converter.classes")
                : HiveSource.DEFAULT_HIVE_SOURCE_IGNORE_DATA_PATH_IDENTIFIER;
        Guid guid = Guid.fromStrings(new String[]{converterClasses})
                .append(new HasGuid[]{deserializeCopyEntity(workUnit)});
        setWorkUnitGuid(workUnit, guid);
    }

    /** Stores {@code guid} under {@link #WORK_UNIT_GUID}. */
    public static void setWorkUnitGuid(State state, Guid guid) {
        state.setProp(WORK_UNIT_GUID, guid.toString());
    }

    /** Reads the GUID stored under {@link #WORK_UNIT_GUID}, or returns absent if the property is missing. */
    public static Optional<Guid> getWorkUnitGuid(State state) throws IOException {
        if (!state.contains(WORK_UNIT_GUID)) {
            return Optional.absent();
        }
        return Optional.of(Guid.deserialize(state.getProp(WORK_UNIT_GUID)));
    }

    /** Serializes the entity and records its concrete class name in {@code state}. */
    public static void serializeCopyEntity(State state, CopyEntity copyEntity) {
        state.setProp(SERIALIZED_COPYABLE_FILE, CopyEntity.serialize(copyEntity));
        state.setProp(COPY_ENTITY_CLASS, copyEntity.getClass().getName());
    }

    /**
     * Loads the class recorded under {@link #COPY_ENTITY_CLASS}.
     *
     * @throws IOException if the class cannot be found
     */
    public static Class<?> getCopyEntityClass(State state) throws IOException {
        try {
            return Class.forName(state.getProp(COPY_ENTITY_CLASS));
        } catch (ClassNotFoundException e) {
            throw new IOException(e);
        }
    }

    /** Deserializes the {@link CopyEntity} stored under {@link #SERIALIZED_COPYABLE_FILE}. */
    public static CopyEntity deserializeCopyEntity(State state) {
        return CopyEntity.deserialize(state.getProp(SERIALIZED_COPYABLE_FILE));
    }

    /** Stores the serialized dataset metadata under {@link #SERIALIZED_COPYABLE_DATASET}. */
    public static void serializeCopyableDataset(State state, CopyableDatasetMetadata copyableDatasetMetadata) {
        state.setProp(SERIALIZED_COPYABLE_DATASET, copyableDatasetMetadata.serialize());
    }

    /** Deserializes the dataset metadata stored under {@link #SERIALIZED_COPYABLE_DATASET}. */
    public static CopyableDatasetMetadata deserializeCopyableDataset(State state) {
        return CopyableDatasetMetadata.deserialize(state.getProp(SERIALIZED_COPYABLE_DATASET));
    }
}
