package org.apache.gobblin.data.management.copy.publisher;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.CopySource;
import org.apache.gobblin.data.management.copy.CopyableDatasetMetadata;
import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.data.management.copy.entities.CommitStepCopyEntity;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
import org.apache.gobblin.data.management.copy.recovery.RecoveryHelper;
import org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriter;
import org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.publisher.DataPublisher;
import org.apache.gobblin.publisher.UnpublishedHandling;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.WriterUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.class */
/**
 * Publisher for copy jobs.
 *
 * <p>Work unit states are grouped by {@link CopyEntity.DatasetAndPartition} and each group (a
 * "file set") is published independently: its pre-publish commit steps are executed, the staged
 * files under the writer output directory are renamed recursively onto the file system root, its
 * post-publish commit steps are executed, and the file set's staging directory is deleted. A
 * failure in one file set does not prevent the remaining file sets from being attempted.
 *
 * <p>Work units that were not published can be handed to {@link #handleUnpublishedWorkUnits} so
 * that successfully written files are persisted through the {@link RecoveryHelper}.
 */
public class CopyDataPublisher extends DataPublisher implements UnpublishedHandling {
    private static final Logger log = LoggerFactory.getLogger(CopyDataPublisher.class);

    /** Root directory under which each file set is staged, read from {@code writer.output.dir}. */
    private final Path writerOutputDir;
    /** File system resolved from {@code writer.fs.uri} (defaults to {@code file:///}). */
    private final FileSystem fs;
    protected final EventSubmitter eventSubmitter;
    protected final RecoveryHelper recoveryHelper;
    protected final Optional<LineageInfo> lineageInfo;

    /**
     * Reports whether this publisher may be used concurrently.
     *
     * @return {@code true} only when the runtime class is exactly {@code CopyDataPublisher};
     *     subclasses report {@code false} unless they override this method
     */
    public boolean isThreadSafe() {
        return getClass() == CopyDataPublisher.class;
    }

    /**
     * Builds a publisher from job/task state.
     *
     * <p>Lineage information is looked up through the state's broker when the state is a
     * {@link SourceState} or a {@link WorkUnitState}; for any other state type it is absent.
     * Construction also purges old persisted files through the {@link RecoveryHelper}.
     *
     * @param state configuration; must define {@code writer.output.dir} once
     *     {@link FileAwareInputStreamDataWriterBuilder#setJobSpecificOutputPaths} has run
     * @throws IOException if the file system cannot be obtained or the purge fails
     */
    public CopyDataPublisher(State state) throws IOException {
        super(state);
        if (state instanceof SourceState) {
            this.lineageInfo = LineageInfo.getLineageInfo(((SourceState) state).getBroker());
        } else if (state instanceof WorkUnitState) {
            this.lineageInfo = LineageInfo.getLineageInfo(((WorkUnitState) state).getTaskBrokerNullable());
        } else {
            this.lineageInfo = Optional.absent();
        }

        URI fsUri = URI.create(this.state.getProp("writer.fs.uri", "file:///"));
        this.fs = FileSystem.get(fsUri, WriterUtils.getFsConfiguration(state));

        // Must run before writer.output.dir is read: it rewrites the output paths stored in state.
        FileAwareInputStreamDataWriterBuilder.setJobSpecificOutputPaths(state);
        this.writerOutputDir = new Path(state.getProp("writer.output.dir"));

        this.eventSubmitter = new EventSubmitter.Builder(
                Instrumented.getMetricContext(state, CopyDataPublisher.class, GobblinMetrics.getCustomTagsFromState(state)),
                "org.apache.gobblin.copy.CopyDataPublisher").build();

        this.recoveryHelper = new RecoveryHelper(this.fs, state);
        this.recoveryHelper.purgeOldPersistedFile();
    }

    /**
     * Publishes every file set found in the given work unit states.
     *
     * <p>Each file set is attempted even if an earlier one failed. Failures are logged, reported
     * through the event submitter, and attached as suppressed exceptions to the
     * {@link IOException} thrown at the end.
     *
     * @param collection work unit states of the copy job
     * @throws IOException if at least one file set could not be published
     */
    public void publishData(Collection<? extends WorkUnitState> collection) throws IOException {
        Multimap<CopyEntity.DatasetAndPartition, WorkUnitState> fileSets = groupByFileSet(collection);
        List<Throwable> failures = Lists.newArrayList();

        for (CopyEntity.DatasetAndPartition datasetAndPartition : fileSets.keySet()) {
            try {
                publishFileSet(datasetAndPartition, fileSets.get(datasetAndPartition));
            } catch (Throwable th) {
                // Deliberately broad: one failing file set must not stop the others from publishing.
                CopyEventSubmitterHelper.submitFailedDatasetPublish(this.eventSubmitter, datasetAndPartition);
                log.error("Failed to publish " + datasetAndPartition.getDataset().getDatasetURN(), th);
                failures.add(th);
            }
        }

        if (!failures.isEmpty()) {
            IOException failure = new IOException("Not all datasets published successfully");
            // Keep the individual causes reachable from the aggregate exception.
            for (Throwable cause : failures) {
                failure.addSuppressed(cause);
            }
            throw failure;
        }
    }

