package net.jacobpeterson.alpaca.websocket.marketdata;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;
import net.jacobpeterson.alpaca.model.endpoint.marketdata.realtime.MarketDataMessage;
import net.jacobpeterson.alpaca.model.endpoint.marketdata.realtime.bar.BarMessage;
import net.jacobpeterson.alpaca.model.endpoint.marketdata.realtime.control.ErrorMessage;
import net.jacobpeterson.alpaca.model.endpoint.marketdata.realtime.control.SubscriptionsMessage;
import net.jacobpeterson.alpaca.model.endpoint.marketdata.realtime.control.SuccessMessage;
import net.jacobpeterson.alpaca.model.endpoint.marketdata.realtime.enums.MarketDataMessageType;
import net.jacobpeterson.alpaca.model.endpoint.marketdata.realtime.quote.QuoteMessage;
import net.jacobpeterson.alpaca.model.endpoint.marketdata.realtime.trade.TradeMessage;
import net.jacobpeterson.alpaca.model.properties.DataAPIType;
import net.jacobpeterson.alpaca.util.gson.GsonUtil;
import net.jacobpeterson.alpaca.websocket.AlpacaWebsocket;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.WebSocket;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/jacobpeterson/alpaca/websocket/marketdata/MarketDataWebsocket.class */
/**
 * Websocket client for the real-time market data stream.
 *
 * <p>The class authenticates over the socket, sends subscribe/unsubscribe requests for trades,
 * quotes and bars, parses every incoming JSON array of messages and forwards the messages whose
 * type is currently listened to via {@code callListener}.
 *
 * <p>Subscription state is kept in plain {@link HashSet}s; no synchronization is performed in this
 * class.
 */
public class MarketDataWebsocket extends AlpacaWebsocket<MarketDataMessageType, MarketDataMessage, MarketDataListener> implements MarketDataWebsocketInterface {
    private static final Logger LOGGER = LoggerFactory.getLogger(MarketDataWebsocket.class);

    /** JSON key that carries the message type of every stream message. */
    private static final String MESSAGE_TYPE_ELEMENT_KEY = "T";

    /** Error message texts (lower case) that are treated as an authentication failure. */
    private static final List<String> AUTH_FAILURE_MESSAGES = Arrays.asList("auth failed", "auth timeout", "not authenticated");

    /** Message types that are driven by symbol subscriptions rather than by {@link #subscribeToControl}. */
    private static final List<MarketDataMessageType> SUBSCRIBABLE_MARKET_DATA_MESSAGE_TYPES = Arrays.asList(MarketDataMessageType.TRADE, MarketDataMessageType.QUOTE, MarketDataMessageType.BAR);

    /** Message types that are currently forwarded to the listener. */
    private final Set<MarketDataMessageType> listenedMarketDataMessageTypes;
    /** Trade symbols reported by the most recent subscription message. */
    private final Set<String> subscribedTrades;
    /** Quote symbols reported by the most recent subscription message. */
    private final Set<String> subscribedQuotes;
    /** Bar symbols reported by the most recent subscription message. */
    private final Set<String> subscribedBars;

    /**
     * Builds the stream URL {@code https://stream.data.alpaca.markets/v2/<dataAPIType>}.
     *
     * @param dataAPIType its {@code toString()} value becomes the last path segment
     * @return the URL handed to the superclass constructor
     */
    private static HttpUrl createWebsocketURL(DataAPIType dataAPIType) {
        // NOTE(review): the "https" scheme is kept as-is; presumably HttpUrl.Builder does not accept
        // "wss" and the superclass/OkHttp handles the websocket upgrade - confirm before changing.
        return new HttpUrl.Builder()
                .scheme("https")
                .host("stream.data.alpaca.markets")
                .addPathSegment("v2")
                .addPathSegment(dataAPIType.toString())
                .build();
    }

    /**
     * Creates a market data websocket with empty subscription state.
     *
     * @param okHttpClient client passed through to the superclass
     * @param dataAPIType  selects the stream path (see {@link #createWebsocketURL})
     * @param str          passed to the superclass; presumably the API key ID - verify against caller
     * @param str2         passed to the superclass; presumably the API secret key - verify against caller
     */
    public MarketDataWebsocket(OkHttpClient okHttpClient, DataAPIType dataAPIType, String str, String str2) {
        super(okHttpClient, createWebsocketURL(dataAPIType), "Market Data", str, str2, null);
        this.listenedMarketDataMessageTypes = new HashSet<>();
        this.subscribedTrades = new HashSet<>();
        this.subscribedQuotes = new HashSet<>();
        this.subscribedBars = new HashSet<>();
    }

