package org.apache.storm.messaging.local;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.storm.grouping.Load;
import org.apache.storm.messaging.IConnection;
import org.apache.storm.messaging.IConnectionCallback;
import org.apache.storm.messaging.IContext;
import org.apache.storm.messaging.TaskMessage;
import org.apache.storm.messaging.netty.BackPressureStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/messaging/local/Context.class */
public class Context implements IContext {
    private static final Logger LOG = LoggerFactory.getLogger(Context.class);
    private final ConcurrentHashMap<String, LocalServer> registry = new ConcurrentHashMap<>();

    /* loaded from: input_file:org/apache/storm/messaging/local/Context$LocalClient.class */
    private class LocalClient implements IConnection {
        private final LinkedBlockingQueue<TaskMessage> pendingDueToUnregisteredServer = new LinkedBlockingQueue<>();
        private final ScheduledExecutorService pendingFlusher;
        private final int port;
        private final String registryKey;

        LocalClient(String str, int i) {
            this.port = i;
            this.registryKey = Context.getNodeKey(str, i);
            this.pendingFlusher = Executors.newScheduledThreadPool(1, new ThreadFactory() { // from class: org.apache.storm.messaging.local.Context.LocalClient.1
                @Override // java.util.concurrent.ThreadFactory
                public Thread newThread(Runnable runnable) {
                    Thread thread = new Thread(runnable);
                    thread.setName("LocalClientFlusher-" + thread.getId());
                    thread.setDaemon(true);
                    return thread;
                }
            });
            this.pendingFlusher.scheduleAtFixedRate(new Runnable() { // from class: org.apache.storm.messaging.local.Context.LocalClient.2
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        LocalClient.this.flushPending();
                    } catch (Throwable th) {
                        Context.LOG.error("Uncaught throwable in pending message flusher thread, messages may be lost", th);
                        throw new RuntimeException(th);
                    }
                }
            }, 5L, 5L, TimeUnit.SECONDS);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void flushPending() {
            LocalServer localServer = (LocalServer) Context.this.registry.get(this.registryKey);
            if (localServer == null || this.pendingDueToUnregisteredServer.isEmpty()) {
                return;
            }
            ArrayList arrayList = new ArrayList();
            this.pendingDueToUnregisteredServer.drainTo(arrayList);
            localServer.cb.recv(arrayList);
        }

        @Override // org.apache.storm.messaging.IConnection
        public void send(Iterator<TaskMessage> it) {
            LocalServer localServer = (LocalServer) Context.this.registry.get(this.registryKey);
            if (localServer == null) {
                while (it.hasNext()) {
                    this.pendingDueToUnregisteredServer.add(it.next());
                }
                return;
            }
            flushPending();
            ArrayList arrayList = new ArrayList();
            while (it.hasNext()) {
                arrayList.add(it.next());
            }
            localServer.cb.recv(arrayList);
        }

        @Override // org.apache.storm.messaging.IConnection
        public Map<Integer, Load> getLoad(Collection<Integer> collection) {
            LocalServer localServer = (LocalServer) Context.this.registry.get(this.registryKey);
            return localServer != null ? localServer.getLoad(collection) : Collections.emptyMap();
        }

        @Override // org.apache.storm.messaging.IConnection
        public void sendLoadMetrics(Map<Integer, Double> map) {
            LocalServer localServer = (LocalServer) Context.this.registry.get(this.registryKey);
            if (localServer != null) {
                localServer.sendLoadMetrics(map);
            }
        }

        @Override // org.apache.storm.messaging.IConnection
        public void sendBackPressureStatus(BackPressureStatus backPressureStatus) {
            throw new RuntimeException("Local Client connection should not send BackPressure status");
        }

        @Override // org.apache.storm.messaging.IConnection
        public int getPort() {
            return this.port;
        }

        @Override // org.apache.storm.messaging.IConnection, java.lang.AutoCloseable
        public void close() {
            this.pendingFlusher.shutdown();
            try {
                this.pendingFlusher.awaitTermination(5L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                throw new RuntimeException("Interrupted while awaiting flusher shutdown", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/messaging/local/Context$LocalServer.class */
    public class LocalServer implements IConnection {
        final ConcurrentHashMap<Integer, Double> load = new ConcurrentHashMap<>();
        final int port;
        final IConnectionCallback cb;

        LocalServer(int i, IConnectionCallback iConnectionCallback) {
            this.port = i;
            this.cb = iConnectionCallback;
        }

        @Override // org.apache.storm.messaging.IConnection
        public void send(Iterator<TaskMessage> it) {
            throw new IllegalArgumentException("SHOULD NOT HAPPEN");
        }

        @Override // org.apache.storm.messaging.IConnection
        public Map<Integer, Load> getLoad(Collection<Integer> collection) {
            HashMap hashMap = new HashMap();
            for (Integer num : collection) {
                Double d = this.load.get(num);
                if (d != null) {
                    hashMap.put(num, new Load(true, d.doubleValue(), 0.0d));
                }
            }
            return hashMap;
        }

        @Override // org.apache.storm.messaging.IConnection
        public void sendLoadMetrics(Map<Integer, Double> map) {
            this.load.putAll(map);
        }

        @Override // org.apache.storm.messaging.IConnection
        public void sendBackPressureStatus(BackPressureStatus backPressureStatus) {
            throw new RuntimeException("Local Server connection should not send BackPressure status");
        }

        @Override // org.apache.storm.messaging.IConnection
        public int getPort() {
            return this.port;
        }

        @Override // org.apache.storm.messaging.IConnection, java.lang.AutoCloseable
        public void close() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static String getNodeKey(String str, int i) {
        return str + "-" + i;
    }

    private LocalServer createLocalServer(String str, int i, IConnectionCallback iConnectionCallback) {
        String nodeKey = getNodeKey(str, i);
        LocalServer localServer = new LocalServer(i, iConnectionCallback);
        LocalServer put = this.registry.put(nodeKey, localServer);
        if (put != null) {
            LOG.info("Replacing existing server for key {}", new Object[]{put, localServer, nodeKey});
        }
        return localServer;
    }

    @Override // org.apache.storm.messaging.IContext
    public void prepare(Map<String, Object> map) {
    }

    @Override // org.apache.storm.messaging.IContext
    public IConnection bind(String str, int i, IConnectionCallback iConnectionCallback, Supplier<Object> supplier) {
        return createLocalServer(str, i, iConnectionCallback);
    }

    @Override // org.apache.storm.messaging.IContext
    public IConnection connect(String str, String str2, int i, AtomicBoolean[] atomicBooleanArr) {
        return new LocalClient(str, i);
    }

    @Override // org.apache.storm.messaging.IContext
    public void term() {
    }
}
