package org.apache.rocketmq.streams.window.shuffle;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
import org.apache.rocketmq.streams.common.channel.builder.IShuffleChannelBuilder;
import org.apache.rocketmq.streams.common.channel.impl.memory.MemoryCache;
import org.apache.rocketmq.streams.common.channel.impl.memory.MemoryChannel;
import org.apache.rocketmq.streams.common.channel.impl.memory.MemorySink;
import org.apache.rocketmq.streams.common.channel.impl.memory.MemorySource;
import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleSink;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
import org.apache.rocketmq.streams.common.configurable.IConfigurableIdentification;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
import org.apache.rocketmq.streams.common.interfaces.ISystemMessageProcessor;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.topology.ChainPipeline;
import org.apache.rocketmq.streams.common.utils.ReflectUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.serviceloader.ServiceLoaderComponent;

/* loaded from: input_file:org/apache/rocketmq/streams/window/shuffle/AbstractSystemChannel.class */
/**
 * Base class for components that own a "shuffle" channel: a sink ({@link #producer}) that
 * messages are written to and a source ({@link #consumer}) that delivers them back to this
 * instance (the consumer is started with {@code this} as its receiver).
 *
 * <p>The channel is either derived from a pipeline's own source via
 * {@link #autoCreateShuffleChannel(ChainPipeline)}, or built from component properties via
 * {@link #createSource(String, String)} / {@link #createSink(String, String)}.
 */
public abstract class AbstractSystemChannel implements IConfigurableIdentification, ISystemMessageProcessor, IStreamOperator {
    protected static final Log LOG = LogFactory.getLog(AbstractSystemChannel.class);

    /** Key in {@link #channelConfig} whose value is the prefix of component properties that belong to the channel. */
    protected static final String CHANNEL_PROPERTY_KEY_PREFIX = "CHANNEL_PROPERTY_KEY_PREFIX";

    /** Key in {@link #channelConfig} whose value is the name of the component property holding the channel type. */
    protected static final String CHANNEL_TYPE = "CHANNEL_TYPE";

    /** Source side of the shuffle channel; {@code null} until a channel has been created. */
    protected ISource consumer;

    /** Sink side of the shuffle channel; {@code null} until a channel has been created. */
    protected AbstractSupportShuffleSink producer;

    protected Map<String, String> channelConfig = new HashMap<>();

    /**
     * Set once {@link #autoCreateShuffleChannel(ChainPipeline)} has finished successfully.
     * Volatile because it is read outside the lock on the fast path.
     */
    protected volatile boolean hasCreateShuffleChannel = false;

    /**
     * Starts the consumer with this instance as the message receiver. Does nothing when no
     * consumer has been created yet.
     */
    public void startChannel() {
        if (this.consumer != null) {
            this.consumer.start(this);
        }
    }