    /**
     * Runs the superclass cleanup and forgets which message types are listened to.
     *
     * <p>The subscribed symbol sets are intentionally left untouched here; {@link #onReconnection()}
     * reads them to re-subscribe.
     */
    @Override // net.jacobpeterson.alpaca.websocket.AlpacaWebsocket
    public void cleanupState() {
        super.cleanupState();
        this.listenedMarketDataMessageTypes.clear();
    }

    /** Sends the authentication message once a connection is available. */
    @Override // net.jacobpeterson.alpaca.websocket.AlpacaWebsocket
    protected void onConnection() {
        sendAuthenticationMessage();
    }

    /**
     * Re-authenticates and, when authorization succeeds, re-registers the listened control types
     * and re-sends a subscribe request for the previously subscribed trades, quotes and bars.
     */
    @Override // net.jacobpeterson.alpaca.websocket.AlpacaWebsocket
    protected void onReconnection() {
        sendAuthenticationMessage();
        if (!waitForAuthorization()) {
            return;
        }
        // Snapshot into an array first: subscribeToControl() adds to the very set being read.
        MarketDataMessageType[] listenedTypes = Iterables.toArray(this.listenedMarketDataMessageTypes, MarketDataMessageType.class);
        subscribeToControl(listenedTypes);
        subscribe(this.subscribedTrades, this.subscribedQuotes, this.subscribedBars);
    }

    /**
     * Sends {@code {"action":"auth","key":...,"secret":...}} over the websocket.
     *
     * <p>Only the websocket name is logged; the credentials are never written to the log.
     */
    @Override // net.jacobpeterson.alpaca.websocket.AlpacaWebsocket
    protected void sendAuthenticationMessage() {
        // Return value deliberately ignored; presumably this call ensures the authorization
        // future exists before the server's reply arrives - confirm in AlpacaWebsocket.
        getAuthorizationFuture();

        JsonObject authRequest = new JsonObject();
        authRequest.addProperty("action", "auth");
        authRequest.addProperty("key", this.keyID);
        authRequest.addProperty("secret", this.secretKey);

        LOGGER.info("{} websocket sending authentication message...", this.websocketName);
        this.websocket.send(authRequest.toString());
    }

    /**
     * Parses one text frame and dispatches every message it contains.
     *
     * <p>The frame must be a JSON array of JSON objects, each carrying its type under the
     * {@code "T"} key. Control messages (success, error, subscription) update internal state;
     * every message whose type is in the listened set is forwarded via {@code callListener}.
     * Messages of an unhandled type are logged and skipped.
     *
     * @param webSocket the socket the frame arrived on (unused)
     * @param str       the raw JSON text of the frame
     * @throws IllegalStateException if the frame is not a JSON array or an element is not a JSON object
     * @throws NullPointerException  if an element has no recognizable message type
     */
    public void onMessage(@NotNull WebSocket webSocket, @NotNull String str) {
        JsonElement frame = JsonParser.parseString(str);
        Preconditions.checkState(frame instanceof JsonArray, "Message must be a JsonArray! Received: %s", frame);

        for (JsonElement arrayElement : frame.getAsJsonArray()) {
            Preconditions.checkState(arrayElement instanceof JsonObject, "'arrayElement' must be a JsonObject! Received: %s", arrayElement);
            JsonObject messageObject = arrayElement.getAsJsonObject();

            MarketDataMessageType messageType = GsonUtil.GSON.fromJson(messageObject.get(MESSAGE_TYPE_ELEMENT_KEY), MarketDataMessageType.class);
            Preconditions.checkNotNull(messageType, "MarketDataMessageType not found in message: %s", messageObject);

            MarketDataMessage marketDataMessage;
            switch (messageType) {
                case SUCCESS: {
                    SuccessMessage successMessage = GsonUtil.GSON.fromJson(messageObject, SuccessMessage.class);
                    LOGGER.debug("{}", successMessage);
                    handleSuccessMessage(successMessage);
                    marketDataMessage = successMessage;
                    break;
                }
                case ERROR: {
                    ErrorMessage errorMessage = GsonUtil.GSON.fromJson(messageObject, ErrorMessage.class);
                    handleErrorMessage(errorMessage);
                    marketDataMessage = errorMessage;
                    break;
                }
                case SUBSCRIPTION: {
                    SubscriptionsMessage subscriptionsMessage = GsonUtil.GSON.fromJson(messageObject, SubscriptionsMessage.class);
                    LOGGER.debug("{}", subscriptionsMessage);
                    handleSubscriptionMessageList(MarketDataMessageType.TRADE, subscriptionsMessage.getTrades(), this.subscribedTrades);
                    handleSubscriptionMessageList(MarketDataMessageType.QUOTE, subscriptionsMessage.getQuotes(), this.subscribedQuotes);
                    handleSubscriptionMessageList(MarketDataMessageType.BAR, subscriptionsMessage.getBars(), this.subscribedBars);
                    marketDataMessage = subscriptionsMessage;
                    break;
                }
                case TRADE:
                    marketDataMessage = GsonUtil.GSON.fromJson(messageObject, TradeMessage.class);
                    break;
                case QUOTE:
                    marketDataMessage = GsonUtil.GSON.fromJson(messageObject, QuoteMessage.class);
                    break;
                case BAR:
                    marketDataMessage = GsonUtil.GSON.fromJson(messageObject, BarMessage.class);
                    break;
                default:
                    LOGGER.error("Message type {} not implemented!", messageType);
                    continue; // nothing to forward; move on to the next array element
            }

            if (this.listenedMarketDataMessageTypes.contains(messageType)) {
                callListener(messageType, marketDataMessage);
            }
        }
    }

