package org.apache.nemo.runtime.executor.datatransfer;

import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DataPersistenceProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
import org.apache.nemo.common.punctuation.Watermark;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.plan.RuntimeEdge;
import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
import org.apache.nemo.runtime.executor.data.block.Block;
import org.apache.nemo.runtime.executor.data.partitioner.DedicatedKeyPerElement;
import org.apache.nemo.runtime.executor.data.partitioner.Partitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.class */
public final class BlockOutputWriter implements OutputWriter {
    private static final Logger LOG = LoggerFactory.getLogger(BlockOutputWriter.class.getName());
    private final RuntimeEdge<?> runtimeEdge;
    private final IRVertex dstIrVertex;
    private final Partitioner partitioner;
    private final DataStoreProperty.Value blockStoreValue;
    private final BlockManagerWorker blockManagerWorker;
    private final Block blockToWrite;
    private final boolean nonDummyBlock;
    private long writtenBytes;

    /* JADX INFO: Access modifiers changed from: package-private */
    public BlockOutputWriter(int i, String str, IRVertex iRVertex, RuntimeEdge<?> runtimeEdge, BlockManagerWorker blockManagerWorker) {
        this.runtimeEdge = runtimeEdge;
        this.dstIrVertex = iRVertex;
        this.partitioner = OutputWriter.getPartitioner(runtimeEdge, i);
        this.blockManagerWorker = blockManagerWorker;
        this.blockStoreValue = (DataStoreProperty.Value) runtimeEdge.getPropertyValue(DataStoreProperty.class).orElseThrow(() -> {
            return new RuntimeException("No data store property on the edge");
        });
        this.blockToWrite = blockManagerWorker.createBlock(RuntimeIdManager.generateBlockId(runtimeEdge.getId(), str), this.blockStoreValue);
        Optional propertyValue = runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
        this.nonDummyBlock = !propertyValue.isPresent() || ((DuplicateEdgeGroupPropertyValue) propertyValue.get()).getRepresentativeEdgeId().equals(runtimeEdge.getId()) || ((DuplicateEdgeGroupPropertyValue) propertyValue.get()).getGroupSize() <= 1;
    }

    @Override // org.apache.nemo.runtime.executor.datatransfer.OutputWriter
    public void write(Object obj) {
        if (this.nonDummyBlock) {
            this.blockToWrite.write(this.partitioner.partition(obj), obj);
            if (((DedicatedKeyPerElement) this.partitioner.getClass().getAnnotation(DedicatedKeyPerElement.class)) != null) {
                this.blockToWrite.commitPartitions();
            }
        }
    }

    @Override // org.apache.nemo.runtime.executor.datatransfer.OutputWriter
    public void writeWatermark(Watermark watermark) {
    }

    @Override // org.apache.nemo.runtime.executor.datatransfer.OutputWriter
    public void close() {
        DataPersistenceProperty.Value value = (DataPersistenceProperty.Value) this.runtimeEdge.getPropertyValue(DataPersistenceProperty.class).get();
        Optional commit = this.blockToWrite.commit();
        if (commit.isPresent()) {
            long j = 0;
            Iterator it = ((Map) commit.get()).values().iterator();
            while (it.hasNext()) {
                j += ((Long) it.next()).longValue();
            }
            this.writtenBytes = j;
        } else {
            this.writtenBytes = -1L;
        }
        this.blockManagerWorker.writeBlock(this.blockToWrite, this.blockStoreValue, getExpectedRead(), value);
    }

    @Override // org.apache.nemo.runtime.executor.datatransfer.OutputWriter
    public Optional<Long> getWrittenBytes() {
        return this.writtenBytes == -1 ? Optional.empty() : Optional.of(Long.valueOf(this.writtenBytes));
    }

    private int getExpectedRead() {
        Optional propertyValue = this.runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
        return (CommunicationPatternProperty.Value.OneToOne.equals(this.runtimeEdge.getPropertyValue(CommunicationPatternProperty.class).orElseThrow(() -> {
            return new RuntimeException("No communication pattern on this edge.");
        })) ? 1 : ((Integer) this.dstIrVertex.getPropertyValue(ParallelismProperty.class).orElseThrow(() -> {
            return new RuntimeException("No parallelism property on the destination vertex.");
        })).intValue()) * (propertyValue.isPresent() ? ((DuplicateEdgeGroupPropertyValue) propertyValue.get()).getGroupSize() : 1);
    }
}
