package org.apache.kafka.test;

import java.io.File;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.junit.rules.ExternalResource;

/* loaded from: input_file:org/apache/kafka/test/KStreamTestDriver.class */
public class KStreamTestDriver extends ExternalResource {
    private static final long DEFAULT_CACHE_SIZE_BYTES = 1048576;
    private ProcessorTopology topology;
    private MockProcessorContext context;
    private ProcessorTopology globalTopology;
    private final LogContext logContext = new LogContext("testCache ");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/test/KStreamTestDriver$MockRecordCollector.class */
    public class MockRecordCollector extends RecordCollectorImpl {
        MockRecordCollector() {
            super((Producer) null, "KStreamTestDriver", new LogContext("KStreamTestDriver "));
        }

        public <K, V> void send(String str, K k, V v, Long l, Serializer<K> serializer, Serializer<V> serializer2, StreamPartitioner<? super K, ? super V> streamPartitioner) {
            if (KStreamTestDriver.this.sourceNodeByTopicName(str) != null) {
                KStreamTestDriver.this.process(str, k, v);
            }
        }

        public <K, V> void send(String str, K k, V v, Integer num, Long l, Serializer<K> serializer, Serializer<V> serializer2) {
            if (KStreamTestDriver.this.sourceNodeByTopicName(str) != null) {
                KStreamTestDriver.this.process(str, k, v);
            }
        }

        public void flush() {
        }

        public void close() {
        }
    }

    @Deprecated
    public void setUp(KStreamBuilder kStreamBuilder) {
        setUp(kStreamBuilder, (File) null, Serdes.ByteArray(), Serdes.ByteArray());
    }

    @Deprecated
    public void setUp(KStreamBuilder kStreamBuilder, File file) {
        setUp(kStreamBuilder, file, Serdes.ByteArray(), Serdes.ByteArray());
    }

    @Deprecated
    public void setUp(KStreamBuilder kStreamBuilder, File file, long j) {
        setUp(kStreamBuilder, file, Serdes.ByteArray(), Serdes.ByteArray(), j);
    }

    @Deprecated
    public void setUp(KStreamBuilder kStreamBuilder, File file, Serde<?> serde, Serde<?> serde2) {
        setUp(kStreamBuilder, file, serde, serde2, DEFAULT_CACHE_SIZE_BYTES);
    }

    @Deprecated
    public void setUp(KStreamBuilder kStreamBuilder, File file, Serde<?> serde, Serde<?> serde2, long j) {
        kStreamBuilder.setApplicationId("TestDriver");
        this.topology = kStreamBuilder.build((Integer) null);
        this.globalTopology = kStreamBuilder.buildGlobalStateTopology();
        this.context = new MockProcessorContext(file, serde, serde2, new MockRecordCollector(), new ThreadCache(this.logContext, j, new MockStreamsMetrics(new Metrics())));
        this.context.setRecordContext(new ProcessorRecordContext(0L, 0L, 0, "topic"));
        if (this.globalTopology != null) {
            initTopology(this.globalTopology, this.globalTopology.globalStateStores());
        }
        initTopology(this.topology, this.topology.stateStores());
    }

    public void setUp(StreamsBuilder streamsBuilder) {
        setUp(streamsBuilder, (File) null, Serdes.ByteArray(), Serdes.ByteArray());
    }

    public void setUp(StreamsBuilder streamsBuilder, File file) {
        setUp(streamsBuilder, file, Serdes.ByteArray(), Serdes.ByteArray());
    }

    public void setUp(StreamsBuilder streamsBuilder, File file, long j) {
        setUp(streamsBuilder, file, Serdes.ByteArray(), Serdes.ByteArray(), j);
    }

    public void setUp(StreamsBuilder streamsBuilder, File file, Serde<?> serde, Serde<?> serde2) {
        setUp(streamsBuilder, file, serde, serde2, DEFAULT_CACHE_SIZE_BYTES);
    }

