package org.apache.nemo.runtime.common.message.grpc;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.nemo.runtime.common.comm.ControlMessage;
import org.apache.nemo.runtime.common.comm.GrpcMessageService;
import org.apache.nemo.runtime.common.comm.MessageServiceGrpc;
import org.apache.nemo.runtime.common.message.MessageListener;
import org.apache.reef.io.network.naming.NameResolver;
import org.apache.reef.wake.IdentifierFactory;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/nemo/runtime/common/message/grpc/GrpcMessageServer.class */
/**
 * gRPC server side of the message layer: receives {@link ControlMessage.Message}s and hands
 * them to the {@link MessageListener} registered under the message's listener id.
 *
 * <p>On {@link #start()} the server is built on port 0, started, and its actual port is
 * registered with the {@link NameResolver} under {@code localSenderId}.
 */
public final class GrpcMessageServer {
    private static final Logger LOG = LoggerFactory.getLogger(GrpcMessageServer.class);
    /** Maximum number of attempts made to register this server with the name server. */
    private static final int NAME_SERVER_REGISTER_RETRY_COUNT = 3;
    /** Pause between two consecutive registration attempts, in milliseconds. */
    private static final int NAME_SERVER_REGISTER_RETRY_DELAY_MS = 100;
    private final LocalAddressProvider localAddressProvider;
    private final NameResolver nameResolver;
    private final IdentifierFactory idFactory;
    private final String localSenderId;
    /** Listener id -> listener; consulted by the gRPC service for every incoming message. */
    private final ConcurrentMap<String, MessageListener<ControlMessage.Message>> listenerMap = new ConcurrentHashMap<>();
    /** The underlying gRPC server; {@code null} until {@link #start()} has been called. */
    private Server server;

    /**
     * gRPC service implementation that dispatches incoming messages to the listeners
     * registered in the enclosing server's {@code listenerMap}.
     */
    private class MessageService extends MessageServiceGrpc.MessageServiceImplBase {
        /** Reusable empty reply sent back for every {@code send} call. */
        private final GrpcMessageService.Void voidMessage;

        private MessageService() {
            // NOTE(review): "m821build" looks like a decompiler-renamed build() — confirm against the generated class.
            this.voidMessage = GrpcMessageService.Void.newBuilder().m821build();
        }

        /**
         * Handles a one-way message. The message is passed to the matching listener if one is
         * registered, otherwise it is logged and dropped. In both cases the call is completed
         * with an empty reply.
         *
         * @param message        the incoming message
         * @param streamObserver observer used to send the empty reply and complete the call
         */
        @Override // org.apache.nemo.runtime.common.comm.MessageServiceGrpc.MessageServiceImplBase
        public void send(ControlMessage.Message message, StreamObserver<GrpcMessageService.Void> streamObserver) {
            final MessageListener<ControlMessage.Message> messageListener = listenerMap.get(message.getListenerId());
            if (messageListener == null) {
                LOG.warn("A msg is ignored since there is no registered listener. msg.id={}, msg.listenerId={}, msg.type={}",
                        message.getId(), message.getListenerId(), message.getType());
            } else {
                LOG.debug("[SEND] request msg.id={}, msg.listenerId={}, msg.type={}",
                        message.getId(), message.getListenerId(), message.getType());
                messageListener.onMessage(message);
            }
            // The reply is the same whether or not a listener was found.
            streamObserver.onNext(this.voidMessage);
            streamObserver.onCompleted();
        }

        /**
         * Handles a request message. If a listener is registered for the message's listener id,
         * it receives the message together with a {@link GrpcMessageContext} wrapping the
         * response observer (presumably the listener replies through that context). Otherwise
         * the call is failed with an error.
         *
         * @param message        the incoming request
         * @param streamObserver observer for the response message
         */
        @Override // org.apache.nemo.runtime.common.comm.MessageServiceGrpc.MessageServiceImplBase
        public void request(ControlMessage.Message message, StreamObserver<ControlMessage.Message> streamObserver) {
            LOG.debug("[REQUEST] request msg.id={}, msg.listenerId={}, msg.type={}",
                    message.getId(), message.getListenerId(), message.getType());
            final MessageListener<ControlMessage.Message> messageListener = listenerMap.get(message.getListenerId());
            if (messageListener != null) {
                messageListener.onMessageWithContext(message, new GrpcMessageContext(streamObserver));
            } else {
                LOG.warn("A message arrived, which has no registered listener. msg.id={}, msg.listenerId={}, msg.type={}",
                        message.getId(), message.getListenerId(), message.getType());
                streamObserver.onError(new Exception("There is no registered listener id=" + message.getListenerId() + " for message type=" + message.getType()));
            }
        }
    }

