package gobblin.data.management.copy.publisher;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import gobblin.configuration.State;
import gobblin.configuration.WorkUnitState;
import gobblin.data.management.copy.CopySource;
import gobblin.data.management.copy.CopyableDataset;
import gobblin.data.management.copy.SerializableCopyableDataset;
import gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder;
import gobblin.data.management.util.PathUtils;
import gobblin.publisher.DataPublisher;
import gobblin.util.HadoopUtils;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/data/management/copy/publisher/CopyDataPublisher.class */
/**
 * A {@link DataPublisher} that publishes copied datasets by moving each dataset's files from a
 * location under the writer output directory to the dataset's target root.
 *
 * <p>Work unit states are grouped by the {@link CopyableDataset} deserialized from each state, and
 * every group is published independently: a failure while publishing one dataset is logged and
 * does not prevent the remaining datasets from being attempted.
 */
public class CopyDataPublisher extends DataPublisher {
    private static final Logger log = LoggerFactory.getLogger(CopyDataPublisher.class);

    /** Property read for the URI of the file system this publisher operates on. */
    private static final String WRITER_FS_URI_KEY = "writer.fs.uri";
    /** URI used when {@link #WRITER_FS_URI_KEY} is not set. */
    private static final String DEFAULT_WRITER_FS_URI = "file:///";
    /** Property read for the directory under which the datasets to publish are located. */
    private static final String WRITER_OUTPUT_DIR_KEY = "writer.output.dir";

    private final Path writerOutputDir;
    private final FileSystem fs;

    /**
     * Creates a publisher configured from the given state.
     *
     * @param state job state; supplies the file system URI and the writer output directory
     * @throws IOException if the file system cannot be obtained
     */
    public CopyDataPublisher(State state) throws IOException {
        super(state);
        String fsUri = this.state.getProp(WRITER_FS_URI_KEY, DEFAULT_WRITER_FS_URI);
        this.fs = FileSystem.get(URI.create(fsUri), new Configuration());
        // Must run before the output directory is read below.
        // NOTE(review): presumably this rewrites "writer.output.dir" to a job-specific location —
        // confirm against FileAwareInputStreamDataWriterBuilder.
        FileAwareInputStreamDataWriterBuilder.setJobSpecificOutputPaths(state);
        this.writerOutputDir = new Path(state.getProp(WRITER_OUTPUT_DIR_KEY));
    }

    /**
     * Publishes every dataset referenced by the given work unit states, then recursively deletes
     * the writer output directory.
     *
     * @param collection work unit states whose datasets should be published
     * @throws IOException if the states cannot be grouped by dataset, if the writer output
     *     directory cannot be deleted, or if at least one dataset failed to publish
     */
    public void publishData(Collection<? extends WorkUnitState> collection) throws IOException {
        Multimap<CopyableDataset, WorkUnitState> statesByDataset = getDatasetRoots(collection);
        boolean allDatasetsPublished = true;
        for (CopyableDataset dataset : statesByDataset.keySet()) {
            try {
                publishDataset(dataset, statesByDataset.get(dataset));
            } catch (Throwable t) {
                // Deliberately broad: one dataset failing must not stop the others from being tried.
                log.error("Failed to publish " + dataset.datasetTargetRoot(), t);
                allDatasetsPublished = false;
            }
        }
        // Cleanup happens whether or not every dataset was published.
        this.fs.delete(this.writerOutputDir, true);
        if (!allDatasetsPublished) {
            throw new IOException("Not all datasets published successfully");
        }
    }

    /**
     * Groups work unit states by the dataset deserialized from each state's
     * {@link CopySource#SERIALIZED_COPYABLE_DATASET} property.
     *
     * @param states work unit states to group
     * @return a multimap from dataset to the work unit states that belong to it
     * @throws IOException declared for failures while deserializing a dataset
     */
    private Multimap<CopyableDataset, WorkUnitState> getDatasetRoots(Collection<? extends WorkUnitState> states) throws IOException {
        Multimap<CopyableDataset, WorkUnitState> statesByDataset = ArrayListMultimap.create();
        for (WorkUnitState workUnitState : states) {
            CopyableDataset dataset =
                    SerializableCopyableDataset.deserialize(workUnitState.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
            statesByDataset.put(dataset, workUnitState);
        }
        return statesByDataset;
    }

    /**
     * Moves one dataset from its location under the writer output directory to its target root and
     * marks the dataset's successful work units as committed.
     *
     * @param dataset the dataset to publish
     * @param states work unit states belonging to {@code dataset}
     * @throws IOException if renaming or deleting on the file system fails
     */
    private void publishDataset(CopyableDataset dataset, Collection<WorkUnitState> states) throws IOException {
        Path targetRoot = dataset.datasetTargetRoot();
        // The source location mirrors the target root, re-rooted under the writer output directory.
        Path stagedDatasetRoot = new Path(this.writerOutputDir, PathUtils.withoutLeadingSeparator(targetRoot));
        log.info("Publishing dataset from {} to {}", stagedDatasetRoot, targetRoot);
        HadoopUtils.renameRecursively(this.fs, stagedDatasetRoot, targetRoot);
        // Remove whatever is left at the source location after the rename.
        this.fs.delete(stagedDatasetRoot, true);
        for (WorkUnitState workUnitState : states) {
            // Only SUCCESSFUL work units are promoted; any other working state is left untouched.
            if (workUnitState.getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL) {
                workUnitState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
            }
        }
    }

    /**
     * No-op: this publisher does not publish metadata.
     *
     * @param collection ignored
     */
    public void publishMetadata(Collection<? extends WorkUnitState> collection) throws IOException {
    }

    /** No-op: this publisher releases no resources on close. */
    public void close() throws IOException {
    }

    /** No-op: all setup is performed in the constructor. */
    public void initialize() throws IOException {
    }
}