    public void setUp(StreamsBuilder streamsBuilder, File file, Serde<?> serde, Serde<?> serde2, long j) {
        InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(streamsBuilder);
        internalTopologyBuilder.setApplicationId("TestDriver");
        this.topology = internalTopologyBuilder.build((Integer) null);
        this.globalTopology = internalTopologyBuilder.buildGlobalStateTopology();
        this.context = new MockProcessorContext(file, serde, serde2, new MockRecordCollector(), new ThreadCache(this.logContext, j, new MockStreamsMetrics(new Metrics())));
        this.context.setRecordContext(new ProcessorRecordContext(0L, 0L, 0, "topic"));
        if (this.globalTopology != null) {
            initTopology(this.globalTopology, this.globalTopology.globalStateStores());
        }
        initTopology(this.topology, this.topology.stateStores());
    }

    protected void after() {
        if (this.topology != null) {
            close();
        }
    }

    private void initTopology(ProcessorTopology processorTopology, List<StateStore> list) {
        for (StateStore stateStore : list) {
            stateStore.init(this.context, stateStore);
        }
        for (ProcessorNode processorNode : processorTopology.processors()) {
            this.context.setCurrentNode(processorNode);
            try {
                processorNode.init(this.context);
                this.context.setCurrentNode(null);
            } catch (Throwable th) {
                this.context.setCurrentNode(null);
                throw th;
            }
        }
    }

    public ProcessorTopology topology() {
        return this.topology;
    }

    public ProcessorContext context() {
        return this.context;
    }

    public void process(String str, Object obj, Object obj2) {
        ProcessorNode currentNode = this.context.currentNode();
        ProcessorNode sourceNodeByTopicName = sourceNodeByTopicName(str);
        if (sourceNodeByTopicName != null) {
            this.context.setRecordContext(createRecordContext(this.context.timestamp()));
            this.context.setCurrentNode(sourceNodeByTopicName);
            try {
                this.context.forward(obj, obj2);
                this.context.setCurrentNode(currentNode);
            } catch (Throwable th) {
                this.context.setCurrentNode(currentNode);
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ProcessorNode sourceNodeByTopicName(String str) {
        SourceNode source = this.topology.source(str);
        if (source == null && this.globalTopology != null) {
            source = this.globalTopology.source(str);
        }
        return source;
    }

    public void punctuate(long j) {
        ProcessorNode currentNode = this.context.currentNode();
        for (ProcessorNode processorNode : this.topology.processors()) {
            if (processorNode.processor() != null) {
                this.context.setRecordContext(createRecordContext(j));
                this.context.setCurrentNode(processorNode);
                try {
                    processorNode.processor().punctuate(j);
                    this.context.setCurrentNode(currentNode);
                } catch (Throwable th) {
                    this.context.setCurrentNode(currentNode);
                    throw th;
                }
            }
        }
    }

    public void setTime(long j) {
        this.context.setTime(j);
    }

    public void close() {
        for (ProcessorNode processorNode : this.topology.processors()) {
            this.context.setCurrentNode(processorNode);
            try {
                processorNode.close();
                this.context.setCurrentNode(null);
            } catch (Throwable th) {
                this.context.setCurrentNode(null);
                throw th;
            }
        }
        closeState();
        this.context.close();
    }

    public Set<String> allProcessorNames() {
        HashSet hashSet = new HashSet();
        Iterator it = this.topology.processors().iterator();
        while (it.hasNext()) {
            hashSet.add(((ProcessorNode) it.next()).name());
        }
        return hashSet;
    }

    public ProcessorNode processor(String str) {
        for (ProcessorNode processorNode : this.topology.processors()) {
            if (processorNode.name().equals(str)) {
                return processorNode;
            }
        }
        return null;
    }

    public Map<String, StateStore> allStateStores() {
        return this.context.allStateStores();
    }

    public void flushState() {
        Iterator<StateStore> it = this.context.allStateStores().values().iterator();
        while (it.hasNext()) {
            it.next().flush();
        }
    }

    private void closeState() {
        flushState();
        Iterator<StateStore> it = this.context.allStateStores().values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
    }

    private ProcessorRecordContext createRecordContext(long j) {
        return new ProcessorRecordContext(j, -1L, -1, "topic");
    }
}
