package org.apache.kafka.test;

import java.io.File;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;

/**
 * Test harness that drives records through the {@link ProcessorTopology} built from a
 * {@link KStreamBuilder}.
 *
 * <p>The driver keeps track of the "current" processor node while a record travels through
 * the topology. Records that a node hands to the record collector are not produced anywhere:
 * the nested {@code MockRecordCollector} loops them straight back into
 * {@link #process(String, Object, Object)} under the record's topic.
 *
 * <p>The class holds mutable, unsynchronized state ({@code currNode}).
 */
public class KStreamTestDriver {
    private static final long DEFAULT_CACHE_SIZE_BYTES = 1024L * 1024L;

    /** State directory handed to the constructor; may be {@code null}. */
    public final File stateDir;

    private final ProcessorTopology topology;
    private final MockProcessorContext context;
    private final ThreadCache cache;

    /** Node whose children receive the next {@code forward(...)} call; {@code null} when idle. */
    private ProcessorNode currNode;

    /**
     * Record collector constructed with a {@code null} producer. Every record passed to
     * {@code send} is fed back into the enclosing driver's
     * {@link KStreamTestDriver#process(String, Object, Object)}; the serializers and the
     * partitioner are ignored.
     */
    private class MockRecordCollector extends RecordCollectorImpl {
        public MockRecordCollector() {
            super((Producer) null, "KStreamTestDriver");
        }

        public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<K, V> partitioner) {
            KStreamTestDriver.this.process(record.topic(), record.key(), record.value());
        }