    /**
     * Persists the files of successful but unpublished work units and logs how many were persisted.
     *
     * @param collection work unit states that were not published
     * @throws IOException if persisting a file fails
     */
    public void handleUnpublishedWorkUnits(Collection<? extends WorkUnitState> collection) throws IOException {
        int persisted = persistFailedFileSet(collection);
        log.info("Successfully persisted {} work units.", persisted);
    }

    /**
     * Groups work unit states by the dataset/partition of the copy entity each one carries.
     *
     * @param workUnitStates states to group
     * @return multimap from file set to the states belonging to it, in encounter order
     */
    private static Multimap<CopyEntity.DatasetAndPartition, WorkUnitState> groupByFileSet(
            Collection<? extends WorkUnitState> workUnitStates) {
        Multimap<CopyEntity.DatasetAndPartition, WorkUnitState> fileSets = ArrayListMultimap.create();
        for (WorkUnitState workUnitState : workUnitStates) {
            CopyEntity copyEntity = CopySource.deserializeCopyEntity(workUnitState);
            CopyableDatasetMetadata metadata =
                    CopyableDatasetMetadata.deserialize(workUnitState.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
            fileSets.put(copyEntity.getDatasetAndPartition(metadata), workUnitState);
        }
        return fileSets;
    }

    /**
     * Publishes a single file set and emits the corresponding events.
     *
     * <p>Order of operations: pre-publish steps, recursive rename of the staged files (only when
     * the file set contains at least one {@link CopyableFile}), post-publish steps, deletion of
     * the staging directory, then marking {@code SUCCESSFUL} work units as {@code COMMITTED}.
     *
     * @param datasetAndPartition identity of the file set
     * @param workUnitStates states of the file set; must not be empty
     * @throws IOException if a commit step or a file system operation fails
     * @throws IllegalArgumentException if {@code workUnitStates} is empty
     */
    private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition,
            Collection<WorkUnitState> workUnitStates) throws IOException {
        Preconditions.checkArgument(!workUnitStates.isEmpty(),
                "publishFileSet received an empty collection work units. This is an error in code.");

        String fileSetId = datasetAndPartition.identifier();
        // Dataset metadata is read from the first work unit of the file set.
        CopyableDatasetMetadata metadata = CopyableDatasetMetadata.deserialize(
                workUnitStates.iterator().next().getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
        Path datasetWriterOutputPath = new Path(this.writerOutputDir, fileSetId);
        log.info("[{}] Publishing fileSet from {} for dataset {}", fileSetId, datasetWriterOutputPath,
                metadata.getDatasetURN());

        List<CommitStep> prePublish = getCommitSequence(workUnitStates, PrePublishStep.class);
        List<CommitStep> postPublish = getCommitSequence(workUnitStates, PostPublishStep.class);
        log.info("[{}] Found {} prePublish steps and {} postPublish steps.", fileSetId, prePublish.size(),
                postPublish.size());

        executeCommitSequence(prePublish);
        if (hasCopyableFiles(workUnitStates)) {
            // NOTE(review): presumably the staged tree mirrors absolute destination paths, which is
            // why it is merged starting at the root - confirm against HadoopUtils.renameRecursively.
            HadoopUtils.renameRecursively(this.fs, datasetWriterOutputPath, new Path("/"));
        } else {
            log.info("[{}] No copyable files in dataset. Proceeding to postpublish steps.", fileSetId);
        }
        executeCommitSequence(postPublish);
        this.fs.delete(datasetWriterOutputPath, true);

        // Long.MAX_VALUE doubles as "no timestamp seen yet" while searching for the minimum.
        long datasetOriginTimestamp = Long.MAX_VALUE;
        long datasetUpstreamTimestamp = Long.MAX_VALUE;
        Optional<String> datasetOutputPath = Optional.absent();

        for (WorkUnitState workUnitState : workUnitStates) {
            if (workUnitState.getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL) {
                workUnitState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
            }
            CopyEntity copyEntity = CopySource.deserializeCopyEntity(workUnitState);
            if (!(copyEntity instanceof CopyableFile)) {
                continue;
            }
            CopyableFile copyableFile = (CopyableFile) copyEntity;
            if (workUnitState.getWorkingState() == WorkUnitState.WorkingState.COMMITTED) {
                CopyEventSubmitterHelper.submitSuccessfulFilePublish(this.eventSubmitter, copyableFile, workUnitState);
                // The first non-null output path found among committed files is reported for the file set.
                if (!datasetOutputPath.isPresent() && copyableFile.getDatasetOutputPath() != null) {
                    datasetOutputPath = Optional.of(copyableFile.getDatasetOutputPath());
                }
                if (this.lineageInfo.isPresent()) {
                    this.lineageInfo.get().putDestination(copyableFile.getDestinationDataset(), 0, workUnitState);
                }
            }
            // Timestamps are tracked for every copyable file, committed or not.
            datasetOriginTimestamp = Math.min(datasetOriginTimestamp, copyableFile.getOriginTimestamp());
            datasetUpstreamTimestamp = Math.min(datasetUpstreamTimestamp, copyableFile.getUpstreamTimestamp());
        }

        // Report 0 rather than the sentinel when no copyable file supplied a timestamp.
        if (datasetOriginTimestamp == Long.MAX_VALUE) {
            datasetOriginTimestamp = 0;
        }
        if (datasetUpstreamTimestamp == Long.MAX_VALUE) {
            datasetUpstreamTimestamp = 0;
        }

        Map<String, String> additionalMetadata = Maps.newHashMap();
        additionalMetadata.put("sourceCluster", this.state.getProp("sourceCluster"));
        additionalMetadata.put("destinationCluster", this.state.getProp("destinationCluster"));
        additionalMetadata.put("datasetOutputPath", datasetOutputPath.or("Unknown"));
        CopyEventSubmitterHelper.submitSuccessfulDatasetPublish(this.eventSubmitter, datasetAndPartition,
                Long.toString(datasetOriginTimestamp), Long.toString(datasetUpstreamTimestamp), additionalMetadata);
    }

    /**
     * Tells whether any work unit in the file set carries a {@link CopyableFile}.
     *
     * @param workUnitStates states of one file set
     * @return {@code true} if at least one copy entity class is assignable to {@link CopyableFile}
     * @throws IOException if the copy entity class of a work unit cannot be determined
     */
    private static boolean hasCopyableFiles(Collection<WorkUnitState> workUnitStates) throws IOException {
        for (WorkUnitState workUnitState : workUnitStates) {
            if (CopyableFile.class.isAssignableFrom(CopySource.getCopyEntityClass(workUnitState))) {
                return true;
            }
        }
        return false;
    }

    /**
     * Collects the commit steps of the given kind, ordered by ascending priority value.
     *
     * @param workUnitStates states of one file set
     * @param stepClass base class selecting which copy entities to include
     *     (e.g. {@link PrePublishStep} or {@link PostPublishStep})
     * @return the steps of the matching entities, sorted by {@code getPriority()}
     * @throws IOException if the copy entity class of a work unit cannot be determined
     */
    private static List<CommitStep> getCommitSequence(Collection<WorkUnitState> workUnitStates, Class<?> stepClass)
            throws IOException {
        List<CommitStepCopyEntity> stepEntities = Lists.newArrayList();
        for (WorkUnitState workUnitState : workUnitStates) {
            if (stepClass.isAssignableFrom(CopySource.getCopyEntityClass(workUnitState))) {
                stepEntities.add((CommitStepCopyEntity) CopySource.deserializeCopyEntity(workUnitState));
            }
        }

        // Collections.sort is stable, so entities with equal priority keep their encounter order.
        Collections.sort(stepEntities, new Comparator<CommitStepCopyEntity>() {
            @Override
            public int compare(CommitStepCopyEntity left, CommitStepCopyEntity right) {
                return Integer.compare(left.getPriority(), right.getPriority());
            }
        });

        List<CommitStep> steps = Lists.newArrayList();
        for (CommitStepCopyEntity stepEntity : stepEntities) {
            steps.add(stepEntity.getStep());
        }
        return steps;
    }

    /**
     * Executes the steps in list order; the first failure aborts the remainder.
     *
     * @param steps commit steps to run
     * @throws IOException if a step fails
     */
    private static void executeCommitSequence(List<CommitStep> steps) throws IOException {
        for (CommitStep step : steps) {
            step.execute();
        }
    }

    /**
     * Persists the output file of every {@code SUCCESSFUL} work unit that carries a
     * {@link CopyableFile}.
     *
     * @param collection work unit states to inspect
     * @return number of files for which {@link RecoveryHelper#persistFile} returned {@code true}
     * @throws IOException if persisting a file fails
     */
    private int persistFailedFileSet(Collection<? extends WorkUnitState> collection) throws IOException {
        int persisted = 0;
        for (WorkUnitState workUnitState : collection) {
            if (workUnitState.getWorkingState() != WorkUnitState.WorkingState.SUCCESSFUL) {
                continue;
            }
            CopyEntity copyEntity = CopySource.deserializeCopyEntity(workUnitState);
            if (!(copyEntity instanceof CopyableFile)) {
                continue;
            }
            CopyableFile copyableFile = (CopyableFile) copyEntity;
            Path outputDir = FileAwareInputStreamDataWriter.getOutputDir(workUnitState);
            CopyEntity.DatasetAndPartition datasetAndPartition =
                    copyableFile.getDatasetAndPartition(CopySource.deserializeCopyableDataset(workUnitState));
            Path outputFilePath =
                    FileAwareInputStreamDataWriter.getOutputFilePath(copyableFile, outputDir, datasetAndPartition);
            if (this.recoveryHelper.persistFile(workUnitState, copyableFile, outputFilePath)) {
                persisted++;
            }
        }
        return persisted;
    }

    /** No-op: this publisher does not publish metadata. */
    public void publishMetadata(Collection<? extends WorkUnitState> collection) throws IOException {
    }

    /** No-op: this publisher releases nothing on close. */
    public void close() throws IOException {
    }

    /** No-op: all setup happens in the constructor. */
    public void initialize() throws IOException {
    }
}
