package org.apache.nemo.runtime.executor.data;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.nemo.common.KeyRange;
import org.apache.nemo.common.exception.BlockFetchException;
import org.apache.nemo.common.exception.BlockWriteException;
import org.apache.nemo.common.exception.UnsupportedBlockStoreException;
import org.apache.nemo.common.exception.UnsupportedExecutionPropertyException;
import org.apache.nemo.common.ir.edge.executionproperty.DataPersistenceProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
import org.apache.nemo.conf.JobConf;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.comm.ControlMessage;
import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
import org.apache.nemo.runtime.executor.bytetransfer.ByteInputContext;
import org.apache.nemo.runtime.executor.bytetransfer.ByteOutputContext;
import org.apache.nemo.runtime.executor.bytetransfer.ByteTransfer;
import org.apache.nemo.runtime.executor.data.DataUtil;
import org.apache.nemo.runtime.executor.data.block.Block;
import org.apache.nemo.runtime.executor.data.block.FileBlock;
import org.apache.nemo.runtime.executor.data.partition.NonSerializedPartition;
import org.apache.nemo.runtime.executor.data.partition.SerializedPartition;
import org.apache.nemo.runtime.executor.data.stores.BlockStore;
import org.apache.nemo.runtime.executor.data.stores.LocalFileStore;
import org.apache.nemo.runtime.executor.data.stores.MemoryStore;
import org.apache.nemo.runtime.executor.data.stores.RemoteFileStore;
import org.apache.nemo.runtime.executor.data.stores.SerializedMemoryStore;
import org.apache.reef.tang.annotations.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:org/apache/nemo/runtime/executor/data/BlockManagerWorker.class */
public final class BlockManagerWorker {
    private static final Logger LOG = LoggerFactory.getLogger(BlockManagerWorker.class.getName());

    /** Location marker reported to (and received from) the master for blocks kept in the remote file store. */
    private static final String REMOTE_FILE_STORE = "REMOTE_FILE_STORE";

    /** Id of the executor this worker belongs to; sent with every message to the master. */
    private final String executorId;

    /** Supplies the serializer for a runtime edge when decoding remotely fetched data. */
    private final SerializerManager serializerManager;

    // The four block stores selectable through DataStoreProperty.Value (see getBlockStore).
    private final MemoryStore memoryStore;
    private final SerializedMemoryStore serializedMemoryStore;
    private final LocalFileStore localFileStore;
    private final RemoteFileStore remoteFileStore;

    /** Provides the message sender used to talk to the block manager master. */
    private final PersistentConnectionToMasterMap persistentConnectionToMasterMap;

    /** Opens input contexts towards other executors for remote block reads. */
    private final ByteTransfer byteTransfer;

    /** Fixed-size pool that serves outgoing block requests and deferred block removals. */
    private final ExecutorService backgroundExecutorService;

    /** Grants transfer permission per runtime edge and is notified when a transfer finishes. */
    private final BlockTransferThrottler blockTransferThrottler;

    /**
     * Remaining number of reads per block id. Entries are added by writeBlock for blocks with
     * Discard persistence and are removed once the counter reaches zero.
     */
    private final Map<String, AtomicInteger> blockToRemainingRead = new ConcurrentHashMap<>();

    /**
     * In-flight block location requests to the master, keyed by the requested block id (wildcard).
     * An entry is removed as soon as its request completes.
     */
    private final Map<String, CompletableFuture<ControlMessage.Message>> pendingBlockLocationRequest = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.nemo.runtime.executor.data.BlockManagerWorker$3, reason: invalid class name */
    /* loaded from: input_file:org/apache/nemo/runtime/executor/data/BlockManagerWorker$3.class */
    /**
     * Holder of ordinal-indexed lookup tables for three enums:
     * {@code ControlMessage.BlockStore}, {@code DataStoreProperty.Value} and
     * {@code DataPersistenceProperty.Value}.
     *
     * <p>Each table is sized to the enum's {@code values().length} and stores a small positive
     * case number at the index of each known constant's {@code ordinal()}. Slots that are never
     * assigned keep the array default of 0, which does not match any of the numbers assigned here.
     *
     * <p>NOTE(review): the class name and the {@code synthetic} markers suggest this is the
     * decompiler's rendering of a compiler-generated switch map rather than hand-written code.
     */
    public static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$nemo$common$ir$edge$executionproperty$DataPersistenceProperty$Value;
        static final /* synthetic */ int[] $SwitchMap$org$apache$nemo$common$ir$edge$executionproperty$DataStoreProperty$Value;
        static final /* synthetic */ int[] $SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$BlockStore = new int[ControlMessage.BlockStore.values().length];

