package gobblin.publisher;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import gobblin.configuration.State;
import gobblin.configuration.WorkUnitState;
import gobblin.util.ForkOperatorUtils;
import gobblin.util.HadoopUtils;
import gobblin.util.ParallelRunner;
import gobblin.util.WriterUtils;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/publisher/BaseDataPublisher.class */
public class BaseDataPublisher extends SingleTaskDataPublisher {
    private static final Logger LOG = LoggerFactory.getLogger(BaseDataPublisher.class);
    protected final int numBranches;
    protected final List<FileSystem> writerFileSystemByBranches;
    protected final List<FileSystem> publisherFileSystemByBranches;
    protected final List<FileSystem> metaDataWriterFileSystemByBranches;
    protected final List<Optional<String>> publisherFinalDirOwnerGroupsByBranches;
    protected final List<FsPermission> permissions;
    protected final Closer closer;
    protected final Closer parallelRunnerCloser;
    protected final int parallelRunnerThreads;
    protected final Map<String, ParallelRunner> parallelRunners;
    protected final Set<Path> publisherOutputDirs;

    public BaseDataPublisher(State state) throws IOException {
        super(state);
        this.parallelRunners = Maps.newHashMap();
        this.publisherOutputDirs = Sets.newHashSet();
        this.closer = Closer.create();
        Configuration configuration = new Configuration();
        for (String str : getState().getPropertyNames()) {
            configuration.set(str, getState().getProp(str));
        }
        this.numBranches = getState().getPropAsInt("fork.branches", 1);
        this.writerFileSystemByBranches = Lists.newArrayListWithCapacity(this.numBranches);
        this.publisherFileSystemByBranches = Lists.newArrayListWithCapacity(this.numBranches);
        this.metaDataWriterFileSystemByBranches = Lists.newArrayListWithCapacity(this.numBranches);
        this.publisherFinalDirOwnerGroupsByBranches = Lists.newArrayListWithCapacity(this.numBranches);
        this.permissions = Lists.newArrayListWithCapacity(this.numBranches);
        for (int i = 0; i < this.numBranches; i++) {
            URI create = URI.create(getState().getProp(ForkOperatorUtils.getPropertyNameForBranch("writer.fs.uri", this.numBranches, i), "file:///"));
            this.writerFileSystemByBranches.add(FileSystem.get(create, configuration));
            URI create2 = URI.create(getState().getProp(ForkOperatorUtils.getPropertyNameForBranch("data.publisher.fs.uri", this.numBranches, i), create.toString()));
            this.publisherFileSystemByBranches.add(FileSystem.get(create2, configuration));
            this.metaDataWriterFileSystemByBranches.add(FileSystem.get(create2, configuration));
            this.publisherFinalDirOwnerGroupsByBranches.add(Optional.fromNullable(getState().getProp(ForkOperatorUtils.getPropertyNameForBranch("data.publisher.final.dir.group", this.numBranches, i))));
            this.permissions.add(new FsPermission(state.getPropAsShortWithRadix(ForkOperatorUtils.getPropertyNameForBranch("data.publisher.permissions", this.numBranches, i), FsPermission.getDefault().toShort(), 8)));
        }
        this.parallelRunnerThreads = state.getPropAsInt("parallel.runner.threads", 10);
        this.parallelRunnerCloser = Closer.create();
    }

    public void initialize() throws IOException {
    }

    public void close() throws IOException {
        try {
            Iterator<Path> it = this.publisherOutputDirs.iterator();
            while (it.hasNext()) {
                this.state.appendToSetProp("data.publisher.output.dirs", it.next().toString());
            }
        } finally {
            this.closer.close();
        }
    }

    public void publishData(WorkUnitState workUnitState) throws IOException {
        for (int i = 0; i < this.numBranches; i++) {
            publishSingleTaskData(workUnitState, i);
        }
        this.parallelRunnerCloser.close();
    }

    private void publishSingleTaskData(WorkUnitState workUnitState, int i) throws IOException {
        publishData(workUnitState, i, true, new HashSet());
    }

