package org.apache.nemo.runtime.executor.bytetransfer;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hands out {@link ByteInputContext}s and {@link ByteOutputContext}s towards remote executors.
 *
 * <p>One {@link ChannelFuture} is cached per remote executor id. A cached entry is reused while
 * its channel reports open or active; otherwise a new connection is requested from the
 * {@link ByteTransport}. Contexts are created by the {@link ContextManager} found in the
 * pipeline of the connected channel.
 *
 * <p>All shared state lives in a {@link ConcurrentMap}; per-key updates go through
 * {@link ConcurrentMap#compute} so that the check-and-replace of a cached entry is atomic.
 */
@ThreadSafe
public final class ByteTransfer {
    private static final Logger LOG = LoggerFactory.getLogger(ByteTransfer.class);

    private final ByteTransport byteTransport;

    /** Remote executor id -> future of the channel currently used to talk to that executor. */
    private final ConcurrentMap<String, ChannelFuture> executorIdToChannelFutureMap = new ConcurrentHashMap<>();

    /**
     * Creates a byte transfer on top of the given transport.
     *
     * @param byteTransport the transport used to open connections to remote executors
     */
    @Inject
    private ByteTransfer(ByteTransport byteTransport) {
        this.byteTransport = byteTransport;
    }

    /**
     * Connects to the given executor (reusing a cached channel when possible) and asks the
     * channel's {@link ContextManager} for a new input context.
     *
     * @param str  id of the remote executor; also forwarded to the context manager
     * @param bArr bytes forwarded unchanged to {@link ContextManager#newInputContext}
     * @param z    flag forwarded unchanged to {@link ContextManager#newInputContext}
     * @return a future completed with the new context, or completed exceptionally when the
     *         connection could not be established
     */
    public CompletableFuture<ByteInputContext> newInputContext(String str, byte[] bArr, boolean z) {
        return connectTo(str).thenApply(contextManager -> contextManager.newInputContext(str, bArr, z));
    }

    /**
     * Connects to the given executor (reusing a cached channel when possible) and asks the
     * channel's {@link ContextManager} for a new output context.
     *
     * @param str  id of the remote executor; also forwarded to the context manager
     * @param bArr bytes forwarded unchanged to {@link ContextManager#newOutputContext}
     * @param z    flag forwarded unchanged to {@link ContextManager#newOutputContext}
     * @return a future completed with the new context, or completed exceptionally when the
     *         connection could not be established
     */
    public CompletableFuture<ByteOutputContext> newOutputContext(String str, byte[] bArr, boolean z) {
        return connectTo(str).thenApply(contextManager -> contextManager.newOutputContext(str, bArr, z));
    }

    /**
     * Resolves the {@link ContextManager} of the channel to the given executor.
     *
     * @param remoteExecutorId id of the executor to connect to
     * @return a future completed with the context manager once the channel future succeeds;
     *         completed exceptionally if connecting throws, the channel future fails, or the
     *         channel pipeline holds no {@link ContextManager}
     */
    private CompletableFuture<ContextManager> connectTo(String remoteExecutorId) {
        final CompletableFuture<ContextManager> result = new CompletableFuture<>();
        try {
            final ChannelFuture channelFuture =
                this.executorIdToChannelFutureMap.compute(remoteExecutorId, this::reuseOrConnect);
            channelFuture.addListener(future -> {
                if (!future.isSuccess()) {
                    // Drop the failed attempt (only if it is still the cached one) so that the
                    // next call retries instead of reusing it.
                    this.executorIdToChannelFutureMap.remove(remoteExecutorId, channelFuture);
                    result.completeExceptionally(future.cause());
                    return;
                }
                final ContextManager contextManager =
                    channelFuture.channel().pipeline().get(ContextManager.class);
                if (contextManager == null) {
                    // Without this guard the caller would only see a NullPointerException
                    // raised later inside thenApply.
                    result.completeExceptionally(new IllegalStateException(
                        "No ContextManager in the pipeline of the channel to " + remoteExecutorId));
                } else {
                    result.complete(contextManager);
                }
            });
            return result;
        } catch (RuntimeException e) {
            result.completeExceptionally(e);
            return result;
        }
    }

    /**
     * Remapping function for {@link #connectTo}: keeps a cached channel future whose channel is
     * still open or active, otherwise opens a new connection.
     *
     * @param executorId id of the remote executor
     * @param cached     the currently cached channel future, or {@code null} if there is none
     * @return the channel future to store for {@code executorId}
     */
    private ChannelFuture reuseOrConnect(String executorId, ChannelFuture cached) {
        if (cached != null && (cached.channel().isOpen() || cached.channel().isActive())) {
            return cached;
        }
        final ChannelFuture connecting = this.byteTransport.connectTo(executorId);
        // Evict the entry once the channel closes; the two-argument remove leaves a newer
        // replacement entry untouched.
        connecting.channel().closeFuture().addListener(
            closed -> this.executorIdToChannelFutureMap.remove(executorId, connecting));
        return connecting;
    }

    /**
     * Records the channel over which a remote executor reached this executor, so that later
     * calls to {@link #newInputContext} and {@link #newOutputContext} can reuse it.
     *
     * <p>If a different channel is already cached for the same executor, a warning is logged and
     * the cached entry is replaced by the given channel.
     *
     * <p>NOTE(review): a decompiler note on this method stated that its modifier was widened
     * from package-private; it is kept {@code public} here so existing callers keep compiling.
     *
     * @param str     id of the remote executor
     * @param channel the channel connected to that executor
     */
    public void onNewContextByRemoteExecutor(String str, Channel channel) {
        this.executorIdToChannelFutureMap.compute(str, (executorId, cached) -> {
            if (cached == null) {
                LOG.debug("Remote {}({}) connected to this executor", executorId, channel.remoteAddress());
                return channel.newSucceededFuture();
            }
            if (channel == cached.channel()) {
                return cached;
            }
            LOG.warn("Duplicate channel for remote {}({}) and this executor", executorId, channel.remoteAddress());
            return channel.newSucceededFuture();
        });
    }
}
