package org.apache.shenyu.protocol.mqtt;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.util.CharsetUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.shenyu.common.utils.Singleton;
import org.apache.shenyu.protocol.mqtt.repositories.TopicRepository;

/* loaded from: input_file:org/apache/shenyu/protocol/mqtt/Subscribe.class */
/**
 * Handler for MQTT SUBSCRIBE packets.
 *
 * <p>For every accepted topic the message currently held in the {@link TopicRepository}
 * (if any) is published to the subscriber, and the subscription request is then
 * acknowledged with a SUBACK that grants QoS 0 for each accepted topic.
 */
public class Subscribe extends MessageType {

    /**
     * Processes a SUBSCRIBE packet received on the given channel.
     *
     * @param channelHandlerContext context whose channel receives the PUBLISH and SUBACK packets
     * @param mqttSubscribeMessage the decoded SUBSCRIBE packet
     */
    @Override
    public void subscribe(ChannelHandlerContext channelHandlerContext, MqttSubscribeMessage mqttSubscribeMessage) {
        Channel channel = channelHandlerContext.channel();
        // NOTE(review): the channel is closed when isConnected() is true, which reads as
        // inverted; isConnected() is defined outside this file, so the condition is kept
        // unchanged here. Confirm against MessageType before altering it.
        if (isConnected()) {
            channel.close().addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
            return;
        }

        int messageId = mqttSubscribeMessage.variableHeader().messageId();
        // Subscriptions carrying the FAILURE pseudo-QoS are dropped and not acknowledged.
        List<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream()
                .filter(subscription -> subscription.qualityOfService() != MqttQoS.FAILURE)
                .map(subscription -> subscription.topicName())
                .collect(Collectors.toList());

        TopicRepository topicRepository = (TopicRepository) Singleton.INST.get(TopicRepository.class);
        for (String topic : topics) {
            String storedMessage = topicRepository.get(topic);
            // Presumably the repository returns null when nothing is stored for the topic
            // (TODO confirm). Unpooled.copiedBuffer rejects a null string, so skip the
            // PUBLISH instead of failing before the SUBACK below can be sent.
            if (storedMessage != null) {
                sendSubMessage(topic, storedMessage, messageId, channel);
            }
        }
        sendSubAckMessage(messageId, topics, channel);
    }

    /**
     * Writes a SUBACK granting {@link MqttQoS#AT_MOST_ONCE} for every topic in {@code topics}.
     *
     * @param messageId packet identifier copied from the SUBSCRIBE packet
     * @param topics accepted topic names; only the size of the list is used
     * @param channel channel the SUBACK is written to
     */
    private void sendSubAckMessage(int messageId, List<String> topics, Channel channel) {
        List<Integer> grantedQosLevels = new ArrayList<>(topics.size());
        for (int index = 0; index < topics.size(); index++) {
            grantedQosLevels.add(MqttQoS.AT_MOST_ONCE.value());
        }
        MqttFixedHeader fixedHeader =
                new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
        channel.writeAndFlush(new MqttSubAckMessage(
                fixedHeader, MqttMessageIdVariableHeader.from(messageId), new MqttSubAckPayload(grantedQosLevels)));
    }

    /**
     * Writes a QoS 0 PUBLISH with the retain flag set, carrying {@code message} encoded as UTF-8.
     *
     * @param topic topic name placed in the PUBLISH variable header
     * @param message payload text; must not be {@code null}
     * @param messageId packet identifier placed in the PUBLISH variable header
     * @param channel channel the PUBLISH is written to
     */
    private void sendSubMessage(String topic, String message, int messageId, Channel channel) {
        MqttFixedHeader fixedHeader =
                new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, true, 0);
        MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, messageId);
        channel.writeAndFlush(new MqttPublishMessage(
                fixedHeader, variableHeader, Unpooled.copiedBuffer(message, CharsetUtil.UTF_8)));
    }
}
