/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.test;

import java.io.File;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockProcessorContext;

public class KStreamTestDriver {
    private final ProcessorTopology topology;
    private final MockProcessorContext context;
    private ThreadCache cache;
    private static final long DEFAULT_CACHE_SIZE_BYTES = 0x100000L;
    public final File stateDir;
    private ProcessorNode currNode;

    public KStreamTestDriver(KStreamBuilder builder) {
        this(builder, null, Serdes.ByteArray(), Serdes.ByteArray());
    }

    public KStreamTestDriver(KStreamBuilder builder, File stateDir) {
        this(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public KStreamTestDriver(KStreamBuilder builder, File stateDir, Serde<?> keySerde, Serde<?> valSerde) {
        builder.setApplicationId("TestDriver");
        this.topology = builder.build(null);
        this.stateDir = stateDir;
        this.cache = new ThreadCache(0x100000L);
        this.context = new MockProcessorContext(this, stateDir, keySerde, valSerde, new MockRecordCollector(), this.cache);
        this.context.setRecordContext((RecordContext)new ProcessorRecordContext(0L, 0L, 0, "topic"));
        for (StateStore store : this.topology.stateStores()) {
            store.init((ProcessorContext)this.context, store);
        }
        Iterator i$ = this.topology.processors().iterator();
        while (i$.hasNext()) {
            ProcessorNode node;
            this.currNode = node = (ProcessorNode)i$.next();
            try {
                node.init((ProcessorContext)this.context);
            }
            finally {
                this.currNode = null;
            }
        }
    }

    public ProcessorContext context() {
        return this.context;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void process(String topicName, Object key, Object value) {
        this.currNode = this.topology.source(topicName);
        if (topicName.endsWith("-changelog")) {
            return;
        }
        this.context.setRecordContext((RecordContext)this.createRecordContext(this.context.timestamp()));
        this.context.setCurrentNode(this.currNode);
        try {
            this.forward(key, value);
        }
        finally {
            this.currNode = null;
            this.context.setCurrentNode(null);
        }
    }

    private ProcessorRecordContext createRecordContext(long timestamp) {
        return new ProcessorRecordContext(timestamp, -1L, -1, "topic");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void punctuate(long timestamp) {
        for (ProcessorNode processor : this.topology.processors()) {
            if (processor.processor() == null) continue;
            this.currNode = processor;
            try {
                this.context.setRecordContext((RecordContext)this.createRecordContext(timestamp));
                processor.processor().punctuate(timestamp);
            }
            finally {
                this.currNode = null;
            }
        }
    }

    public void setTime(long timestamp) {
        this.context.setTime(timestamp);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public <K, V> void forward(K key, V value) {
        ProcessorNode thisNode = this.currNode;
        Iterator i$ = this.currNode.children().iterator();
        while (i$.hasNext()) {
            ProcessorNode childNode;
            this.currNode = childNode = (ProcessorNode)i$.next();
            try {
                childNode.process(key, value);
            }
            finally {
                this.currNode = thisNode;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public <K, V> void forward(K key, V value, int childIndex) {
        ProcessorNode childNode;
        ProcessorNode thisNode = this.currNode;
        this.currNode = childNode = (ProcessorNode)thisNode.children().get(childIndex);
        try {
            childNode.process(key, value);
        }
        finally {
            this.currNode = thisNode;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public <K, V> void forward(K key, V value, String childName) {
        ProcessorNode thisNode = this.currNode;
        for (ProcessorNode childNode : thisNode.children()) {
            if (!childNode.name().equals(childName)) continue;
            this.currNode = childNode;
            try {
                childNode.process(key, value);
                break;
            }
            finally {
                this.currNode = thisNode;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() {
        Iterator i$ = this.topology.processors().iterator();
        while (i$.hasNext()) {
            ProcessorNode node;
            this.currNode = node = (ProcessorNode)i$.next();
            try {
                node.close();
            }
            finally {
                this.currNode = null;
            }
        }
        this.flushState();
    }

    public Set<String> allProcessorNames() {
        HashSet<String> names = new HashSet<String>();
        List nodes = this.topology.processors();
        for (ProcessorNode node : nodes) {
            names.add(node.name());
        }
        return names;
    }

    public ProcessorNode processor(String name) {
        List nodes = this.topology.processors();
        for (ProcessorNode node : nodes) {
            if (!node.name().equals(name)) continue;
            return node;
        }
        return null;
    }

    public Map<String, StateStore> allStateStores() {
        return this.context.allStateStores();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void flushState() {
        ProcessorNode current = this.currNode;
        try {
            for (StateStore stateStore : this.context.allStateStores().values()) {
                ProcessorNode processorNode = (ProcessorNode)this.topology.storeToProcessorNodeMap().get(stateStore);
                if (processorNode != null) {
                    this.currNode = processorNode;
                }
                stateStore.flush();
            }
        }
        finally {
            this.currNode = current;
        }
    }

    private class MockRecordCollector
    extends RecordCollector {
        public MockRecordCollector() {
            super(null, "KStreamTestDriver");
        }

        public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<K, V> partitioner) {
            KStreamTestDriver.this.process(record.topic(), record.key(), record.value());
        }

        public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
            KStreamTestDriver.this.process(record.topic(), record.key(), record.value());
        }

        public void flush() {
        }

        public void close() {
        }
    }
}

