package org.apache.spark.network.shuffle;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
import com.codahale.metrics.Timer;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.OpenBlocks;
import org.apache.spark.network.shuffle.protocol.RegisterExecutor;
import org.apache.spark.network.shuffle.protocol.StreamHandle;
import org.apache.spark.network.util.NettyUtils;
import org.apache.spark.network.util.TransportConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark_project.guava.annotations.VisibleForTesting;
import org.spark_project.guava.collect.Lists;

/* loaded from: input_file:org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.class */
/**
 * RPC handler that serves shuffle block requests on behalf of executors.
 *
 * <p>Two message types are handled: {@link RegisterExecutor}, which records an executor's
 * shuffle layout with the {@link ExternalShuffleBlockResolver}, and {@link OpenBlocks}, which
 * resolves the requested blocks and registers them as a stream with the
 * {@link OneForOneStreamManager}. Any other message type is rejected.
 */
public class ExternalShuffleBlockHandler extends RpcHandler {
    private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockHandler.class);

    @VisibleForTesting
    final ExternalShuffleBlockResolver blockManager;
    private final OneForOneStreamManager streamManager;
    private final ShuffleMetrics metrics;

    /**
     * Metrics exposed by this handler: request latency timers, a block transfer rate meter and a
     * gauge reporting the number of executors currently registered with the block resolver.
     *
     * <p>This must remain an inner (non-static) class because the gauge reads the enclosing
     * handler's {@code blockManager}.
     */
    public class ShuffleMetrics implements MetricSet {
        private final Map<String, Metric> allMetrics;
        // Time taken to serve an OpenBlocks request.
        private final Timer openBlockRequestLatencyMillis;
        // Time taken to serve a RegisterExecutor request.
        private final Timer registerExecutorRequestLatencyMillis;
        // Marked with the total size in bytes of the blocks opened by each OpenBlocks request.
        private final Meter blockTransferRateBytes;

        private ShuffleMetrics() {
            this.openBlockRequestLatencyMillis = new Timer();
            this.registerExecutorRequestLatencyMillis = new Timer();
            this.blockTransferRateBytes = new Meter();
            this.allMetrics = new HashMap<String, Metric>();
            this.allMetrics.put("openBlockRequestLatencyMillis", this.openBlockRequestLatencyMillis);
            this.allMetrics.put("registerExecutorRequestLatencyMillis", this.registerExecutorRequestLatencyMillis);
            this.allMetrics.put("blockTransferRateBytes", this.blockTransferRateBytes);
            // The gauge is evaluated lazily, so it is safe to create it before the enclosing
            // handler's blockManager field has been assigned.
            this.allMetrics.put("registeredExecutorsSize", new Gauge<Integer>() {
                @Override
                public Integer getValue() {
                    return Integer.valueOf(ExternalShuffleBlockHandler.this.blockManager.getRegisteredExecutorsSize());
                }
            });
        }

        /** Returns the name-to-metric map backing this metric set. */
        @Override
        public Map<String, Metric> getMetrics() {
            return this.allMetrics;
        }
    }

    /**
     * Creates a handler with a fresh stream manager and a block resolver built from the given
     * configuration and file.
     *
     * @param transportConf transport configuration passed to the block resolver
     * @param file file passed to the block resolver
     * @throws IOException if the block resolver cannot be created
     */
    public ExternalShuffleBlockHandler(TransportConf transportConf, File file) throws IOException {
        this(new OneForOneStreamManager(), new ExternalShuffleBlockResolver(transportConf, file));
    }

    /**
     * Creates a handler around the supplied collaborators; exposed so tests can inject them.
     *
     * @param oneForOneStreamManager stream manager used to register opened blocks
     * @param externalShuffleBlockResolver resolver used to register executors and look up blocks
     */
    @VisibleForTesting
    public ExternalShuffleBlockHandler(OneForOneStreamManager oneForOneStreamManager, ExternalShuffleBlockResolver externalShuffleBlockResolver) {
        this.metrics = new ShuffleMetrics();
        this.streamManager = oneForOneStreamManager;
        this.blockManager = externalShuffleBlockResolver;
    }

    /** Decodes the incoming buffer into a {@link BlockTransferMessage} and dispatches it. */
    @Override
    public void receive(TransportClient transportClient, ByteBuffer byteBuffer, RpcResponseCallback rpcResponseCallback) {
        handleMessage(BlockTransferMessage.Decoder.fromByteBuffer(byteBuffer), transportClient, rpcResponseCallback);
    }

    /**
     * Handles a decoded message.
     *
     * @param blockTransferMessage the decoded request
     * @param transportClient the client that sent the request
     * @param rpcResponseCallback callback invoked with the response on success
     * @throws UnsupportedOperationException if the message is neither {@link OpenBlocks} nor
     *     {@link RegisterExecutor}
     * @throws SecurityException if the client's ID does not match the application ID in the
     *     message
     */
    protected void handleMessage(BlockTransferMessage blockTransferMessage, TransportClient transportClient, RpcResponseCallback rpcResponseCallback) {
        if (blockTransferMessage instanceof OpenBlocks) {
            final Timer.Context responseDelayContext = this.metrics.openBlockRequestLatencyMillis.time();
            try {
                OpenBlocks openBlocks = (OpenBlocks) blockTransferMessage;
                checkAuth(transportClient, openBlocks.appId);

                ArrayList<ManagedBuffer> blocks = Lists.newArrayList();
                long totalBlockSize = 0;
                for (String blockId : openBlocks.blockIds) {
                    ManagedBuffer block = this.blockManager.getBlockData(openBlocks.appId, openBlocks.execId, blockId);
                    // A null block is still added so the stream keeps one entry per requested id.
                    totalBlockSize += block != null ? block.size() : 0L;
                    blocks.add(block);
                }

                long streamId = this.streamManager.registerStream(transportClient.getClientId(), blocks.iterator());
                if (logger.isTraceEnabled()) {
                    logger.trace("Registered streamId {} with {} buffers for client {} from host {}", Long.valueOf(streamId), Integer.valueOf(openBlocks.blockIds.length), transportClient.getClientId(), NettyUtils.getRemoteAddress(transportClient.getChannel()));
                }
                rpcResponseCallback.onSuccess(new StreamHandle(streamId, openBlocks.blockIds.length).toByteBuffer());
                this.metrics.blockTransferRateBytes.mark(totalBlockSize);
            } finally {
                // Stop the timer on every path so failed requests are recorded as well.
                responseDelayContext.stop();
            }
        } else if (blockTransferMessage instanceof RegisterExecutor) {
            final Timer.Context responseDelayContext = this.metrics.registerExecutorRequestLatencyMillis.time();
            try {
                RegisterExecutor registerExecutor = (RegisterExecutor) blockTransferMessage;
                checkAuth(transportClient, registerExecutor.appId);
                this.blockManager.registerExecutor(registerExecutor.appId, registerExecutor.execId, registerExecutor.executorInfo);
                // Registration has no payload; reply with an empty buffer.
                rpcResponseCallback.onSuccess(ByteBuffer.wrap(new byte[0]));
            } finally {
                // Stop the timer on every path so failed requests are recorded as well.
                responseDelayContext.stop();
            }
        } else {
            throw new UnsupportedOperationException("Unexpected message: " + blockTransferMessage);
        }
    }

    /** Returns the metric set maintained by this handler. */
    public MetricSet getAllMetrics() {
        return this.metrics;
    }

    /** Returns the stream manager used to serve opened blocks. */
    @Override
    public StreamManager getStreamManager() {
        return this.streamManager;
    }

    /**
     * Forwards an application-removed notification to the block resolver.
     *
     * @param str the application ID
     * @param z flag passed through unchanged to the resolver
     */
    public void applicationRemoved(String str, boolean z) {
        this.blockManager.applicationRemoved(str, z);
    }

    /**
     * Registers an executor with the block resolver without going through the RPC path, so no
     * authorization check is made and no latency metric is recorded.
     *
     * @param appExecId application and executor identifiers
     * @param executorShuffleInfo the executor's shuffle information
     */
    public void reregisterExecutor(ExternalShuffleBlockResolver.AppExecId appExecId, ExecutorShuffleInfo executorShuffleInfo) {
        this.blockManager.registerExecutor(appExecId.appId, appExecId.execId, executorShuffleInfo);
    }

    /** Closes the underlying block resolver. */
    public void close() {
        this.blockManager.close();
    }

    /**
     * Verifies that the client may act on behalf of the given application. A client whose ID is
     * {@code null} is always accepted; otherwise the ID must equal the application ID.
     *
     * @throws SecurityException if the client has an ID that differs from {@code str}
     */
    private void checkAuth(TransportClient transportClient, String str) {
        if (transportClient.getClientId() != null && !transportClient.getClientId().equals(str)) {
            throw new SecurityException(String.format("Client for %s not authorized for application %s.", transportClient.getClientId(), str));
        }
    }
}
