package org.apache.nemo.runtime.executor.data.block;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.nemo.common.KeyRange;
import org.apache.nemo.common.exception.BlockFetchException;
import org.apache.nemo.common.exception.BlockWriteException;
import org.apache.nemo.runtime.executor.data.DataUtil;
import org.apache.nemo.runtime.executor.data.partition.NonSerializedPartition;
import org.apache.nemo.runtime.executor.data.partition.SerializedPartition;
import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;

/**
 * A {@link Block} implementation that holds its data as {@link SerializedPartition}s
 * referenced from in-memory collections of this object.
 *
 * <p>Elements written one at a time through {@link #write(Serializable, Object)} are
 * accumulated in a per-key "non-committed" partition. Those partitions are moved to the
 * readable partition list by {@link #commitPartitions()} or {@link #commit()}.
 * Reads are only permitted once the block has been committed.
 *
 * <p>The class is annotated {@code @NotThreadSafe}; only {@link #commit()},
 * {@link #commitPartitions()} and {@link #isCommitted()} are synchronized.
 *
 * @param <K> the type of the partition keys
 */
@NotThreadSafe
public final class SerializedMemoryBlock<K extends Serializable> implements Block<K> {
    private final String id;
    private final Serializer serializer;
    /** Partitions that are complete and visible to readers once the block is committed. */
    private final List<SerializedPartition<K>> serializedPartitions = new ArrayList<>();
    /** Partitions still being filled element by element, indexed by their key. */
    private final Map<K, SerializedPartition<K>> nonCommittedPartitionsMap = new HashMap<>();
    private volatile boolean committed = false;

    /**
     * Creates an empty, non-committed block.
     *
     * @param str        the ID of this block
     * @param serializer the serializer used to create and convert partitions of this block
     */
    public SerializedMemoryBlock(String str, Serializer serializer) {
        this.id = str;
        this.serializer = serializer;
    }

    /**
     * Writes a single element into the non-committed partition associated with the given key,
     * creating that partition first if none exists yet.
     *
     * @param k   the key of the partition the element belongs to
     * @param obj the element to write
     * @throws BlockWriteException if the block is already committed, or if creating the
     *                             partition or writing the element fails with an {@link IOException}
     */
    @Override
    public void write(K k, Object obj) throws BlockWriteException {
        if (this.committed) {
            throw new BlockWriteException(new Throwable("The partition is already committed!"));
        }
        try {
            SerializedPartition<K> partition = this.nonCommittedPartitionsMap.get(k);
            if (partition == null) {
                // Created inside the try block so that an IOException raised while
                // constructing the partition is wrapped like any other write failure.
                partition = new SerializedPartition<>(k, this.serializer);
                this.nonCommittedPartitionsMap.put(k, partition);
            }
            partition.write(obj);
        } catch (IOException e) {
            throw new BlockWriteException(e);
        }
    }

    /**
     * Converts the given non-serialized partitions with this block's serializer and appends
     * the results to this block.
     *
     * @param iterable the non-serialized partitions to append
     * @throws BlockWriteException if the block is already committed, or if the conversion
     *                             fails with an {@link IOException}
     */
    @Override
    public void writePartitions(Iterable<NonSerializedPartition<K>> iterable) throws BlockWriteException {
        if (this.committed) {
            throw new BlockWriteException(new Throwable("Cannot append partitions to the committed block"));
        }
        try {
            writeSerializedPartitions(DataUtil.convertToSerPartitions(this.serializer, iterable));
        } catch (IOException e) {
            throw new BlockWriteException(e);
        }
    }

    /**
     * Appends the given serialized partitions directly to this block's partition list.
     *
     * @param iterable the serialized partitions to append
     * @throws BlockWriteException if the block is already committed
     */
    @Override
    public void writeSerializedPartitions(Iterable<SerializedPartition<K>> iterable) throws BlockWriteException {
        if (this.committed) {
            throw new BlockWriteException(new Throwable("Cannot append partitions to the committed block"));
        }
        iterable.forEach(this.serializedPartitions::add);
    }

    /**
     * Reads the partitions whose keys fall in the given range, converted to non-serialized form
     * with this block's serializer.
     *
     * @param keyRange the range of keys to retrieve
     * @return the non-serialized partitions whose keys are included in {@code keyRange}
     * @throws BlockFetchException if the block is not committed yet, or if the conversion
     *                             fails with an {@link IOException}
     */
    @Override
    public Iterable<NonSerializedPartition<K>> readPartitions(KeyRange keyRange) throws BlockFetchException {
        try {
            return DataUtil.convertToNonSerPartitions(this.serializer, readSerializedPartitions(keyRange));
        } catch (IOException e) {
            throw new BlockFetchException(e);
        }
    }

    /**
     * Collects the serialized partitions whose keys fall in the given range.
     *
     * @param keyRange the range of keys to retrieve
     * @return a newly created list of the partitions whose keys are included in {@code keyRange},
     *         in the order they are stored in this block
     * @throws BlockFetchException if the block is not committed yet
     */
    @Override
    public Iterable<SerializedPartition<K>> readSerializedPartitions(KeyRange keyRange) throws BlockFetchException {
        if (!this.committed) {
            throw new BlockFetchException(new Throwable("Cannot retrieve elements before a block is committed"));
        }
        final List<SerializedPartition<K>> partitionsInRange = new ArrayList<>();
        for (final SerializedPartition<K> partition : this.serializedPartitions) {
            final K key = partition.getKey();
            if (keyRange.includes(key)) {
                partitionsInRange.add(partition);
            }
        }
        return partitionsInRange;
    }

    /**
     * Commits this block. On the first call the pending partitions are committed and the block is
     * marked as committed; later calls only recompute the size map.
     *
     * @return a map from each partition key to the sum of {@code getLength()} over all partitions
     *         of this block that carry that key (always present)
     * @throws BlockWriteException if committing the partitions or reading a partition length
     *                             fails with an {@link IOException}
     */
    @Override
    public synchronized Optional<Map<K, Long>> commit() throws BlockWriteException {
        try {
            if (!this.committed) {
                commitPartitions();
                this.committed = true;
            }
            final Map<K, Long> partitionSizes = new HashMap<>(this.serializedPartitions.size());
            for (final SerializedPartition<K> partition : this.serializedPartitions) {
                final long length = partition.getLength();
                // Several partitions may share a key; their lengths are summed.
                partitionSizes.merge(partition.getKey(), length, Long::sum);
            }
            return Optional.of(partitionSizes);
        } catch (IOException e) {
            throw new BlockWriteException(e);
        }
    }

    /**
     * Commits every partition that is still being written through
     * {@link #write(Serializable, Object)}, appends it to the partition list, and then clears
     * the set of pending partitions. This does not mark the block itself as committed.
     *
     * @throws BlockWriteException if committing a partition fails with an {@link IOException}
     */
    @Override
    public synchronized void commitPartitions() throws BlockWriteException {
        try {
            for (final SerializedPartition<K> partition : this.nonCommittedPartitionsMap.values()) {
                partition.commit();
                this.serializedPartitions.add(partition);
            }
            this.nonCommittedPartitionsMap.clear();
        } catch (IOException e) {
            throw new BlockWriteException(e);
        }
    }

    /**
     * @return the ID of this block
     */
    @Override
    public String getId() {
        return this.id;
    }

    /**
     * @return {@code true} if {@link #commit()} has completed on this block, {@code false} otherwise
     */
    @Override
    public synchronized boolean isCommitted() {
        return this.committed;
    }
}