        public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
            KStreamTestDriver.this.process(record.topic(), record.key(), record.value());
        }

        public void flush() {
            // Nothing is buffered by this collector.
        }

        public void close() {
            // No producer to release.
        }
    }

    /**
     * Creates a driver with no state directory, byte-array serdes and the default cache size.
     *
     * @param builder builder whose topology is driven
     */
    public KStreamTestDriver(KStreamBuilder builder) {
        this(builder, null, Serdes.ByteArray(), Serdes.ByteArray());
    }

    /**
     * Creates a driver with byte-array serdes and the default cache size.
     *
     * @param builder  builder whose topology is driven
     * @param stateDir state directory passed on to the mock processor context
     */
    public KStreamTestDriver(KStreamBuilder builder, File stateDir) {
        this(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray());
    }

    /**
     * Creates a driver with byte-array serdes and an explicit cache size.
     *
     * @param builder        builder whose topology is driven
     * @param stateDir       state directory passed on to the mock processor context
     * @param cacheSizeBytes size given to the {@link ThreadCache}
     */
    public KStreamTestDriver(KStreamBuilder builder, File stateDir, long cacheSizeBytes) {
        this(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray(), cacheSizeBytes);
    }

    /**
     * Creates a driver with explicit serdes and the default cache size.
     *
     * @param builder  builder whose topology is driven
     * @param stateDir state directory passed on to the mock processor context
     * @param keySerde first serde passed to the mock processor context
     *                 (presumably the key serde - confirm against MockProcessorContext)
     * @param valSerde second serde passed to the mock processor context
     *                 (presumably the value serde - confirm against MockProcessorContext)
     */
    public KStreamTestDriver(KStreamBuilder builder, File stateDir, Serde<?> keySerde, Serde<?> valSerde) {
        this(builder, stateDir, keySerde, valSerde, DEFAULT_CACHE_SIZE_BYTES);
    }

    /**
     * Builds the topology under the application id {@code "TestDriver"}, creates the mock
     * processor context, then initializes every state store followed by every processor node.
     * While a node is being initialized it is the context's current node; the context's
     * current node is reset to {@code null} afterwards, even if initialization throws.
     *
     * @param builder        builder whose topology is driven
     * @param stateDir       state directory passed on to the mock processor context
     * @param keySerde       first serde passed to the mock processor context
     *                       (presumably the key serde - confirm against MockProcessorContext)
     * @param valSerde       second serde passed to the mock processor context
     *                       (presumably the value serde - confirm against MockProcessorContext)
     * @param cacheSizeBytes size given to the {@link ThreadCache}
     */
    public KStreamTestDriver(KStreamBuilder builder, File stateDir, Serde<?> keySerde, Serde<?> valSerde, long cacheSizeBytes) {
        builder.setApplicationId("TestDriver");
        this.topology = builder.build((Integer) null);
        this.stateDir = stateDir;
        this.cache = new ThreadCache(cacheSizeBytes);
        this.context = new MockProcessorContext(this, stateDir, keySerde, valSerde, (RecordCollector) new MockRecordCollector(), this.cache);
        this.context.setRecordContext(new ProcessorRecordContext(0L, 0L, 0, "topic"));

        for (StateStore store : this.topology.stateStores()) {
            store.init(this.context, store);
        }

        for (ProcessorNode node : this.topology.processors()) {
            this.context.setCurrentNode(node);
            try {
                node.init(this.context);
            } finally {
                this.context.setCurrentNode(null);
            }
        }
    }

    /**
     * @return the mock processor context shared by all nodes of the driven topology
     */
    public ProcessorContext context() {
        return this.context;
    }

    /**
     * Feeds one record into the topology through the source node registered for
     * {@code topicName}.
     *
     * <p>The record is dropped, and the driver's state is left untouched, when either
     * <ul>
     *   <li>the topic name ends with {@code "-changelog"}, or</li>
     *   <li>the topology has no source node for the topic (for example a record written by a
     *       terminal sink and looped back by the mock record collector).</li>
     * </ul>
     * Otherwise the record context is refreshed with the context's current timestamp, the
     * source node becomes the current node, and the record is forwarded to the source's
     * children. The current node is reset to {@code null} afterwards, even on failure.
     *
     * @param topicName topic the record arrives on
     * @param key       record key
     * @param value     record value
     */
    public void process(String topicName, Object key, Object value) {
        if (topicName.endsWith("-changelog")) {
            return;
        }
        final ProcessorNode sourceNode = this.topology.source(topicName);
        if (sourceNode == null) {
            // Nobody in this topology consumes the topic; forwarding would only
            // dereference a null current node.
            return;
        }

        this.currNode = sourceNode;
        this.context.setRecordContext(createRecordContext(this.context.timestamp()));
        this.context.setCurrentNode(sourceNode);
        try {
            forward(key, value);
        } finally {
            this.currNode = null;
            this.context.setCurrentNode(null);
        }
    }

    /**
     * Creates a record context carrying {@code timestamp}, with {@code -1} for the two
     * numeric fields that follow it and the fixed topic name {@code "topic"}.
     */
    private ProcessorRecordContext createRecordContext(long timestamp) {
        return new ProcessorRecordContext(timestamp, -1L, -1, "topic");
    }

    /**
     * Calls {@code punctuate(timestamp)} on every node of the topology that has a processor.
     * Each node is the driver's current node for the duration of its own call.
     *
     * @param timestamp value passed to each processor and stored in the record context
     */
    public void punctuate(long timestamp) {
        for (ProcessorNode node : this.topology.processors()) {
            if (node.processor() == null) {
                continue;
            }
            this.currNode = node;
            try {
                this.context.setRecordContext(createRecordContext(timestamp));
                node.processor().punctuate(timestamp);
            } finally {
                this.currNode = null;
            }
        }
    }

    /**
     * Sets the time on the mock processor context.
     *
     * @param timestamp new time
     */
    public void setTime(long timestamp) {
        this.context.setTime(timestamp);
    }

    /**
     * Forwards a record to every child of the current node. Each child is the current node
     * while it processes the record; the parent is restored afterwards, even on failure.
     *
     * @param key   record key
     * @param value record value
     */
    public <K, V> void forward(K key, V value) {
        final ProcessorNode parent = this.currNode;
        // Iterate as Object and cast: valid whether or not children() is generically typed.
        for (Object child : parent.children()) {
            final ProcessorNode childNode = (ProcessorNode) child;
            this.currNode = childNode;
            try {
                childNode.process(key, value);
            } finally {
                this.currNode = parent;
            }
        }
    }

    /**
     * Forwards a record to the child at position {@code childIndex} of the current node.
     *
     * @param key        record key
     * @param value      record value
     * @param childIndex index into the current node's children
     */
    public <K, V> void forward(K key, V value, int childIndex) {
        final ProcessorNode parent = this.currNode;
        final ProcessorNode childNode = (ProcessorNode) parent.children().get(childIndex);
        this.currNode = childNode;
        try {
            childNode.process(key, value);
        } finally {
            this.currNode = parent;
        }
    }

    /**
     * Forwards a record to the first child of the current node whose name equals
     * {@code childName}. Does nothing when no child has that name.
     *
     * @param key       record key
     * @param value     record value
     * @param childName name of the child to forward to
     */
    public <K, V> void forward(K key, V value, String childName) {
        final ProcessorNode parent = this.currNode;
        // Iterate as Object and cast: valid whether or not children() is generically typed.
        for (Object child : parent.children()) {
            final ProcessorNode childNode = (ProcessorNode) child;
            if (!childNode.name().equals(childName)) {
                continue;
            }
            this.currNode = childNode;
            try {
                childNode.process(key, value);
            } finally {
                this.currNode = parent;
            }
            return;
        }
    }

    /**
     * Closes every processor node (each is the current node while it closes) and then
     * flushes all state stores.
     */
    public void close() {
        for (ProcessorNode node : this.topology.processors()) {
            this.currNode = node;
            try {
                node.close();
            } finally {
                this.currNode = null;
            }
        }
        flushState();
    }

    /**
     * @return the names of all processor nodes in the topology
     */
    public Set<String> allProcessorNames() {
        final Set<String> names = new HashSet<>();
        for (ProcessorNode node : this.topology.processors()) {
            names.add(node.name());
        }
        return names;
    }

    /**
     * Looks up a processor node by name.
     *
     * @param name node name to search for
     * @return the first node with that name, or {@code null} if there is none
     */
    public ProcessorNode processor(String name) {
        for (ProcessorNode node : this.topology.processors()) {
            if (node.name().equals(name)) {
                return node;
            }
        }
        return null;
    }

    /**
     * @return the state stores known to the mock processor context
     */
    public Map<String, StateStore> allStateStores() {
        return this.context.allStateStores();
    }

    /**
     * Flushes every state store known to the context. Before a store is flushed, the node the
     * topology maps it to (if any) becomes the current node; a store with no mapped node is
     * flushed under whichever node is current at that point. The node that was current on
     * entry is restored afterwards, even on failure.
     */
    public void flushState() {
        final ProcessorNode nodeOnEntry = this.currNode;
        try {
            for (StateStore store : this.context.allStateStores().values()) {
                final ProcessorNode owner = (ProcessorNode) this.topology.storeToProcessorNodeMap().get(store);
                if (owner != null) {
                    this.currNode = owner;
                }
                store.flush();
            }
        } finally {
            this.currNode = nodeOnEntry;
        }
    }

    /**
     * Overrides the node treated as current by the {@code forward(...)} methods.
     *
     * @param processorNode node to make current; may be {@code null}
     */
    public void setCurrentNode(ProcessorNode processorNode) {
        this.currNode = processorNode;
    }
}