    public void publishData(Collection<? extends WorkUnitState> collection) throws IOException {
        HashSet newHashSet = Sets.newHashSet();
        for (WorkUnitState workUnitState : collection) {
            for (int i = 0; i < this.numBranches; i++) {
                publishMultiTaskData(workUnitState, i, newHashSet);
            }
        }
        this.parallelRunnerCloser.close();
        Iterator<? extends WorkUnitState> it = collection.iterator();
        while (it.hasNext()) {
            it.next().setWorkingState(WorkUnitState.WorkingState.COMMITTED);
        }
    }

    private void publishMultiTaskData(WorkUnitState workUnitState, int i, Set<Path> set) throws IOException {
        publishData(workUnitState, i, false, set);
    }

    protected void publishData(WorkUnitState workUnitState, int i, boolean z, Set<Path> set) throws IOException {
        ParallelRunner parallelRunner = getParallelRunner(this.writerFileSystemByBranches.get(i));
        Path writerOutputDir = WriterUtils.getWriterOutputDir(workUnitState, this.numBranches, i);
        if (!this.writerFileSystemByBranches.get(i).exists(writerOutputDir)) {
            LOG.warn(String.format("Branch %d of WorkUnit %s produced no data", Integer.valueOf(i), workUnitState.getId()));
            return;
        }
        Path publisherOutputDir = getPublisherOutputDir(workUnitState, i);
        if (z) {
            WriterUtils.mkdirsWithRecursivePermission(this.publisherFileSystemByBranches.get(i), publisherOutputDir, this.permissions.get(i));
            addSingleTaskWriterOutputToExistingDir(writerOutputDir, publisherOutputDir, workUnitState, i, parallelRunner);
            return;
        }
        if (set.contains(writerOutputDir)) {
            return;
        }
        if (!this.publisherFileSystemByBranches.get(i).exists(publisherOutputDir)) {
            WriterUtils.mkdirsWithRecursivePermission(this.publisherFileSystemByBranches.get(i), publisherOutputDir.getParent(), this.permissions.get(i));
        } else if (!getState().getPropAsBoolean(ForkOperatorUtils.getPropertyNameForBranch("data.publisher.replace.final.dir", this.numBranches, i))) {
            addWriterOutputToExistingDir(writerOutputDir, publisherOutputDir, workUnitState, i, parallelRunner);
            set.add(writerOutputDir);
            return;
        } else {
            LOG.info("Deleting publisher output dir " + publisherOutputDir);
            this.publisherFileSystemByBranches.get(i).delete(publisherOutputDir, true);
        }
        movePath(parallelRunner, workUnitState, writerOutputDir, publisherOutputDir, i);
        set.add(writerOutputDir);
    }

    protected Path getPublisherOutputDir(WorkUnitState workUnitState, int i) {
        return WriterUtils.getDataPublisherFinalDir(workUnitState, this.numBranches, i);
    }

    protected void addSingleTaskWriterOutputToExistingDir(Path path, Path path2, WorkUnitState workUnitState, int i, ParallelRunner parallelRunner) throws IOException {
        String propertyNameForBranch = ForkOperatorUtils.getPropertyNameForBranch("writer.final.output.file.paths", this.numBranches, i);
        if (!workUnitState.contains(propertyNameForBranch)) {
            LOG.warn("Missing property " + propertyNameForBranch + ". This task may have pulled no data.");
            return;
        }
        for (String str : workUnitState.getPropAsSet(propertyNameForBranch)) {
            Path path3 = new Path(str);
            if (this.writerFileSystemByBranches.get(i).exists(path3)) {
                Path path4 = new Path(path2, str.substring(str.indexOf(path.toString()) + path.toString().length() + 1));
                WriterUtils.mkdirsWithRecursivePermission(this.publisherFileSystemByBranches.get(i), path4.getParent(), this.permissions.get(i));
                movePath(parallelRunner, workUnitState, path3, path4, i);
            } else {
                LOG.warn("Task output file " + str + " doesn't exist.");
            }
        }
    }

