package org.apache.nemo.runtime.executor.data.block;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.nemo.common.KeyRange;
import org.apache.nemo.common.Pair;
import org.apache.nemo.common.exception.BlockFetchException;
import org.apache.nemo.common.exception.BlockWriteException;
import org.apache.nemo.runtime.executor.data.DataUtil;
import org.apache.nemo.runtime.executor.data.FileArea;
import org.apache.nemo.runtime.executor.data.metadata.FileMetadata;
import org.apache.nemo.runtime.executor.data.metadata.PartitionMetadata;
import org.apache.nemo.runtime.executor.data.partition.NonSerializedPartition;
import org.apache.nemo.runtime.executor.data.partition.SerializedPartition;
import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:org/apache/nemo/runtime/executor/data/block/FileBlock.class */
public final class FileBlock<K extends Serializable> implements Block<K> {
    private static final Logger LOG = LoggerFactory.getLogger(FileBlock.class.getName());
    private final String id;
    private final Map<K, SerializedPartition<K>> nonCommittedPartitionsMap = new HashMap();
    private final Serializer serializer;
    private final String filePath;
    private final FileMetadata<K> metadata;

    public FileBlock(String str, Serializer serializer, String str2, FileMetadata<K> fileMetadata) {
        this.id = str;
        this.serializer = serializer;
        this.filePath = str2;
        this.metadata = fileMetadata;
    }