    /**
     * Creates the shuffle sink and source from the pipeline's source, at most once per instance.
     *
     * <p>The channel builder is looked up by the simple class name of the pipeline source. When the
     * builder yields a {@link MemoryChannel}, a fresh {@link MemorySink}/{@link MemorySource} pair
     * sharing one {@link MemoryCache} is used instead; otherwise the source is obtained from
     * {@link IShuffleChannelBuilder#copy}. Both ends get the shuffle namespace, name, topic and any
     * dynamic properties before being (re-)initialised.
     *
     * @param chainPipeline pipeline whose source determines the kind of channel to create
     * @throws RuntimeException if no suitable builder is found or the created sink cannot shuffle
     */
    public void autoCreateShuffleChannel(ChainPipeline chainPipeline) {
        if (this.hasCreateShuffleChannel) {
            return;
        }
        synchronized (this) {
            if (this.hasCreateShuffleChannel) {
                return;
            }
            ISource pipelineSource = chainPipeline.getSource();
            IChannelBuilder builder = (IChannelBuilder) ComponentCreator.getComponent(IChannelBuilder.class.getName(), ServiceLoaderComponent.class).loadService(pipelineSource.getClass().getSimpleName());
            if (builder == null) {
                throw new RuntimeException("can not create shuffle channel, not find channel builder " + pipelineSource.toJson());
            }
            if (!(builder instanceof IShuffleChannelBuilder)) {
                throw new RuntimeException("can not create shuffle channel, builder not impl IShuffleChannelBuilder " + pipelineSource.toJson());
            }
            IShuffleChannelBuilder shuffleBuilder = (IShuffleChannelBuilder) builder;

            ISink sink = shuffleBuilder.createBySource(pipelineSource);
            sink.init();
            if (!(sink instanceof MemoryChannel) && !(sink instanceof AbstractSupportShuffleSink)) {
                throw new RuntimeException("can not create shuffle channel, sink not extends AbstractSupportShuffleSink " + pipelineSource.toJson());
            }

            ISource shuffleSource = null;
            if (sink instanceof MemoryChannel) {
                // In-memory channel: replace it with a sink/source pair backed by one shared cache.
                MemoryCache memoryCache = new MemoryCache();
                memoryCache.setNameSpace(createShuffleChannelNameSpace(chainPipeline));
                memoryCache.setConfigureName(createShuffleChannelName(chainPipeline));
                MemorySink memorySink = new MemorySink();
                MemorySource memorySource = new MemorySource();
                memorySink.setMemoryCache(memoryCache);
                memorySource.setMemoryCache(memoryCache);
                memoryCache.init();
                sink = memorySink;
                shuffleSource = memorySource;
            }

            Properties dynamicProperties = new Properties();
            putDynamicPropertyValue(new HashSet<>(), dynamicProperties);

            AbstractSupportShuffleSink shuffleSink = (AbstractSupportShuffleSink) sink;
            shuffleSink.setSplitNum(getShuffleSplitCount(shuffleSink));
            shuffleSink.setNameSpace(createShuffleChannelNameSpace(chainPipeline));
            shuffleSink.setConfigureName(createShuffleChannelName(chainPipeline));

            // The sink names the field that holds its topic; rewrite that field to the shuffle topic.
            String topicFieldName = shuffleSink.getShuffleTopicFieldName();
            String shuffleTopic = null;
            if (StringUtil.isNotEmpty(topicFieldName)) {
                shuffleTopic = createShuffleTopic((String) ReflectUtil.getDeclaredField(shuffleSink, topicFieldName), chainPipeline);
                ReflectUtil.setBeanFieldValue(shuffleSink, topicFieldName, shuffleTopic);
            }
            applyFieldProperties(shuffleSink, dynamicProperties);
            // The sink was already initialised above; reset the flag so init() runs with the new settings.
            shuffleSink.setHasInit(false);
            shuffleSink.init();

            if (shuffleSource == null) {
                shuffleSource = shuffleBuilder.copy(pipelineSource);
            }
            applyFieldProperties(shuffleSource, dynamicProperties);
            shuffleSource.setNameSpace(shuffleSink.getNameSpace());
            shuffleSource.setConfigureName(shuffleSink.getConfigureName());
            if (shuffleTopic != null && topicFieldName != null) {
                ReflectUtil.setBeanFieldValue(shuffleSource, topicFieldName, shuffleTopic);
            }
            if (shuffleSource instanceof AbstractSource) {
                ((AbstractSource) shuffleSource).setHasInit(false);
            }
            shuffleSource.init();

            this.producer = shuffleSink;
            this.consumer = shuffleSource;
            // Latch only after everything succeeded, so a failed attempt can be retried and
            // later calls do not rebuild the channel.
            this.hasCreateShuffleChannel = true;
        }
    }

    /**
     * Copies every non-empty property whose key equals the name of a field declared on the bean's
     * class (or a parent class) into that field.
     */
    private static void applyFieldProperties(Object bean, Properties properties) {
        for (Object declared : ReflectUtil.getDeclaredFieldsContainsParentClass(bean.getClass())) {
            String fieldName = ((Field) declared).getName();
            String value = properties.getProperty(fieldName);
            if (StringUtil.isNotEmpty(value)) {
                ReflectUtil.setBeanFieldValue(bean, fieldName, value);
            }
        }
    }

    /**
     * Derives the shuffle topic.
     *
     * @param str current value of the sink's topic field
     * @param chainPipeline pipeline the channel is created for
     * @return topic written into the topic field of both the shuffle sink and the shuffle source
     */
    protected abstract String createShuffleTopic(String str, ChainPipeline chainPipeline);

    /**
     * @param abstractSupportShuffleSink the shuffle sink being configured
     * @return value passed to the sink's {@code setSplitNum}
     */
    protected abstract int getShuffleSplitCount(AbstractSupportShuffleSink abstractSupportShuffleSink);

    /**
     * @param chainPipeline pipeline the channel is created for
     * @return configure name given to the shuffle sink, source and (for memory channels) cache
     */
    protected abstract String createShuffleChannelName(ChainPipeline chainPipeline);

    /**
     * @param chainPipeline pipeline the channel is created for
     * @return namespace given to the shuffle sink, source and (for memory channels) cache
     */
    protected abstract String createShuffleChannelNameSpace(ChainPipeline chainPipeline);

    /**
     * @return the channel configuration map, read with the keys {@link #CHANNEL_TYPE} and
     *         {@link #CHANNEL_PROPERTY_KEY_PREFIX}
     */
    protected Map<String, String> getChannelConfig() {
        return this.channelConfig;
    }

    /**
     * @return value assigned to every key listed in the {@code dynamic.property} channel property
     */
    protected abstract String getDynamicPropertyValue();

    /**
     * Builds and initialises a source through the configured channel builder.
     *
     * @param str namespace handed to the builder; also used to select namespace-specific properties
     * @param str2 name handed to the builder
     * @return the initialised source, or {@code null} when no channel builder is configured
     */
    public ISource createSource(String str, String str2) {
        IChannelBuilder builder = createBuilder();
        if (builder == null) {
            return null;
        }
        ISource source = builder.createSource(str, str2, createChannelProperties(str), (MetaData) null);
        if (source instanceof MemorySource) {
            // A memory source needs a cache of its own before it can be initialised.
            MemoryCache memoryCache = new MemoryCache();
            ((MemorySource) source).setMemoryCache(memoryCache);
            memoryCache.init();
        }
        source.init();
        return source;
    }

