package org.apache.nemo.runtime.executor.datatransfer;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import org.apache.nemo.common.punctuation.Watermark;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.plan.RuntimeEdge;
import org.apache.nemo.runtime.executor.bytetransfer.ByteOutputContext;
import org.apache.nemo.runtime.executor.data.PipeManagerWorker;
import org.apache.nemo.runtime.executor.data.partitioner.Partitioner;
import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.class */
/**
 * An {@link OutputWriter} that sends elements and watermarks through the
 * {@link ByteOutputContext} pipes handed out by a {@link PipeManagerWorker}.
 *
 * <p>The pipes and the serializer are looked up lazily, on the first call to
 * {@link #write(Object)}, {@link #writeWatermark(Watermark)} or {@link #close()}.
 */
public final class PipeOutputWriter implements OutputWriter {
    private static final Logger LOG = LoggerFactory.getLogger(PipeOutputWriter.class.getName());

    private final String srcTaskId;
    private final int srcTaskIndex;
    private final PipeManagerWorker pipeManagerWorker;
    private final Partitioner partitioner;
    private final RuntimeEdge runtimeEdge;

    /** Set once {@link #doInitialize()} has populated {@link #pipes} and {@link #serializer}. */
    private boolean initialized = false;
    private Serializer serializer;
    private List<ByteOutputContext> pipes;

    /**
     * Creates the writer and notifies the master about this (edge, source task index) pair.
     *
     * @param i                 value forwarded as the second argument of
     *                          {@link OutputWriter#getPartitioner}
     * @param str               id of the source task; its index is derived with
     *                          {@link RuntimeIdManager#getIndexFromTaskId}
     * @param runtimeEdge       the edge this writer writes to
     * @param pipeManagerWorker provider of the output pipes and the serializer
     */
    public PipeOutputWriter(int i, String str, RuntimeEdge runtimeEdge, PipeManagerWorker pipeManagerWorker) {
        this.srcTaskId = str;
        // Parse the index once and reuse it everywhere instead of re-deriving it from the id.
        this.srcTaskIndex = RuntimeIdManager.getIndexFromTaskId(str);
        this.pipeManagerWorker = pipeManagerWorker;
        this.pipeManagerWorker.notifyMaster(runtimeEdge.getId(), this.srcTaskIndex);
        this.partitioner = OutputWriter.getPartitioner(runtimeEdge, i);
        this.runtimeEdge = runtimeEdge;
    }

    /**
     * Serializes {@code obj} into a fresh output stream of every pipe in {@code list}.
     *
     * <p>Each stream is closed even when writing fails, so a failed write does not leak it.
     *
     * @param obj  the element (or watermark wrapper) to write
     * @param list the pipes to write to
     * @throws RuntimeException wrapping any {@link IOException} raised while opening,
     *                          writing to, or closing a stream
     */
    private void writeData(Object obj, List<ByteOutputContext> list) {
        for (final ByteOutputContext pipe : list) {
            try (ByteOutputContext.ByteOutputStream out = pipe.newOutputStream()) {
                out.writeElement(obj, this.serializer);
            } catch (final IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Writes one element to the pipe(s) selected by {@link #getPipeToWrite(Object)}.
     *
     * @param obj the element to write
     */
    @Override
    public void write(Object obj) {
        ensureInitialized();
        writeData(obj, getPipeToWrite(obj));
    }

    /**
     * Writes the watermark, wrapped together with this task's index, to every pipe.
     *
     * @param watermark the watermark to emit
     */
    @Override
    public void writeWatermark(Watermark watermark) {
        ensureInitialized();
        writeData(new WatermarkWithIndex(watermark, this.srcTaskIndex), this.pipes);
    }

    /**
     * @return always {@link Optional#empty()}; this writer does not report a byte count
     */
    @Override
    public Optional<Long> getWrittenBytes() {
        return Optional.empty();
    }

    /**
     * Closes every output pipe. If nothing has been written yet, the pipes are
     * looked up first so that they can still be closed.
     *
     * @throws RuntimeException wrapping any {@link IOException} raised by a pipe's close
     */
    @Override
    public void close() {
        ensureInitialized();
        for (final ByteOutputContext pipe : this.pipes) {
            try {
                pipe.close();
            } catch (final IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /** Runs {@link #doInitialize()} unless it has already run. */
    private void ensureInitialized() {
        if (!this.initialized) {
            doInitialize();
        }
    }

    /** Fetches the output pipes and the serializer for this edge from the pipe manager. */
    private void doInitialize() {
        // The flag is raised before the lookups, matching the original statement order.
        this.initialized = true;
        this.pipes = this.pipeManagerWorker.getOutputContexts(this.runtimeEdge, this.srcTaskIndex);
        this.serializer = this.pipeManagerWorker.getSerializer(this.runtimeEdge.getId());
    }

    /**
     * Selects the destination pipe(s) for {@code obj} from the edge's communication pattern.
     *
     * @param obj the element about to be written
     * @return the first pipe for {@code OneToOne}, all pipes for {@code BroadCast}, and
     *         otherwise the single pipe at the index returned by the partitioner
     */
    private List<ByteOutputContext> getPipeToWrite(Object obj) {
        final CommunicationPatternProperty.Value pattern = (CommunicationPatternProperty.Value)
            this.runtimeEdge.getPropertyValue(CommunicationPatternProperty.class).get();

        if (pattern.equals(CommunicationPatternProperty.Value.OneToOne)) {
            return Collections.singletonList(this.pipes.get(0));
        }
        if (pattern.equals(CommunicationPatternProperty.Value.BroadCast)) {
            return this.pipes;
        }
        // Any other pattern: the partitioner's result is used as an index into the pipe list.
        final int pipeIndex = ((Integer) this.partitioner.partition(obj)).intValue();
        return Collections.singletonList(this.pipes.get(pipeIndex));
    }
}