        // Every assignment is wrapped in its own try/catch: a NoSuchFieldError for one constant
        // is deliberately ignored so that the remaining entries are still populated.
        static {
            // ControlMessage.BlockStore: MEMORY=1, SER_MEMORY=2, LOCAL_FILE=3, REMOTE_FILE=4.
            try {
                $SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$BlockStore[ControlMessage.BlockStore.MEMORY.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$BlockStore[ControlMessage.BlockStore.SER_MEMORY.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$BlockStore[ControlMessage.BlockStore.LOCAL_FILE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$BlockStore[ControlMessage.BlockStore.REMOTE_FILE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            // DataStoreProperty.Value: MemoryStore=1, SerializedMemoryStore=2, LocalFileStore=3, GlusterFileStore=4.
            $SwitchMap$org$apache$nemo$common$ir$edge$executionproperty$DataStoreProperty$Value = new int[DataStoreProperty.Value.values().length];
            try {
                $SwitchMap$org$apache$nemo$common$ir$edge$executionproperty$DataStoreProperty$Value[DataStoreProperty.Value.MemoryStore.ordinal()] = 1;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$nemo$common$ir$edge$executionproperty$DataStoreProperty$Value[DataStoreProperty.Value.SerializedMemoryStore.ordinal()] = 2;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$nemo$common$ir$edge$executionproperty$DataStoreProperty$Value[DataStoreProperty.Value.LocalFileStore.ordinal()] = 3;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$nemo$common$ir$edge$executionproperty$DataStoreProperty$Value[DataStoreProperty.Value.GlusterFileStore.ordinal()] = 4;
            } catch (NoSuchFieldError e8) {
            }
            // DataPersistenceProperty.Value: Discard=1, Keep=2.
            $SwitchMap$org$apache$nemo$common$ir$edge$executionproperty$DataPersistenceProperty$Value = new int[DataPersistenceProperty.Value.values().length];
            try {
                $SwitchMap$org$apache$nemo$common$ir$edge$executionproperty$DataPersistenceProperty$Value[DataPersistenceProperty.Value.Discard.ordinal()] = 1;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$nemo$common$ir$edge$executionproperty$DataPersistenceProperty$Value[DataPersistenceProperty.Value.Keep.ordinal()] = 2;
            } catch (NoSuchFieldError e10) {
            }
        }
    }

    /**
     * Injectable constructor; stores the supplied collaborators and creates the background pool.
     *
     * @param str                             id of the executor this worker runs in
     * @param i                               number of threads of the background executor service
     * @param memoryStore                     store backing {@code DataStoreProperty.Value.MemoryStore}
     * @param serializedMemoryStore           store backing {@code DataStoreProperty.Value.SerializedMemoryStore}
     * @param localFileStore                  store backing {@code DataStoreProperty.Value.LocalFileStore}
     * @param remoteFileStore                 store backing {@code DataStoreProperty.Value.GlusterFileStore}
     * @param persistentConnectionToMasterMap source of the message sender towards the master
     * @param byteTransfer                    opens input contexts towards other executors
     * @param serializerManager               supplies serializers per runtime edge
     * @param blockTransferThrottler          throttles remote block transfers
     */
    @Inject
    private BlockManagerWorker(@Parameter(JobConf.ExecutorId.class) String str,
                               @Parameter(JobConf.IORequestHandleThreadsTotal.class) int i,
                               MemoryStore memoryStore,
                               SerializedMemoryStore serializedMemoryStore,
                               LocalFileStore localFileStore,
                               RemoteFileStore remoteFileStore,
                               PersistentConnectionToMasterMap persistentConnectionToMasterMap,
                               ByteTransfer byteTransfer,
                               SerializerManager serializerManager,
                               BlockTransferThrottler blockTransferThrottler) {
        // Identity.
        this.executorId = str;

        // Block stores.
        this.memoryStore = memoryStore;
        this.serializedMemoryStore = serializedMemoryStore;
        this.localFileStore = localFileStore;
        this.remoteFileStore = remoteFileStore;

        // Communication and (de)serialization helpers.
        this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
        this.byteTransfer = byteTransfer;
        this.serializerManager = serializerManager;
        this.blockTransferThrottler = blockTransferThrottler;

        // Worker threads for outgoing block requests and deferred removals.
        this.backgroundExecutorService = Executors.newFixedThreadPool(i);
    }

    /**
     * Creates a new block in the store selected by {@code value}.
     *
     * @param str   id of the block to create
     * @param value which block store to create the block in
     * @return the block created by the selected store
     * @throws BlockWriteException declared by this method; raised by the underlying store
     */
    public Block createBlock(String str, DataStoreProperty.Value value) throws BlockWriteException {
        final BlockStore targetStore = getBlockStore(value);
        return targetStore.createBlock(str);
    }

    /**
     * Asynchronously reads (part of) a block, either from a local store or from a remote executor.
     *
     * <p>The master is first asked where the block lives. Concurrent calls for the same
     * {@code str} share a single in-flight location request. Once the location is known:
     * <ul>
     *   <li>if the owner is this executor, or the block is in the remote file store, the data is
     *       read through the local block store;</li>
     *   <li>otherwise transfer permission is requested from the throttler, an input context to
     *       the owning executor is opened, and the received streams are wrapped in an iterator.</li>
     * </ul>
     *
     * @param str      block id (wildcard) to ask the master about
     * @param str2     id of the runtime edge; used for throttling and for choosing the serializer
     * @param value    block store the block is expected to reside in
     * @param keyRange key range of the block to read
     * @return a future completing with an iterator over the requested data; it completes
     *         exceptionally with a {@code RuntimeException} on an unexpected response type and
     *         with a {@link BlockFetchException} when the master reports no owner for the block
     */
    public CompletableFuture<DataUtil.IteratorWithNumBytes> readBlock(String str, String str2, DataStoreProperty.Value value, KeyRange keyRange) {
        // Reuse a pending location request for the same block id, if there is one.
        final CompletableFuture<ControlMessage.Message> locationFuture =
            this.pendingBlockLocationRequest.computeIfAbsent(str, unusedKey ->
                this.persistentConnectionToMasterMap
                    .getMessageSender("BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID")
                    .request(ControlMessage.Message.newBuilder()
                        .setId(RuntimeIdManager.generateMessageId())
                        .setListenerId("BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID")
                        .setType(ControlMessage.MessageType.RequestBlockLocation)
                        .setRequestBlockLocationMsg(ControlMessage.RequestBlockLocationMsg.newBuilder()
                            .setExecutorId(this.executorId)
                            .setBlockIdWildcard(str)
                            .build())
                        .build()));
        // Drop the bookkeeping entry whether the request succeeded or failed.
        locationFuture.whenComplete((response, error) -> this.pendingBlockLocationRequest.remove(str));

        // thenCompose: fetching the data only starts after the master has answered.
        return locationFuture.thenCompose(response -> {
            if (response.getType() != ControlMessage.MessageType.BlockLocationInfo) {
                throw new RuntimeException("Response message type mismatch!");
            }
            final ControlMessage.BlockLocationInfoMsg locationInfo = response.getBlockLocationInfoMsg();
            if (!locationInfo.hasOwnerExecutorId()) {
                throw new BlockFetchException(new Throwable("Block " + str + " location unknown: The block state is " + locationInfo.getState()));
            }
            final String blockId = locationInfo.getBlockId();
            final String ownerExecutorId = locationInfo.getOwnerExecutorId();

            if (ownerExecutorId.equals(this.executorId) || ownerExecutorId.equals(REMOTE_FILE_STORE)) {
                // The block is reachable through one of this executor's own stores.
                return getDataFromLocalBlock(blockId, value, keyRange);
            }

            // The block lives on another executor: describe what we want and open a transfer.
            final ControlMessage.BlockTransferContextDescriptor descriptor =
                ControlMessage.BlockTransferContextDescriptor.newBuilder()
                    .setBlockId(blockId)
                    .setBlockStore(convertBlockStore(value))
                    .setRuntimeEdgeId(str2)
                    .setKeyRange(ByteString.copyFrom(SerializationUtils.serialize(keyRange)))
                    .build();
            final CompletableFuture<ByteInputContext> contextFuture = this.blockTransferThrottler
                .requestTransferPermission(str2)
                .thenCompose(permission ->
                    this.byteTransfer.newInputContext(ownerExecutorId, descriptor.toByteArray(), false));

            // Always hand the permission back: immediately if the connection failed, otherwise
            // once the transfer itself has completed (successfully or not).
            contextFuture.whenComplete((context, connectionError) -> {
                if (connectionError != null) {
                    this.blockTransferThrottler.onTransferFinished(str2);
                } else {
                    context.getCompletedFuture().whenComplete((completed, transferError) ->
                        this.blockTransferThrottler.onTransferFinished(str2));
                }
            });

            return contextFuture.thenApply(context ->
                new DataUtil.InputStreamIterator(context.getInputStreams(), this.serializerManager.getSerializer(str2)));
        });
    }

    /**
     * Commits a block to the selected store and reports it as AVAILABLE to the master.
     *
     * @param block  the block to commit
     * @param value  block store to write the block to
     * @param i      number of reads after which a block with Discard persistence is removed
     * @param value2 persistence policy of the block (Discard or Keep)
     * @throws UnsupportedExecutionPropertyException if {@code value2} is neither Discard nor Keep
     */
    public void writeBlock(Block block, DataStoreProperty.Value value, int i, DataPersistenceProperty.Value value2) {
        final String blockId = block.getId();
        LOG.info("CommitBlock: {}", blockId);

        switch (value2) {
            case Discard:
                // Track how many reads are still outstanding before the block may be dropped.
                this.blockToRemainingRead.put(block.getId(), new AtomicInteger(i));
                break;
            case Keep:
                // Nothing to track; the block is simply kept.
                break;
            default:
                throw new UnsupportedExecutionPropertyException("This used data handling property is not supported.");
        }

        getBlockStore(value).writeBlock(block);

        final ControlMessage.BlockStateChangedMsg.Builder stateChange = ControlMessage.BlockStateChangedMsg.newBuilder()
            .setExecutorId(this.executorId)
            .setBlockId(blockId)
            .setState(ControlMessage.BlockStateFromExecutor.AVAILABLE);
        // Blocks in the Gluster file store are reported under the remote-file-store marker
        // instead of this executor's id.
        final boolean inRemoteFileStore = DataStoreProperty.Value.GlusterFileStore.equals(value);
        stateChange.setLocation(inRemoteFileStore ? REMOTE_FILE_STORE : this.executorId);

        final ControlMessage.Message notification = ControlMessage.Message.newBuilder()
            .setId(RuntimeIdManager.generateMessageId())
            .setListenerId("BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID")
            .setType(ControlMessage.MessageType.BlockStateChanged)
            .setBlockStateChangedMsg(stateChange.build())
            .build();
        this.persistentConnectionToMasterMap
            .getMessageSender("BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID")
            .send(notification);
    }

    /**
     * Deletes a block from the selected store and reports it as NOT_AVAILABLE to the master.
     *
     * @param str   id of the block to remove
     * @param value block store the block resides in
     * @throws BlockFetchException if the store reports that the block could not be deleted
     */
    public void removeBlock(String str, DataStoreProperty.Value value) {
        LOG.info("RemoveBlock: {}", str);

        final boolean deleted = getBlockStore(value).deleteBlock(str);
        if (!deleted) {
            throw new BlockFetchException(new Throwable("Cannot find corresponding block " + str));
        }

        final ControlMessage.BlockStateChangedMsg.Builder stateChange = ControlMessage.BlockStateChangedMsg.newBuilder()
            .setExecutorId(this.executorId)
            .setBlockId(str)
            .setState(ControlMessage.BlockStateFromExecutor.NOT_AVAILABLE);
        // Gluster-backed blocks are reported under the remote-file-store marker.
        final String location = DataStoreProperty.Value.GlusterFileStore.equals(value)
            ? REMOTE_FILE_STORE
            : this.executorId;
        stateChange.setLocation(location);

        final ControlMessage.Message notification = ControlMessage.Message.newBuilder()
            .setId(RuntimeIdManager.generateMessageId())
            .setListenerId("BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID")
            .setType(ControlMessage.MessageType.BlockStateChanged)
            .setBlockStateChangedMsg(stateChange)
            .build();
        this.persistentConnectionToMasterMap
            .getMessageSender("BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID")
            .send(notification);
    }

    /**
     * Serves an incoming block request by streaming the requested block through the given context.
     *
     * <p>The context descriptor is parsed on the calling thread; reading and sending the block is
     * done on the background executor. File-backed blocks (local or Gluster file store) are sent
     * as file areas, all other blocks as serialized partitions. Each piece is written to its own
     * output stream, which is closed exactly once whether or not the write succeeds. After all
     * pieces are sent the block's read counter is updated and the context is closed.
     *
     * <p>An {@link IOException} or {@link BlockFetchException} raised while sending is logged and
     * reported through {@code byteOutputContext.onChannelError}.
     * NOTE(review): any other runtime exception, including the "not found" one thrown below, is
     * not caught here, so the context is neither closed nor notified in that case.
     *
     * @param byteOutputContext the context to send the block through; its descriptor carries the
     *                          block id, block store and serialized key range
     * @throws InvalidProtocolBufferException if the context descriptor cannot be parsed
     */
    public void onOutputContext(final ByteOutputContext byteOutputContext) throws InvalidProtocolBufferException {
        final ControlMessage.BlockTransferContextDescriptor descriptor =
            (ControlMessage.BlockTransferContextDescriptor) ControlMessage.BlockTransferContextDescriptor.PARSER
                .parseFrom(byteOutputContext.getContextDescriptor());
        final DataStoreProperty.Value blockStore = convertBlockStore(descriptor.getBlockStore());
        final String blockId = descriptor.getBlockId();
        final KeyRange keyRange = (KeyRange) SerializationUtils.deserialize(descriptor.getKeyRange().toByteArray());

        this.backgroundExecutorService.submit(() -> {
            try {
                final Optional<Block> block = getBlockStore(blockStore).readBlock(blockId);
                if (!block.isPresent()) {
                    throw new RuntimeException(String.format("Block %s not found in local BlockManagerWorker", blockId));
                }

                final boolean fileBacked = DataStoreProperty.Value.LocalFileStore.equals(blockStore)
                    || DataStoreProperty.Value.GlusterFileStore.equals(blockStore);
                if (fileBacked) {
                    // Typed local so the loop variable is well-typed even through the raw Block reference.
                    final Iterable<FileArea> fileAreas = ((FileBlock) block.get()).asFileAreas(keyRange);
                    for (final FileArea fileArea : fileAreas) {
                        try (ByteOutputContext.ByteOutputStream out = byteOutputContext.newOutputStream()) {
                            out.writeFileArea(fileArea);
                        }
                    }
                } else {
                    final Iterable<SerializedPartition> partitions = block.get().readSerializedPartitions(keyRange);
                    for (final SerializedPartition partition : partitions) {
                        try (ByteOutputContext.ByteOutputStream out = byteOutputContext.newOutputStream()) {
                            out.writeSerializedPartition(partition);
                        }
                    }
                }

                // One read has been served; this may schedule removal of the block.
                handleDataPersistence(blockStore, blockId);
                byteOutputContext.close();
            } catch (IOException | BlockFetchException e) {
                LOG.error("Closing a block request exceptionally", e);
                byteOutputContext.onChannelError(e);
            }
        });
    }

    /**
     * Handler for incoming input contexts; this worker has no logic for them.
     *
     * @param byteInputContext the incoming context (unused)
     * @throws IllegalStateException always
     */
    public void onInputContext(ByteInputContext byteInputContext) {
        final String reason = "No logic here";
        throw new IllegalStateException(reason);
    }

    /**
     * Reads the requested key range of a block from one of this executor's stores.
     *
     * <p>The block's read counter is updated before the data is handed out. The returned iterator
     * carries the summed serialized/encoded byte counts of the partitions when the partitions
     * support them, and no byte counts otherwise.
     *
     * @param str      id of the block to read
     * @param value    block store the block resides in
     * @param keyRange key range to read
     * @return an already-completed future holding the iterator over the data
     * @throws RuntimeException    if the store does not contain the block
     * @throws BlockFetchException if concatenating the partitions fails with an {@link IOException}
     */
    private CompletableFuture<DataUtil.IteratorWithNumBytes> getDataFromLocalBlock(String str, DataStoreProperty.Value value, KeyRange keyRange) {
        final Optional<Block> maybeBlock = getBlockStore(value).readBlock(str);
        final Block localBlock = maybeBlock.orElseThrow(
            () -> new RuntimeException(String.format("Block %s not found in local BlockManagerWorker", str)));

        final Iterable<NonSerializedPartition> partitions = localBlock.readPartitions(keyRange);
        // Count this read; may schedule removal of the block.
        handleDataPersistence(value, str);

        final Iterator elements;
        try {
            elements = DataUtil.concatNonSerPartitions(partitions).iterator();
        } catch (IOException e) {
            throw new BlockFetchException(e);
        }

        long serializedBytes = 0;
        long encodedBytes = 0;
        try {
            for (final NonSerializedPartition partition : partitions) {
                serializedBytes += partition.getNumSerializedBytes();
                encodedBytes += partition.getNumEncodedBytes();
            }
        } catch (DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
            // Byte counts are unavailable for these partitions: return the data without them.
            return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(elements));
        }
        return CompletableFuture.completedFuture(
            DataUtil.IteratorWithNumBytes.of(elements, serializedBytes, encodedBytes));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records one completed read of a block and removes the block after its last expected read.
     *
     * <p>Blocks without a read counter are left untouched. When the counter reaches zero, the
     * counter entry is dropped and the actual removal is submitted to the background executor.
     *
     * @param value block store the block resides in
     * @param str   id of the block that was just read
     */
    public void handleDataPersistence(final DataStoreProperty.Value value, final String str) {
        final AtomicInteger remainingReads = this.blockToRemainingRead.get(str);
        if (remainingReads == null) {
            // No counter registered for this block: nothing to do.
            return;
        }
        if (remainingReads.decrementAndGet() != 0) {
            // Further reads are still expected (or the counter already passed zero).
            return;
        }

        this.blockToRemainingRead.remove(str);
        this.backgroundExecutorService.submit(() -> removeBlock(str, value));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Resolves a data store property value to the corresponding block store instance.
     *
     * @param value the requested kind of block store
     * @return the matching store held by this worker
     * @throws UnsupportedBlockStoreException if {@code value} is none of the four known kinds
     */
    public BlockStore getBlockStore(DataStoreProperty.Value value) {
        final BlockStore selected;
        switch (value) {
            case MemoryStore:
                selected = this.memoryStore;
                break;
            case SerializedMemoryStore:
                selected = this.serializedMemoryStore;
                break;
            case LocalFileStore:
                selected = this.localFileStore;
                break;
            case GlusterFileStore:
                selected = this.remoteFileStore;
                break;
            default:
                throw new UnsupportedBlockStoreException(new Exception(value + " is not supported."));
        }
        return selected;
    }

    /**
     * Converts a data store property value to its control-message counterpart.
     *
     * @param value the data store property value
     * @return the equivalent {@code ControlMessage.BlockStore}
     * @throws UnsupportedBlockStoreException if {@code value} has no counterpart
     */
    private static ControlMessage.BlockStore convertBlockStore(DataStoreProperty.Value value) {
        switch (value) {
            case MemoryStore:
                return ControlMessage.BlockStore.MEMORY;
            case SerializedMemoryStore:
                return ControlMessage.BlockStore.SER_MEMORY;
            case LocalFileStore:
                return ControlMessage.BlockStore.LOCAL_FILE;
            case GlusterFileStore:
                return ControlMessage.BlockStore.REMOTE_FILE;
            default:
                break;
        }
        throw new UnsupportedBlockStoreException(new Exception(value + " is not supported."));
    }

    /**
     * Converts a control-message block store to its data store property counterpart.
     *
     * @param blockStore the block store named in a control message
     * @return the equivalent {@code DataStoreProperty.Value}
     * @throws UnsupportedBlockStoreException if {@code blockStore} has no counterpart
     */
    private static DataStoreProperty.Value convertBlockStore(ControlMessage.BlockStore blockStore) {
        final DataStoreProperty.Value converted;
        switch (blockStore) {
            case MEMORY:
                converted = DataStoreProperty.Value.MemoryStore;
                break;
            case SER_MEMORY:
                converted = DataStoreProperty.Value.SerializedMemoryStore;
                break;
            case LOCAL_FILE:
                converted = DataStoreProperty.Value.LocalFileStore;
                break;
            case REMOTE_FILE:
                converted = DataStoreProperty.Value.GlusterFileStore;
                break;
            default:
                throw new UnsupportedBlockStoreException(new Exception("This block store is not yet supported"));
        }
        return converted;
    }
}
