package net.jacobpeterson.alpaca.websocket.streaming;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;
import net.jacobpeterson.alpaca.model.endpoint.streaming.StreamingMessage;
import net.jacobpeterson.alpaca.model.endpoint.streaming.authorization.AuthorizationData;
import net.jacobpeterson.alpaca.model.endpoint.streaming.authorization.AuthorizationMessage;
import net.jacobpeterson.alpaca.model.endpoint.streaming.enums.StreamingMessageType;
import net.jacobpeterson.alpaca.model.endpoint.streaming.listening.ListeningMessage;
import net.jacobpeterson.alpaca.model.endpoint.streaming.trade.TradeUpdateMessage;
import net.jacobpeterson.alpaca.util.gson.GsonUtil;
import net.jacobpeterson.alpaca.websocket.AlpacaWebsocket;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.WebSocket;
import okio.ByteString;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/jacobpeterson/alpaca/websocket/streaming/StreamingWebsocket.class */
public class StreamingWebsocket extends AlpacaWebsocket<StreamingMessageType, StreamingMessage, StreamingListener> implements StreamingWebsocketInterface {
    private static final String STREAM_ELEMENT_KEY = "stream";
    private final Set<StreamingMessageType> listenedStreamMessageTypes;
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamingWebsocket.class);
    private static final List<StreamingMessageType> SUBSCRIBABLE_STREAMING_MESSAGE_TYPES = Collections.singletonList(StreamingMessageType.TRADE_UPDATES);

    private static HttpUrl createWebsocketURL(String str) {
        return new HttpUrl.Builder().scheme("https").host(str + ".alpaca.markets").addPathSegment(STREAM_ELEMENT_KEY).build();
    }

    public StreamingWebsocket(OkHttpClient okHttpClient, String str, String str2, String str3, String str4) {
        super(okHttpClient, createWebsocketURL(str), "Streaming", str2, str3, str4);
        this.listenedStreamMessageTypes = new HashSet();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // net.jacobpeterson.alpaca.websocket.AlpacaWebsocket
    public void cleanupState() {
        super.cleanupState();
        this.listenedStreamMessageTypes.clear();
    }

    @Override // net.jacobpeterson.alpaca.websocket.AlpacaWebsocket
    protected void onConnection() {
        sendAuthenticationMessage();
    }

    @Override // net.jacobpeterson.alpaca.websocket.AlpacaWebsocket
    protected void onReconnection() {
        sendAuthenticationMessage();
        if (waitForAuthorization()) {
            streams((StreamingMessageType[]) Iterables.toArray(this.listenedStreamMessageTypes, StreamingMessageType.class));
        }
    }

    @Override // net.jacobpeterson.alpaca.websocket.AlpacaWebsocket
    protected void sendAuthenticationMessage() {
        getAuthorizationFuture();
        JsonObject jsonObject = new JsonObject();
        jsonObject.addProperty("action", "authenticate");
        JsonObject jsonObject2 = new JsonObject();
        if (this.useOAuth) {
            jsonObject2.addProperty("oauth_token", this.oAuthToken);
        } else {
            jsonObject2.addProperty("key_id", this.keyID);
            jsonObject2.addProperty("secret_key", this.secretKey);
        }
        jsonObject.add("data", jsonObject2);
        LOGGER.info("{} websocket sending authentication message...", this.websocketName);
        this.websocket.send(jsonObject.toString());
    }

    public void onMessage(@NotNull WebSocket webSocket, @NotNull ByteString byteString) {
        StreamingMessage streamingMessage;
        JsonElement parseString = JsonParser.parseString(byteString.utf8());
        Preconditions.checkState(parseString instanceof JsonObject, "Message must be a JsonObject! Received: %s", parseString);
        JsonObject asJsonObject = parseString.getAsJsonObject();
        JsonElement jsonElement = asJsonObject.get(STREAM_ELEMENT_KEY);
        Preconditions.checkState(jsonElement instanceof JsonPrimitive, "Message must contain %s element! Received: %s", STREAM_ELEMENT_KEY, parseString);
        StreamingMessageType streamingMessageType = (StreamingMessageType) GsonUtil.GSON.fromJson(jsonElement, StreamingMessageType.class);
        Preconditions.checkNotNull(streamingMessageType, "StreamingMessageType not found in message: %s", asJsonObject);
        switch (streamingMessageType) {
            case AUTHORIZATION:
                streamingMessage = (StreamingMessage) GsonUtil.GSON.fromJson(asJsonObject, AuthorizationMessage.class);
                this.authenticated = isAuthorizationMessageSuccess((AuthorizationMessage) streamingMessage);
                if (this.authenticated) {
                    LOGGER.info("{} websocket authenticated.", this.websocketName);
                    LOGGER.debug("{}", streamingMessage);
                } else {
                    LOGGER.error("{} websocket not authenticated! Received: {}.", this.websocketName, streamingMessage);
                }
                if (this.authenticationMessageFuture != null) {
                    this.authenticationMessageFuture.complete(Boolean.valueOf(this.authenticated));
                    break;
                }
                break;
            case LISTENING:
                streamingMessage = (StreamingMessage) GsonUtil.GSON.fromJson(asJsonObject, ListeningMessage.class);
                LOGGER.debug("{}", streamingMessage);
                ArrayList<StreamingMessageType> streams = ((ListeningMessage) streamingMessage).getData().getStreams();
                Stream<StreamingMessageType> stream = streams.stream();
                Set<StreamingMessageType> set = this.listenedStreamMessageTypes;
                set.getClass();
                Stream<StreamingMessageType> filter = stream.filter(Predicates.not((v1) -> {
                    return r1.contains(v1);
                }));
                Set<StreamingMessageType> set2 = this.listenedStreamMessageTypes;
                set2.getClass();
                filter.forEach((v1) -> {
                    r1.remove(v1);
                });
                this.listenedStreamMessageTypes.addAll(streams);
                break;
            case TRADE_UPDATES:
                streamingMessage = (StreamingMessage) GsonUtil.GSON.fromJson(asJsonObject, TradeUpdateMessage.class);
                LOGGER.debug("{}", streamingMessage);
                break;
            default:
                throw new UnsupportedOperationException();
        }
        if (this.listenedStreamMessageTypes.contains(streamingMessageType)) {
            callListener(streamingMessageType, streamingMessage);
        }
    }

    private boolean isAuthorizationMessageSuccess(AuthorizationMessage authorizationMessage) {
        AuthorizationData data = authorizationMessage.getData();
        return data.getStatus().equalsIgnoreCase("authorized") && data.getAction().equalsIgnoreCase("authenticate");
    }

    @Override // net.jacobpeterson.alpaca.websocket.streaming.StreamingWebsocketInterface
    public void streams(StreamingMessageType... streamingMessageTypeArr) {
        if (streamingMessageTypeArr == null || streamingMessageTypeArr.length == 0) {
            return;
        }
        Stream stream = Arrays.stream(streamingMessageTypeArr);
        List<StreamingMessageType> list = SUBSCRIBABLE_STREAMING_MESSAGE_TYPES;
        list.getClass();
        Stream filter = stream.filter(Predicates.not((v1) -> {
            return r1.contains(v1);
        }));
        Set<StreamingMessageType> set = this.listenedStreamMessageTypes;
        set.getClass();
        filter.forEach((v1) -> {
            r1.add(v1);
        });
        if (!isConnected()) {
            connect();
            if (!waitForAuthorization()) {
                LOGGER.error("Not subscribing to streams due to unauthorized {} websocket.", this.websocketName);
                return;
            }
        }
        JsonObject jsonObject = new JsonObject();
        jsonObject.addProperty("action", "listen");
        JsonArray jsonArray = new JsonArray();
        Stream stream2 = Arrays.stream(streamingMessageTypeArr);
        List<StreamingMessageType> list2 = SUBSCRIBABLE_STREAMING_MESSAGE_TYPES;
        list2.getClass();
        stream2.filter((v1) -> {
            return r1.contains(v1);
        }).forEach(streamingMessageType -> {
            jsonArray.add(streamingMessageType.toString());
        });
        if (jsonArray.isEmpty()) {
            return;
        }
        JsonObject jsonObject2 = new JsonObject();
        jsonObject2.add("streams", jsonArray);
        jsonObject.add("data", jsonObject2);
        this.websocket.send(jsonObject.toString());
        LOGGER.info("Requested streams: {}.", jsonArray);
    }

    @Override // net.jacobpeterson.alpaca.websocket.streaming.StreamingWebsocketInterface
    public Collection<StreamingMessageType> streams() {
        return new HashSet(this.listenedStreamMessageTypes);
    }
}