    /** Marks the socket authenticated and completes the pending future when the server confirms authentication. */
    private void handleSuccessMessage(SuccessMessage successMessage) {
        if (!isSuccessMessageAuthenticated(successMessage)) {
            return;
        }
        LOGGER.info("{} websocket authenticated.", this.websocketName);
        this.authenticated = true;
        if (this.authenticationMessageFuture != null) {
            this.authenticationMessageFuture.complete(true);
        }
    }

    /**
     * Logs an error message; when it is an authentication failure and an authentication future is
     * pending, also marks the socket unauthenticated and completes that future with {@code false}.
     */
    private void handleErrorMessage(ErrorMessage errorMessage) {
        if (isErrorMessageAuthFailure(errorMessage) && this.authenticationMessageFuture != null) {
            LOGGER.error("{} websocket not authenticated! Received: {}.", this.websocketName, errorMessage);
            this.authenticated = false;
            // Defensive re-check kept from the original implementation.
            if (this.authenticationMessageFuture != null) {
                this.authenticationMessageFuture.complete(false);
            }
        } else {
            LOGGER.error("{} websocket error message: {}", this.websocketName, errorMessage);
        }
    }

    /**
     * @return {@code true} when the message text equals {@code "authenticated"} ignoring case;
     *         {@code false} otherwise, including when the message text is {@code null}
     */
    private boolean isSuccessMessageAuthenticated(SuccessMessage successMessage) {
        // Constant on the left so a success message without text cannot throw.
        return "authenticated".equalsIgnoreCase(successMessage.getMessage());
    }

    /**
     * Replaces the tracked symbols for one message type with the list reported by the server.
     *
     * @param marketDataMessageType the type the list belongs to; it is listened to exactly when the list is non-empty
     * @param collection            symbols reported by the server, may be {@code null} or empty
     * @param set                   the tracked symbol set to overwrite
     */
    private void handleSubscriptionMessageList(MarketDataMessageType marketDataMessageType, Collection<String> collection, Set<String> set) {
        set.clear();
        if (collection == null || collection.isEmpty()) {
            this.listenedMarketDataMessageTypes.remove(marketDataMessageType);
            return;
        }
        this.listenedMarketDataMessageTypes.add(marketDataMessageType);
        set.addAll(collection);
    }

