package com.github.rishabh9.riko.upstox.websockets;

import com.github.rishabh9.riko.upstox.websockets.exceptions.SlowSubscriberException;
import com.github.rishabh9.riko.upstox.websockets.messages.BinaryMessage;
import com.github.rishabh9.riko.upstox.websockets.messages.ClosingMessage;
import com.github.rishabh9.riko.upstox.websockets.messages.ConnectedMessage;
import com.github.rishabh9.riko.upstox.websockets.messages.DisconnectedMessage;
import com.github.rishabh9.riko.upstox.websockets.messages.ErrorMessage;
import com.github.rishabh9.riko.upstox.websockets.messages.TextMessage;
import com.github.rishabh9.riko.upstox.websockets.messages.WebSocketMessage;
import com.github.rishabh9.riko.upstox.websockets.models.WrappedWebSocket;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/github/rishabh9/riko/upstox/websockets/MessageListener.class */
public final class MessageListener extends WebSocketListener implements Flow.Publisher<WebSocketMessage> {
    private static final Logger log = LogManager.getLogger(MessageListener.class);
    private static final int MAX_SECONDS_TO_KEEP_IT_WHEN_NO_SPACE = 3;
    private final SubmissionPublisher<WebSocketMessage> publisher = new SubmissionPublisher<>(Executors.newWorkStealingPool(), Flow.defaultBufferSize());

    public MessageListener(@Nonnull List<MessageSubscriber> list) {
        List list2 = (List) Objects.requireNonNull(list);
        SubmissionPublisher<WebSocketMessage> submissionPublisher = this.publisher;
        Objects.requireNonNull(submissionPublisher);
        list2.forEach((v1) -> {
            r1.subscribe(v1);
        });
    }

    private void publishMessage(WebSocketMessage webSocketMessage) {
        int offer = this.publisher.offer(webSocketMessage, 3L, TimeUnit.SECONDS, (subscriber, webSocketMessage2) -> {
            subscriber.onError(new SlowSubscriberException("Subscriber " + ((MessageSubscriber) subscriber).getName() + " is slow in receiving messages. Dropping message: " + webSocketMessage2.toString()));
            return false;
        });
        if (offer < 0) {
            log.warn("Dropping {} messages", Integer.valueOf(-offer));
        } else {
            log.debug("The slowest consumer has {} messages in total to be picked up.", Integer.valueOf(offer));
        }
    }

    public void onOpen(WebSocket webSocket, Response response) {
        super.onOpen(webSocket, response);
        publishMessage(new ConnectedMessage(new WrappedWebSocket(webSocket), response.message()));
    }

    public void onMessage(WebSocket webSocket, String str) {
        super.onMessage(webSocket, str);
        publishMessage(new TextMessage(new WrappedWebSocket(webSocket), str));
    }

    public void onMessage(WebSocket webSocket, ByteString byteString) {
        super.onMessage(webSocket, byteString);
        publishMessage(new BinaryMessage(new WrappedWebSocket(webSocket), byteString));
    }

    public void onClosing(WebSocket webSocket, int i, String str) {
        super.onClosing(webSocket, i, str);
        publishMessage(new ClosingMessage(new WrappedWebSocket(webSocket), i, str));
    }

    public void onClosed(WebSocket webSocket, int i, String str) {
        super.onClosed(webSocket, i, str);
        publishMessage(new DisconnectedMessage(new WrappedWebSocket(webSocket), i, str));
        this.publisher.close();
    }

    public void onFailure(WebSocket webSocket, Throwable th, @Nullable Response response) {
        super.onFailure(webSocket, th, response);
        publishMessage(new ErrorMessage(new WrappedWebSocket(webSocket), th, response));
        this.publisher.closeExceptionally(th);
    }

    @Override // java.util.concurrent.Flow.Publisher
    public void subscribe(Flow.Subscriber<? super WebSocketMessage> subscriber) {
        this.publisher.subscribe(subscriber);
    }
}