    private void writeToFile(Iterable<SerializedPartition<K>> iterable) throws IOException {
        FileOutputStream fileOutputStream = new FileOutputStream(this.filePath, true);
        Throwable th = null;
        try {
            try {
                for (SerializedPartition<K> serializedPartition : iterable) {
                    this.metadata.writePartitionMetadata(serializedPartition.getKey(), serializedPartition.getLength());
                    fileOutputStream.write(serializedPartition.getData(), 0, serializedPartition.getLength());
                }
                if (fileOutputStream != null) {
                    if (0 == 0) {
                        fileOutputStream.close();
                        return;
                    }
                    try {
                        fileOutputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (fileOutputStream != null) {
                if (th != null) {
                    try {
                        fileOutputStream.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    fileOutputStream.close();
                }
            }
            throw th4;
        }
    }

    @Override // org.apache.nemo.runtime.executor.data.block.Block
    public void write(K k, Object obj) throws BlockWriteException {
        if (this.metadata.isCommitted()) {
            throw new BlockWriteException(new Throwable("The partition is already committed!"));
        }
        try {
            SerializedPartition<K> serializedPartition = this.nonCommittedPartitionsMap.get(k);
            if (serializedPartition == null) {
                serializedPartition = new SerializedPartition<>(k, this.serializer);
                this.nonCommittedPartitionsMap.put(k, serializedPartition);
            }
            serializedPartition.write(obj);
        } catch (IOException e) {
            throw new BlockWriteException(e);
        }
    }

    @Override // org.apache.nemo.runtime.executor.data.block.Block
    public void writePartitions(Iterable<NonSerializedPartition<K>> iterable) throws BlockWriteException {
        if (this.metadata.isCommitted()) {
            throw new BlockWriteException(new Throwable("The partition is already committed!"));
        }
        try {
            writeSerializedPartitions(DataUtil.convertToSerPartitions(this.serializer, iterable));
        } catch (IOException e) {
            throw new BlockWriteException(e);
        }
    }

    @Override // org.apache.nemo.runtime.executor.data.block.Block
    public void writeSerializedPartitions(Iterable<SerializedPartition<K>> iterable) throws BlockWriteException {
        if (this.metadata.isCommitted()) {
            throw new BlockWriteException(new Throwable("The partition is already committed!"));
        }
        try {
            writeToFile(iterable);
        } catch (IOException e) {
            throw new BlockWriteException(e);
        }
    }

    @Override // org.apache.nemo.runtime.executor.data.block.Block
    public Iterable<NonSerializedPartition<K>> readPartitions(KeyRange keyRange) throws BlockFetchException {
        if (!this.metadata.isCommitted()) {
            throw new BlockFetchException(new Throwable("Cannot retrieve elements before a block is committed"));
        }
        ArrayList arrayList = new ArrayList();
        try {
            ArrayList<Pair> arrayList2 = new ArrayList();
            FileInputStream fileInputStream = new FileInputStream(this.filePath);
            Throwable th = null;
            try {
                try {
                    for (PartitionMetadata<K> partitionMetadata : this.metadata.getPartitionMetadataList()) {
                        K key = partitionMetadata.getKey();
                        if (keyRange.includes(key)) {
                            byte[] bArr = new byte[partitionMetadata.getPartitionSize()];
                            fileInputStream.read(bArr, 0, partitionMetadata.getPartitionSize());
                            arrayList2.add(Pair.of(key, bArr));
                        } else {
                            skipBytes(fileInputStream, partitionMetadata.getPartitionSize());
                        }
                    }
                    if (fileInputStream != null) {
                        if (0 != 0) {
                            try {
                                fileInputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            fileInputStream.close();
                        }
                    }
                    for (Pair pair : arrayList2) {
                        arrayList.add(DataUtil.deserializePartition(((byte[]) pair.right()).length, this.serializer, (Serializable) pair.left(), new ByteArrayInputStream((byte[]) pair.right())));
                    }
                    return arrayList;
                } finally {
                }
            } finally {
            }
        } catch (IOException e) {
            throw new BlockFetchException(e);
        }
    }

    @Override // org.apache.nemo.runtime.executor.data.block.Block
    public Iterable<SerializedPartition<K>> readSerializedPartitions(KeyRange keyRange) throws BlockFetchException {
        if (!this.metadata.isCommitted()) {
            throw new BlockFetchException(new Throwable("Cannot retrieve elements before a block is committed"));
        }
        ArrayList arrayList = new ArrayList();
        try {
            FileInputStream fileInputStream = new FileInputStream(this.filePath);
            Throwable th = null;
            try {
                try {
                    for (PartitionMetadata<K> partitionMetadata : this.metadata.getPartitionMetadataList()) {
                        K key = partitionMetadata.getKey();
                        if (keyRange.includes(key)) {
                            byte[] bArr = new byte[partitionMetadata.getPartitionSize()];
                            if (fileInputStream.read(bArr) != bArr.length) {
                                throw new IOException("The read data size does not match with the partition size.");
                            }
                            arrayList.add(new SerializedPartition(key, bArr, bArr.length));
                        } else {
                            skipBytes(fileInputStream, partitionMetadata.getPartitionSize());
                        }
                    }
                    if (fileInputStream != null) {
                        if (0 != 0) {
                            try {
                                fileInputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            fileInputStream.close();
                        }
                    }
                    return arrayList;
                } finally {
                }
            } finally {
            }
        } catch (IOException e) {
            throw new BlockFetchException(e);
        }
    }

    private void skipBytes(InputStream inputStream, long j) throws IOException {
        long j2 = j;
        while (j2 > 0) {
            long skip = inputStream.skip(j);
            j2 -= skip;
            if (skip <= 0) {
                throw new IOException("The file stream failed to skip to the next block.");
            }
        }
    }

    public List<FileArea> asFileAreas(KeyRange keyRange) throws IOException {
        if (!this.metadata.isCommitted()) {
            throw new IOException("Cannot retrieve elements before a block is committed");
        }
        ArrayList arrayList = new ArrayList();
        for (PartitionMetadata<K> partitionMetadata : this.metadata.getPartitionMetadataList()) {
            if (keyRange.includes(partitionMetadata.getKey())) {
                arrayList.add(new FileArea(this.filePath, partitionMetadata.getOffset(), partitionMetadata.getPartitionSize()));
            }
        }
        return arrayList;
    }

    public void deleteFile() throws IOException {
        this.metadata.deleteMetadata();
        if (new File(this.filePath).exists()) {
            Files.delete(Paths.get(this.filePath, new String[0]));
        }
    }

    @Override // org.apache.nemo.runtime.executor.data.block.Block
    public synchronized Optional<Map<K, Long>> commit() throws BlockWriteException {
        try {
            if (!this.metadata.isCommitted()) {
                commitPartitions();
                this.metadata.commitBlock();
            }
            List<PartitionMetadata<K>> partitionMetadataList = this.metadata.getPartitionMetadataList();
            HashMap hashMap = new HashMap(partitionMetadataList.size());
            for (PartitionMetadata<K> partitionMetadata : partitionMetadataList) {
                K key = partitionMetadata.getKey();
                long partitionSize = partitionMetadata.getPartitionSize();
                if (hashMap.containsKey(key)) {
                    hashMap.compute(key, (serializable, l) -> {
                        return Long.valueOf(l.longValue() + partitionSize);
                    });
                } else {
                    hashMap.put(key, Long.valueOf(partitionSize));
                }
            }
            return Optional.of(hashMap);
        } catch (IOException e) {
            throw new BlockWriteException(e);
        }
    }

    @Override // org.apache.nemo.runtime.executor.data.block.Block
    public synchronized void commitPartitions() throws BlockWriteException {
        ArrayList arrayList = new ArrayList();
        try {
            for (SerializedPartition<K> serializedPartition : this.nonCommittedPartitionsMap.values()) {
                serializedPartition.commit();
                arrayList.add(serializedPartition);
            }
            writeToFile(arrayList);
            this.nonCommittedPartitionsMap.clear();
        } catch (IOException e) {
            throw new BlockWriteException(e);
        }
    }

    @Override // org.apache.nemo.runtime.executor.data.block.Block
    public String getId() {
        return this.id;
    }

    @Override // org.apache.nemo.runtime.executor.data.block.Block
    public boolean isCommitted() {
        return this.metadata.isCommitted();
    }
}