    protected void addWriterOutputToExistingDir(Path path, Path path2, WorkUnitState workUnitState, int i, ParallelRunner parallelRunner) throws IOException {
        boolean propAsBoolean = workUnitState.getPropAsBoolean(ForkOperatorUtils.getPropertyNameForBranch("source.filebased.preserve.file.name", this.numBranches, i), false);
        for (FileStatus fileStatus : this.writerFileSystemByBranches.get(i).listStatus(path)) {
            movePath(parallelRunner, workUnitState, fileStatus.getPath(), propAsBoolean ? new Path(path2, workUnitState.getProp(ForkOperatorUtils.getPropertyNameForBranch("data.publisher.final.name", this.numBranches, i))) : new Path(path2, fileStatus.getPath().getName()), i);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void movePath(ParallelRunner parallelRunner, State state, Path path, Path path2, int i) throws IOException {
        LOG.info(String.format("Moving %s to %s", path, path2));
        boolean propAsBoolean = state.getPropAsBoolean("data.publisher.overwrite.enabled", false);
        this.publisherOutputDirs.addAll(recordPublisherOutputDirs(path, path2, i));
        parallelRunner.movePath(path, this.publisherFileSystemByBranches.get(i), path2, propAsBoolean, this.publisherFinalDirOwnerGroupsByBranches.get(i));
    }

    protected Collection<Path> recordPublisherOutputDirs(Path path, Path path2, int i) throws IOException {
        return this.writerFileSystemByBranches.get(i).getFileStatus(path).isDirectory() ? ImmutableList.of(path2) : ImmutableList.of(path2.getParent());
    }

    private ParallelRunner getParallelRunner(FileSystem fileSystem) {
        String uri = fileSystem.getUri().toString();
        if (!this.parallelRunners.containsKey(uri)) {
            this.parallelRunners.put(uri, this.parallelRunnerCloser.register(new ParallelRunner(this.parallelRunnerThreads, fileSystem)));
        }
        return this.parallelRunners.get(uri);
    }

    public void publishMetadata(Collection<? extends WorkUnitState> collection) throws IOException {
        Iterator<? extends WorkUnitState> it = collection.iterator();
        while (it.hasNext()) {
            publishMetadata(it.next());
        }
    }

    public void publishMetadata(WorkUnitState workUnitState) throws IOException {
        FSDataOutputStream register;
        Throwable th;
        String prop = workUnitState.getProp("data.publisher.metadata.string");
        if (prop == null) {
            return;
        }
        if (workUnitState.getProp("data.publisher.metadata.output.dir") == null) {
            LOG.error("Missing metadata output directory path : data.publisher.metadata.output.dir in the config");
            return;
        }
        for (int i = 0; i < this.numBranches; i++) {
            FileSystem fileSystem = this.metaDataWriterFileSystemByBranches.get(i);
            String propertyNameForBranch = ForkOperatorUtils.getPropertyNameForBranch(workUnitState.getProp("data.publisher.metadata.output_file"), this.numBranches, i);
            String prop2 = workUnitState.getProp("data.publisher.metadata.output.dir");
            Path path = new Path(prop2);
            try {
                if (!fileSystem.exists(path)) {
                    WriterUtils.mkdirsWithRecursivePermission(fileSystem, path, this.permissions.get(i));
                }
                Path path2 = new Path(prop2, propertyNameForBranch);
                if (fileSystem.exists(path2)) {
                    HadoopUtils.deletePath(fileSystem, path2, false);
                }
                register = this.closer.register(fileSystem.create(path2));
                th = null;
            } catch (IOException e) {
                LOG.error("metadata file is not generated: " + e, e);
            }
            try {
                try {
                    register.write(prop.getBytes("UTF-8"));
                    if (register != null) {
                        if (0 != 0) {
                            try {
                                register.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            register.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
                break;
            }
        }
    }
}