    /**
     * Creates a server that is not yet started.
     *
     * @param localAddressProvider supplies the local address advertised to the name server
     * @param nameResolver         name server client used to register this server
     * @param identifierFactory    creates the identifier under which this server is registered
     * @param str                  the local sender id of this server
     */
    public GrpcMessageServer(LocalAddressProvider localAddressProvider, NameResolver nameResolver, IdentifierFactory identifierFactory, String str) {
        this.localAddressProvider = localAddressProvider;
        this.nameResolver = nameResolver;
        this.idFactory = identifierFactory;
        this.localSenderId = str;
    }

    /**
     * Registers a listener for the given listener id.
     *
     * @param str             the listener id
     * @param messageListener the listener to receive messages addressed to that id
     * @throws RuntimeException if a listener is already registered for the id
     */
    public void setupListener(String str, MessageListener<ControlMessage.Message> messageListener) {
        if (this.listenerMap.putIfAbsent(str, messageListener) != null) {
            throw new RuntimeException("A listener for " + str + " was already setup");
        }
    }

    /**
     * Removes the listener registered for the given listener id, if any.
     *
     * @param str the listener id
     */
    public void removeListener(String str) {
        this.listenerMap.remove(str);
    }

    /**
     * Builds and starts the gRPC server, installs a JVM shutdown hook that shuts it down, and
     * registers the server's port with the name server.
     *
     * @throws Exception if the server cannot be started or the registration fails; in the
     *                   latter case the already started server is shut down before rethrowing
     */
    public void start() throws Exception {
        // Port 0: the port actually bound is read back via getPort() below.
        this.server = ServerBuilder.forPort(0).addService(new MessageService()).build();
        this.server.start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            this.server.shutdown();
        }));
        try {
            registerToNameServer(this.server.getPort());
        } catch (Exception e) {
            // Nobody can look this server up without a registration, so do not leave it running.
            this.server.shutdown();
            throw e;
        }
    }

    /**
     * Registers the local address and the given port with the name server under
     * {@code localSenderId}, making up to {@link #NAME_SERVER_REGISTER_RETRY_COUNT} attempts
     * with a pause of {@link #NAME_SERVER_REGISTER_RETRY_DELAY_MS} ms between attempts.
     *
     * @param port the port the gRPC server listens on
     * @throws Exception if every attempt fails (the last failure is attached as the cause), or
     *                   if the thread is interrupted while waiting between attempts
     */
    private void registerToNameServer(int port) throws Exception {
        final InetSocketAddress socketAddress = new InetSocketAddress(this.localAddressProvider.getLocalAddress(), port);
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= NAME_SERVER_REGISTER_RETRY_COUNT; attempt++) {
            try {
                this.nameResolver.register(this.idFactory.getNewInstance(this.localSenderId), socketAddress);
                return;
            } catch (Exception e) {
                lastFailure = e;
                LOG.warn("Exception occurred while registering a server to the name server. id=" + this.localSenderId, e);
                // Only wait when another attempt follows; sleeping before giving up is pointless.
                if (attempt < NAME_SERVER_REGISTER_RETRY_COUNT) {
                    Thread.sleep(NAME_SERVER_REGISTER_RETRY_DELAY_MS);
                }
            }
        }
        throw new Exception("Failed to register id=" + this.localSenderId + " after " + NAME_SERVER_REGISTER_RETRY_COUNT + " retries", lastFailure);
    }

    /**
     * Shuts the gRPC server down. Does nothing if {@link #start()} has not been called.
     */
    public void close() {
        if (this.server != null) {
            this.server.shutdown();
        }
    }
}