    /**
     * Builds and initialises a shuffle-capable sink through the configured channel builder.
     * A {@link MemorySink} is attached to the memory cache of the current {@link #consumer}.
     *
     * @param str namespace handed to the builder; also used to select namespace-specific properties
     * @param str2 name handed to the builder
     * @return the initialised sink, or {@code null} when no channel builder is configured
     * @throws RuntimeException if the sink does not extend {@link AbstractSupportShuffleSink}, or
     *         if it is a memory sink while the consumer is not a {@link MemorySource}
     */
    public AbstractSupportShuffleSink createSink(String str, String str2) {
        IChannelBuilder builder = createBuilder();
        if (builder == null) {
            return null;
        }
        ISink sink = builder.createSink(str, str2, createChannelProperties(str), (MetaData) null);
        if (!(sink instanceof AbstractSupportShuffleSink)) {
            throw new RuntimeException("can not support shuffle " + sink.toJson());
        }
        if (sink instanceof MemorySink) {
            if (!(this.consumer instanceof MemorySource)) {
                throw new RuntimeException("shuffle cosumer need memory, real is " + this.consumer);
            }
            ((MemorySink) sink).setMemoryCache(((MemorySource) this.consumer).getMemoryCache());
        }
        sink.init();
        return (AbstractSupportShuffleSink) sink;
    }

    /**
     * Resolves the channel builder named by the component property that
     * {@code channelConfig[CHANNEL_TYPE]} points at.
     *
     * @return the builder, or {@code null} when the channel type is not configured or resolves to
     *         an empty value
     */
    protected IChannelBuilder createBuilder() {
        String typePropertyName = getChannelConfig().get(CHANNEL_TYPE);
        if (typePropertyName == null) {
            // Properties.getProperty(null) throws; an unset type simply means "no builder".
            return null;
        }
        String channelType = ComponentCreator.getProperties().getProperty(typePropertyName);
        if (StringUtil.isEmpty(channelType)) {
            return null;
        }
        return (IChannelBuilder) ComponentCreator.getComponent(IChannelBuilder.class.getName(), ServiceLoaderComponent.class).loadService(channelType);
    }

    /**
     * Collects the component properties that belong to the channel.
     *
     * <p>Only keys starting with the configured prefix are considered, and the prefix is stripped.
     * If the remainder starts with {@code str}, that leading part is stripped as well and the entry
     * always wins; otherwise the entry is only used when the key is not present yet. Afterwards
     * every key listed (comma separated) in {@code dynamic.property} is set to
     * {@link #getDynamicPropertyValue()}, and {@link #putDynamicPropertyValue} gets a final say.
     *
     * @param str namespace used to recognise namespace-specific keys
     * @return a new {@link Properties} instance holding the channel settings
     */
    protected Properties createChannelProperties(String str) {
        Properties channelProperties = new Properties();
        String keyPrefix = getChannelConfig().get(CHANNEL_PROPERTY_KEY_PREFIX);
        for (Map.Entry<Object, Object> entry : ComponentCreator.getProperties().entrySet()) {
            String key = (String) entry.getKey();
            String value = (String) entry.getValue();
            if (!key.startsWith(keyPrefix)) {
                continue;
            }
            // Strip only the leading prefix; String.replace would also remove later occurrences.
            String channelKey = key.substring(keyPrefix.length());
            if (channelKey.startsWith(str)) {
                channelProperties.put(channelKey.substring(str.length()), value);
            } else if (!channelProperties.containsKey(channelKey)) {
                channelProperties.put(channelKey, value);
            }
        }

        Set<String> dynamicKeys = new HashSet<>();
        String dynamicKeyList = channelProperties.getProperty("dynamic.property");
        if (dynamicKeyList != null) {
            String dynamicValue = getDynamicPropertyValue();
            for (String dynamicKey : dynamicKeyList.split(",")) {
                channelProperties.put(dynamicKey, dynamicValue);
                dynamicKeys.add(dynamicKey);
            }
        }
        putDynamicPropertyValue(dynamicKeys, channelProperties);
        return channelProperties;
    }

    /**
     * Extension hook for adding or overriding channel properties; the default does nothing.
     *
     * @param set keys that were already populated from {@code dynamic.property}
     * @param properties properties to modify in place
     */
    protected void putDynamicPropertyValue(Set<String> set, Properties properties) {
    }

    /** @return the source side of the channel, or {@code null} if none has been created */
    public ISource getConsumer() {
        return this.consumer;
    }

    /** @return the sink side of the channel, or {@code null} if none has been created */
    public ISink getProducer() {
        return this.producer;
    }

    /**
     * Writes a single message to the producer and flushes immediately.
     *
     * @param iMessage message to send
     */
    public void sendMessage(IMessage iMessage) {
        ArrayList<IMessage> batch = new ArrayList<>(1);
        batch.add(iMessage);
        this.producer.batchSave(batch);
        this.producer.flush();
    }
}