    /**
     * @return {@code true} when the message text matches one of {@link #AUTH_FAILURE_MESSAGES}
     *         ignoring case; {@code false} otherwise, including when the message text is {@code null}
     */
    private boolean isErrorMessageAuthFailure(ErrorMessage errorMessage) {
        String message = errorMessage.getMessage();
        if (message == null) {
            return false;
        }
        // equalsIgnoreCase is locale-independent, unlike toLowerCase() with the default locale.
        for (String authFailureMessage : AUTH_FAILURE_MESSAGES) {
            if (authFailureMessage.equalsIgnoreCase(message)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Starts forwarding the given message types to the listener.
     *
     * <p>Trade, quote and bar types are ignored here; they become listened to only through
     * subscription messages from the server. A {@code null} array is a no-op.
     *
     * @param marketDataMessageTypeArr the types to listen to
     */
    @Override // net.jacobpeterson.alpaca.websocket.marketdata.MarketDataWebsocketInterface
    public void subscribeToControl(MarketDataMessageType... marketDataMessageTypeArr) {
        if (marketDataMessageTypeArr == null) {
            return;
        }
        for (MarketDataMessageType messageType : marketDataMessageTypeArr) {
            if (!SUBSCRIBABLE_MARKET_DATA_MESSAGE_TYPES.contains(messageType)) {
                this.listenedMarketDataMessageTypes.add(messageType);
            }
        }
    }

    /**
     * Requests a subscription to the given symbols.
     *
     * @param collection  trade symbols, may be {@code null} or empty
     * @param collection2 quote symbols, may be {@code null} or empty
     * @param collection3 bar symbols, may be {@code null} or empty
     */
    @Override // net.jacobpeterson.alpaca.websocket.marketdata.MarketDataWebsocketInterface
    public void subscribe(Collection<String> collection, Collection<String> collection2, Collection<String> collection3) {
        sendSubscriptionUpdate(collection, collection2, collection3, true);
    }

    /**
     * Requests removal of the subscription to the given symbols.
     *
     * @param collection  trade symbols, may be {@code null} or empty
     * @param collection2 quote symbols, may be {@code null} or empty
     * @param collection3 bar symbols, may be {@code null} or empty
     */
    @Override // net.jacobpeterson.alpaca.websocket.marketdata.MarketDataWebsocketInterface
    public void unsubscribe(Collection<String> collection, Collection<String> collection2, Collection<String> collection3) {
        sendSubscriptionUpdate(collection, collection2, collection3, false);
    }

    /**
     * Sends a {@code subscribe} or {@code unsubscribe} action for the given symbols.
     *
     * <p>Connects first when not connected and gives up (with an error log) when authorization
     * fails. Nothing is sent when all three collections are {@code null} or empty.
     *
     * @param collection  trade symbols
     * @param collection2 quote symbols
     * @param collection3 bar symbols
     * @param z           {@code true} to subscribe, {@code false} to unsubscribe
     */
    private void sendSubscriptionUpdate(Collection<String> collection, Collection<String> collection2, Collection<String> collection3, boolean z) {
        if (!isConnected()) {
            connect();
            if (!waitForAuthorization()) {
                LOGGER.error("Not subscribing to streams due to unauthorized {} websocket.", this.websocketName);
                return;
            }
        }

        JsonObject request = new JsonObject();
        request.addProperty("action", z ? "subscribe" : "unsubscribe");
        addSubscriptionUpdateList(request, "trades", collection);
        addSubscriptionUpdateList(request, "quotes", collection2);
        addSubscriptionUpdateList(request, "bars", collection3);

        // size() == 1 means only "action" is present, i.e. there is nothing to (un)subscribe.
        if (request.size() > 1) {
            this.websocket.send(request.toString());
            LOGGER.info("Requested subscriptions update: {}.", request);
        }
    }

    /**
     * Adds {@code collection} to {@code jsonObject} as a JSON array under {@code str}; does nothing
     * for a {@code null} or empty collection.
     */
    private void addSubscriptionUpdateList(JsonObject jsonObject, String str, Collection<String> collection) {
        if (collection == null || collection.isEmpty()) {
            return;
        }
        JsonArray symbols = new JsonArray();
        for (String symbol : collection) {
            symbols.add(symbol);
        }
        jsonObject.add(str, symbols);
    }

    /** @return a copy of the message types currently forwarded to the listener */
    @Override // net.jacobpeterson.alpaca.websocket.marketdata.MarketDataWebsocketInterface
    public Collection<MarketDataMessageType> subscribedControls() {
        return new HashSet<>(this.listenedMarketDataMessageTypes);
    }

    /** @return a copy of the currently tracked trade symbols */
    @Override // net.jacobpeterson.alpaca.websocket.marketdata.MarketDataWebsocketInterface
    public Collection<String> subscribedTrades() {
        return new HashSet<>(this.subscribedTrades);
    }

    /** @return a copy of the currently tracked quote symbols */
    @Override // net.jacobpeterson.alpaca.websocket.marketdata.MarketDataWebsocketInterface
    public Collection<String> subscribedQuotes() {
        return new HashSet<>(this.subscribedQuotes);
    }

    /** @return a copy of the currently tracked bar symbols */
    @Override // net.jacobpeterson.alpaca.websocket.marketdata.MarketDataWebsocketInterface
    public Collection<String> subscribedBars() {
        return new HashSet<>(this.subscribedBars);
    }
}
