package com.datatorrent.stram.util;

import com.datatorrent.stram.webapp.StramWebServices;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/util/SharedPubSubWebSocketClient.class */
public class SharedPubSubWebSocketClient extends PubSubWebSocketClient {
    public static final String LAST_INDEX_TOPIC_PREFIX = "_internal.lastIndex";
    private static final Logger LOG = LoggerFactory.getLogger(SharedPubSubWebSocketClient.class);
    private final Map<String, List<Handler>> topicHandlers;
    private long lastConnectTryTime;
    private final long minWaitConnectionRetry = 5000;
    private final long timeoutMillis;

    /* loaded from: input_file:com/datatorrent/stram/util/SharedPubSubWebSocketClient$Handler.class */
    public interface Handler {
        void onMessage(String str, String str2, Object obj);

        void onClose();
    }

    public SharedPubSubWebSocketClient(URI uri, long j) {
        this.topicHandlers = new HashMap();
        this.minWaitConnectionRetry = StramWebServices.WAIT_TIME;
        setUri(uri);
        this.lastConnectTryTime = System.currentTimeMillis();
        this.timeoutMillis = j;
    }

    public SharedPubSubWebSocketClient(String str, long j) throws URISyntaxException {
        this(new URI(str), j);
    }

    public synchronized void openConnection() throws IOException, ExecutionException, InterruptedException, TimeoutException {
        if (isConnectionSetup()) {
            return;
        }
        super.openConnection(this.timeoutMillis);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v20, types: [java.util.List] */
    public synchronized void addHandler(String str, boolean z, Handler handler) {
        ArrayList arrayList;
        if (z) {
            str = str + ".numSubscribers";
        }
        if (this.topicHandlers.containsKey(str)) {
            arrayList = (List) this.topicHandlers.get(str);
        } else {
            arrayList = new ArrayList();
            this.topicHandlers.put(str, arrayList);
        }
        arrayList.add(handler);
        try {
            if (isConnectionOpen()) {
                if (z) {
                    subscribeNumSubscribers(str);
                } else {
                    subscribe(str);
                }
            }
        } catch (IOException e) {
            LOG.warn("Cannot subscribe to {}", str);
        }
    }

    @Override // com.datatorrent.stram.util.PubSubWebSocketClient
    public void publish(String str, Object obj) throws IOException {
        synchronized (this) {
            if (!isConnectionOpen()) {
                try {
                    long currentTimeMillis = System.currentTimeMillis();
                    if (this.lastConnectTryTime + StramWebServices.WAIT_TIME < currentTimeMillis) {
                        this.lastConnectTryTime = currentTimeMillis;
                        openConnectionAsync();
                    }
                } catch (Exception e) {
                    LOG.debug("Failed attempt to reconnect to websocket server", e);
                }
            }
        }
        super.publish(str, obj);
    }

    @Override // com.datatorrent.stram.util.PubSubWebSocketClient
    public void onOpen(WebSocket webSocket) {
        for (String str : this.topicHandlers.keySet()) {
            try {
                subscribe(str);
            } catch (IOException e) {
                LOG.warn("Cannot subscribe to {}", str);
            }
        }
    }

    @Override // com.datatorrent.stram.util.PubSubWebSocketClient
    public synchronized void onMessage(String str, String str2, Object obj) {
        List<Handler> list = this.topicHandlers.get(str2);
        if (list != null) {
            Iterator<Handler> it = list.iterator();
            while (it.hasNext()) {
                it.next().onMessage(str, str2, obj);
            }
        }
    }

    @Override // com.datatorrent.stram.util.PubSubWebSocketClient
    public void onClose(WebSocket webSocket) {
        Iterator<Map.Entry<String, List<Handler>>> it = this.topicHandlers.entrySet().iterator();
        while (it.hasNext()) {
            Iterator<Handler> it2 = it.next().getValue().iterator();
            while (it2.hasNext()) {
                it2.next().onClose();
            }
        }
    }
}
