/*
 * Decompiled with CFR 0.152.
 */
package tech.guyi.component.message.stream.websocket;

import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.Resource;
import lombok.NonNull;
import org.java_websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.guyi.component.message.stream.api.stream.MessageStream;
import tech.guyi.component.message.stream.api.stream.entry.Message;
import tech.guyi.component.message.stream.api.worker.MessageStreamWorker;
import tech.guyi.component.message.stream.websocket.WebSocketConfiguration;
import tech.guyi.component.message.stream.websocket.connection.WebsocketConnection;
import tech.guyi.component.message.stream.websocket.exception.ConnectionNotReadyException;
import tech.guyi.component.message.stream.websocket.executor.WebSocketServerExecutors;
import tech.guyi.component.message.stream.websocket.topic.TopicHandlerFactory;

/**
 * {@link MessageStream} implementation that exchanges messages over a single
 * client-side WebSocket connection.
 *
 * <p>The server address is taken from {@link WebSocketConfiguration#getServer()} and
 * passed through {@link WebSocketServerExecutors#replace} before being parsed as a URI.
 * Once a connection has been opened successfully, a later close or error schedules a
 * reconnect attempt on the {@link MessageStreamWorker} (immediately, then with a
 * 10 second delay between attempts) until a connection opens again or
 * {@link #close()} is called.
 *
 * <p>NOTE(review): automatic reconnection only starts after the open callback has
 * fired at least once, because that is the only place {@code run} becomes
 * {@code true}. A failure of the very first connection attempt is logged but not retried.
 */
public class WebSocketMessageStream implements MessageStream {
    private static final Logger log = LoggerFactory.getLogger(WebSocketMessageStream.class);
    @Resource
    private TopicHandlerFactory factory;
    @Resource
    private WebSocketConfiguration configuration;
    @Resource
    private WebSocketServerExecutors executors;
    @Resource
    private MessageStreamWorker worker;

    // The three fields below are touched by the caller of this stream, by the scheduled
    // reconnect task and by the connection callbacks; they are volatile so that a write
    // made by one of them is visible to the others.

    /** {@code true} while a connection is established and a reconnect may be scheduled on loss. */
    private volatile boolean run;
    /** Handle of the pending periodic reconnect task, or {@code null} if none was ever scheduled. */
    private volatile ScheduledFuture<?> future;
    /** The most recently created connection, or {@code null} before the first {@link #open}. */
    private volatile WebsocketConnection connection;

    /**
     * Builds a new, not yet connected, connection wired to this stream's callbacks.
     *
     * @param receiver consumer that is handed every incoming payload as a {@link Message}
     * @return the new connection
     * @throws URISyntaxException if the resolved server address is not a valid URI
     */
    private WebsocketConnection createConnection(Consumer<Message> receiver) throws URISyntaxException {
        URI server = new URI(this.executors.replace(this.configuration.getServer()));
        return new WebsocketConnection(server, origin -> {
            // Incoming payload: derive the topic from the raw bytes and forward it.
            byte[] bytes = origin.getBytes(StandardCharsets.UTF_8);
            receiver.accept(new Message(this.factory.get().getTopic(bytes), bytes));
        }, e -> {
            // Connection established: allow future reconnects and stop the retry loop.
            this.run = true;
            this.cancelReconnect();
            log.info("Websocket\u8fde\u63a5\u5efa\u7acb");
        }, () -> this.reconnect(receiver), e -> { // presumably the on-close callback - TODO confirm
            log.error("WebSocket\u8fde\u63a5\u5f02\u5e38", (Throwable)e);
            this.reconnect(receiver);
        });
    }

    /** Cancels the pending reconnect task, if there is one. */
    private void cancelReconnect() {
        ScheduledFuture<?> pending = this.future;
        if (pending != null) {
            pending.cancel(true);
        }
    }

    /**
     * Schedules periodic reconnect attempts, but only if the stream is currently marked
     * as running. Clearing {@code run} first keeps a second close/error callback (or a
     * failed retry) from scheduling another task on top of the pending one.
     *
     * @param receiver consumer to attach to the connections created by the retries
     */
    private void reconnect(Consumer<Message> receiver) {
        if (this.run) {
            this.run = false;
            this.future = this.worker.scheduleWithFixedDelay(() -> this.connect(receiver), 0L, 10L, TimeUnit.SECONDS);
        }
    }

    /**
     * Closes the current connection if it is open, then creates a fresh one and blocks
     * until the connection attempt has finished.
     *
     * @param receiver consumer that is handed every incoming message
     * @throws IllegalStateException if the configured server address is not a valid URI
     */
    private void connect(Consumer<Message> receiver) {
        WebsocketConnection previous = this.connection;
        if (previous != null && previous.isOpen()) {
            previous.close();
        }
        try {
            WebsocketConnection created = this.createConnection(receiver);
            this.connection = created;
            created.connectBlocking();
        } catch (URISyntaxException e) {
            // open() cannot declare a checked exception, so surface it unchecked with its cause.
            throw new IllegalStateException("Invalid WebSocket server URI", e);
        } catch (InterruptedException e) {
            // Cancelling the reconnect task interrupts its thread; restore the flag
            // so the owner of the thread can observe the interruption.
            Thread.currentThread().interrupt();
            log.debug("Interrupted while waiting for the WebSocket connection attempt", e);
        }
    }

    /**
     * @return the fixed name of this stream type, {@code "websocket"}
     */
    @NonNull
    public String getName() {
        return "websocket";
    }

    /**
     * Stops the stream: disables automatic reconnection, cancels a pending reconnect
     * task and closes the current connection, if any.
     */
    public void close() {
        this.run = false;
        // Without this, a retry loop that is already scheduled would keep opening connections.
        this.cancelReconnect();
        WebsocketConnection current = this.connection;
        if (current != null) {
            current.close();
        }
    }

    /**
     * No-op: this stream does nothing when a topic is registered.
     *
     * @param topic ignored
     */
    public void register(String topic) {
    }

    /**
     * No-op: this stream does nothing when a topic is unregistered.
     *
     * @param topic ignored
     */
    public void unregister(String topic) {
    }

    /**
     * Opens the stream by connecting to the configured server; blocks until the
     * connection attempt has finished.
     *
     * @param receiver consumer that is handed every incoming message
     * @throws IllegalStateException if the configured server address is not a valid URI
     */
    public void open(Consumer<Message> receiver) {
        this.connect(receiver);
    }

    /**
     * Sends a message over the current connection, after the topic handler has combined
     * the topic with the payload.
     *
     * @param message the message to send
     * @throws ConnectionNotReadyException if the stream has not been opened yet or the
     *         connection is not currently open
     */
    public void publish(Message message) {
        WebsocketConnection current = this.connection;
        if (current == null || !current.isOpen()) {
            throw new ConnectionNotReadyException();
        }
        current.send(this.factory.get().setTopic(message.getTopic(), message.getBytes()));
    }
}

