package org.apache.gobblin.publisher;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.util.ForkOperatorUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.WriterUtils;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/publisher/TimePartitionedStreamingDataPublisher.class */
public class TimePartitionedStreamingDataPublisher extends TimePartitionedDataPublisher {
    private static final Logger log = LoggerFactory.getLogger(TimePartitionedStreamingDataPublisher.class);
    private final MetricContext metricContext;

    public TimePartitionedStreamingDataPublisher(State state) throws IOException {
        super(state);
        this.metricContext = Instrumented.getMetricContext(state, TimePartitionedStreamingDataPublisher.class);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.gobblin.publisher.BaseDataPublisher
    public void publishMultiTaskData(WorkUnitState workUnitState, int i, Set<Path> set) throws IOException {
        workUnitState.setProp("data.publisher.dataset.dir", getPublisherOutputDir(workUnitState, i).toString());
        super.publishMultiTaskData(workUnitState, i, set);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.gobblin.publisher.BaseDataPublisher
    public void publishData(WorkUnitState workUnitState, int i, boolean z, Set<Path> set) throws IOException {
        if (!this.writerFileSystemByBranches.get(i).exists(WriterUtils.getWriterOutputDir(workUnitState, this.numBranches, i))) {
            log.warn(String.format("Branch %d of WorkUnit %s produced no data", Integer.valueOf(i), workUnitState.getId()));
            return;
        }
        Path publisherOutputDir = getPublisherOutputDir(workUnitState, i);
        if (!this.publisherFileSystemByBranches.get(i).exists(publisherOutputDir)) {
            WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(i), publisherOutputDir, this.permissions.get(i), this.retrierConfig);
        }
        super.publishData(workUnitState, i, z, set);
    }

    @Override // org.apache.gobblin.publisher.BaseDataPublisher
    public void publishData(Collection<? extends WorkUnitState> collection) throws IOException {
        publishDataImpl(collection);
        wusCleanUp(collection);
    }

    public void publishDataImpl(Collection<? extends WorkUnitState> collection) throws IOException {
        Set<Path> newHashSet = Sets.newHashSet();
        for (WorkUnitState workUnitState : collection) {
            for (int i = 0; i < this.numBranches; i++) {
                publishMultiTaskData(workUnitState, i, newHashSet);
            }
        }
        Iterator<ParallelRunner> it = this.parallelRunners.values().iterator();
        while (it.hasNext()) {
            it.next().waitForTasks();
        }
        Iterator<? extends WorkUnitState> it2 = collection.iterator();
        while (it2.hasNext()) {
            it2.next().setWorkingState(WorkUnitState.WorkingState.COMMITTED);
        }
        ArrayList newArrayList = Lists.newArrayList();
        for (WorkUnitState workUnitState2 : collection) {
            if (LineageInfo.hasLineageInfo(workUnitState2)) {
                newArrayList.add(workUnitState2);
            }
        }
        long currentTimeMillis = System.currentTimeMillis();
        submitLineageEvents(newArrayList);
        log.info("Emitting lineage events took {} millis", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    private void submitLineageEvents(Collection<? extends WorkUnitState> collection) {
        for (Map.Entry entry : LineageInfo.aggregateByLineageEvent(collection).entrySet()) {
            LineageInfo.submitLineageEvent((String) entry.getKey(), (Collection) entry.getValue(), this.metricContext);
        }
    }

    protected void wusCleanUp(Collection<? extends WorkUnitState> collection) {
        int propAsInt = collection.stream().findFirst().get().getPropAsInt("fork.branches", 1);
        for (WorkUnitState workUnitState : collection) {
            for (int i = 0; i < propAsInt; i++) {
                String propertyNameForBranch = ForkOperatorUtils.getPropertyNameForBranch("writer.final.output.file.paths", propAsInt, i);
                if (workUnitState.contains(propertyNameForBranch)) {
                    workUnitState.removeProp(propertyNameForBranch);
                }
                LineageInfo.removeDestinationProp(workUnitState, i);
            }
        }
    }

    @VisibleForTesting
    Set<Path> getPublishOutputDirs() {
        return this.publisherOutputDirs;
    }
}
