package org.apache.eventmesh.client.tcp.impl;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.eventmesh.client.tcp.SimpleSubClient;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.client.tcp.common.MessageUtils;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.common.RequestContext;
import org.apache.eventmesh.client.tcp.common.TcpClient;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.class */
public class SimpleSubClientImpl extends TcpClient implements SimpleSubClient {
    private Logger logger;
    private UserAgent userAgent;
    private ReceiveMsgHook callback;
    private List<SubscriptionItem> subscriptionItems;
    private ScheduledFuture<?> task;

    @ChannelHandler.Sharable
    /* loaded from: input_file:org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl$Handler.class */
    private class Handler extends SimpleChannelInboundHandler<Package> {
        private Handler() {
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void channelRead0(ChannelHandlerContext channelHandlerContext, Package r7) throws Exception {
            Command command = r7.getHeader().getCommand();
            SimpleSubClientImpl.this.logger.info(SimpleSubClientImpl.class.getSimpleName() + "|receive|type={}|msg={}", command, r7);
            if (command == Command.REQUEST_TO_CLIENT) {
                if (SimpleSubClientImpl.this.callback != null) {
                    SimpleSubClientImpl.this.callback.handle(r7, channelHandlerContext);
                }
                SimpleSubClientImpl.this.send(MessageUtils.requestToClientAck(r7));
            } else if (command == Command.ASYNC_MESSAGE_TO_CLIENT) {
                Package asyncMessageAck = MessageUtils.asyncMessageAck(r7);
                if (SimpleSubClientImpl.this.callback != null) {
                    SimpleSubClientImpl.this.callback.handle(r7, channelHandlerContext);
                }
                SimpleSubClientImpl.this.send(asyncMessageAck);
            } else if (command == Command.BROADCAST_MESSAGE_TO_CLIENT) {
                Package broadcastMessageAck = MessageUtils.broadcastMessageAck(r7);
                if (SimpleSubClientImpl.this.callback != null) {
                    SimpleSubClientImpl.this.callback.handle(r7, channelHandlerContext);
                }
                SimpleSubClientImpl.this.send(broadcastMessageAck);
            } else if (command != Command.SERVER_GOODBYE_REQUEST) {
                SimpleSubClientImpl.this.logger.error("msg ignored|{}|{}", command, r7);
            }
            RequestContext requestContext = (RequestContext) SimpleSubClientImpl.this.contexts.get(RequestContext._key(r7));
            if (requestContext == null) {
                SimpleSubClientImpl.this.logger.error("msg ignored,context not found.|{}|{}", command, r7);
            } else {
                SimpleSubClientImpl.this.contexts.remove(requestContext.getKey());
                requestContext.finish(r7);
            }
        }
    }

    public SimpleSubClientImpl(String str, int i, UserAgent userAgent) {
        super(str, i);
        this.logger = LoggerFactory.getLogger(getClass());
        this.subscriptionItems = new ArrayList();
        this.userAgent = userAgent;
    }

    @Override // org.apache.eventmesh.client.tcp.SimpleSubClient
    public void registerBusiHandler(ReceiveMsgHook receiveMsgHook) throws Exception {
        this.callback = receiveMsgHook;
    }

    @Override // org.apache.eventmesh.client.tcp.SimpleSubClient
    public void init() throws Exception {
        open(new Handler());
        hello();
        this.logger.info("SimpleSubClientImpl|{}|started!", Integer.valueOf(this.clientNo));
    }

    @Override // org.apache.eventmesh.client.tcp.common.TcpClient, org.apache.eventmesh.client.tcp.SimplePubClient
    public void reconnect() throws Exception {
        super.reconnect();
        hello();
        if (!CollectionUtils.isEmpty(this.subscriptionItems)) {
            for (SubscriptionItem subscriptionItem : this.subscriptionItems) {
                io(MessageUtils.subscribe(subscriptionItem.getTopic(), subscriptionItem.getMode(), subscriptionItem.getType()), EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
            }
        }
        listen();
    }

    @Override // org.apache.eventmesh.client.tcp.common.TcpClient, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        try {
            this.task.cancel(false);
            goodbye();
            super.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override // org.apache.eventmesh.client.tcp.SimpleSubClient
    public void heartbeat() throws Exception {
        this.task = scheduler.scheduleAtFixedRate(new Runnable() { // from class: org.apache.eventmesh.client.tcp.impl.SimpleSubClientImpl.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    if (!SimpleSubClientImpl.this.isActive()) {
                        SimpleSubClientImpl.this.reconnect();
                    }
                    SimpleSubClientImpl.this.io(MessageUtils.heartBeat(), EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
                } catch (Exception e) {
                }
            }
        }, EventMeshCommon.HEATBEAT, EventMeshCommon.HEATBEAT, TimeUnit.MILLISECONDS);
    }

    private void goodbye() throws Exception {
        io(MessageUtils.goodbye(), EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
    }

    private void hello() throws Exception {
        io(MessageUtils.hello(this.userAgent), EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
    }

    @Override // org.apache.eventmesh.client.tcp.SimpleSubClient
    public void listen() throws Exception {
        io(MessageUtils.listen(), EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
    }

    @Override // org.apache.eventmesh.client.tcp.SimpleSubClient
    public void subscribe(String str, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception {
        this.subscriptionItems.add(new SubscriptionItem(str, subscriptionMode, subcriptionType));
        io(MessageUtils.subscribe(str, subscriptionMode, subcriptionType), EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
    }

    @Override // org.apache.eventmesh.client.tcp.SimpleSubClient
    public void unsubscribe() throws Exception {
        io(MessageUtils.unsubscribe(), EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
    }

    @Override // org.apache.eventmesh.client.tcp.SimpleSubClient
    public UserAgent getUserAgent() {
        return this.userAgent;
    }

    public String toString() {
        return "SimpleSubClientImpl|clientNo=" + this.clientNo + "|" + this.userAgent;
    }
}
