package gobblin.publisher;

import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import gobblin.configuration.State;
import gobblin.configuration.WorkUnitState;
import gobblin.hive.HiveRegProps;
import gobblin.hive.HiveRegister;
import gobblin.hive.policy.HiveRegistrationPolicy;
import gobblin.hive.policy.HiveRegistrationPolicyBase;
import gobblin.hive.spec.HiveSpec;
import gobblin.util.ExecutorsUtils;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/publisher/HiveRegistrationPublisher.class */
public class HiveRegistrationPublisher extends DataPublisher {
    private static final Logger log = LoggerFactory.getLogger(HiveRegistrationPublisher.class);
    private final Closer closer;
    private final HiveRegister hiveRegister;
    private final HiveRegistrationPolicy policy;
    private final ExecutorService hivePolicyExecutor;

    public HiveRegistrationPublisher(State state) throws IOException {
        super(state);
        this.closer = Closer.create();
        this.hiveRegister = this.closer.register(HiveRegister.get(state));
        this.policy = HiveRegistrationPolicyBase.getPolicy(state);
        this.hivePolicyExecutor = Executors.newFixedThreadPool(new HiveRegProps(state).getNumThreads(), ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("HivePolicyExecutor-%d")));
    }

    public void close() throws IOException {
        try {
            ExecutorsUtils.shutdownExecutorService(this.hivePolicyExecutor, Optional.of(log));
        } finally {
            this.closer.close();
        }
    }

    @Deprecated
    public void initialize() throws IOException {
    }

    public void publishData(Collection<? extends WorkUnitState> collection) throws IOException {
        ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(this.hivePolicyExecutor);
        Set<String> uniquePathsToRegister = getUniquePathsToRegister(collection);
        log.info("Number of paths to be registered in Hive: " + uniquePathsToRegister.size());
        for (final String str : uniquePathsToRegister) {
            executorCompletionService.submit(new Callable<Collection<HiveSpec>>() { // from class: gobblin.publisher.HiveRegistrationPublisher.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Collection<HiveSpec> call() throws Exception {
                    return HiveRegistrationPublisher.this.policy.getHiveSpecs(new Path(str));
                }
            });
        }
        for (int i = 0; i < uniquePathsToRegister.size(); i++) {
            try {
                Iterator it = ((Collection) executorCompletionService.take().get()).iterator();
                while (it.hasNext()) {
                    this.hiveRegister.register((HiveSpec) it.next());
                }
            } catch (InterruptedException | ExecutionException e) {
                log.info("Failed to generate HiveSpec", e);
                throw new IOException(e);
            }
        }
        log.info("Finished generating all HiveSpecs");
    }

    private static Set<String> getUniquePathsToRegister(Collection<? extends WorkUnitState> collection) {
        HashSet newHashSet = Sets.newHashSet();
        for (State state : collection) {
            if (state.contains("data.publisher.output.dirs")) {
                newHashSet.addAll(state.getPropAsList("data.publisher.output.dirs"));
            }
        }
        return newHashSet;
    }

    public void publishMetadata(Collection<? extends WorkUnitState> collection) throws IOException {
    }
}
