/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.GlobalStateManager;
import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl;
import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StateManager;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.TestUtils;

public class ProcessorTopologyTestDriver {
    private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
    private final String applicationId = "test-driver-application";
    private final TaskId id;
    private final ProcessorTopology topology;
    private final MockConsumer<byte[], byte[]> consumer;
    private final MockProducer<byte[], byte[]> producer;
    private final MockConsumer<byte[], byte[]> restoreStateConsumer;
    private final Map<String, TopicPartition> partitionsByTopic = new HashMap<String, TopicPartition>();
    private final Map<TopicPartition, AtomicLong> offsetsByTopicPartition = new HashMap<TopicPartition, AtomicLong>();
    private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic = new HashMap<String, Queue<ProducerRecord<byte[], byte[]>>>();
    private final ProcessorTopology globalTopology;
    private final Map<String, TopicPartition> globalPartitionsByTopic = new HashMap<String, TopicPartition>();
    private StreamTask task;
    private GlobalStateUpdateTask globalStateTask;

    public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder, String ... storeNames) {
        this.id = new TaskId(0, 0);
        this.topology = builder.setApplicationId("test-driver-application").build(null);
        this.globalTopology = builder.buildGlobalStateTopology();
        this.consumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
        this.producer = new MockProducer<byte[], byte[]>(true, this.bytesSerializer, this.bytesSerializer){

            public List<PartitionInfo> partitionsFor(String topic) {
                return Collections.singletonList(new PartitionInfo(topic, 0, null, null, null));
            }
        };
        this.restoreStateConsumer = this.createRestoreConsumer(this.id, storeNames);
        for (String topic : this.topology.sourceTopics()) {
            TopicPartition tp = new TopicPartition(topic, 1);
            this.partitionsByTopic.put(topic, tp);
            this.offsetsByTopicPartition.put(tp, new AtomicLong());
        }
        this.consumer.assign(this.offsetsByTopicPartition.keySet());
        StateDirectory stateDirectory = new StateDirectory("test-driver-application", TestUtils.tempDirectory().getPath());
        MockStreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
        ThreadCache cache = new ThreadCache("mock", 0x100000L, (StreamsMetrics)streamsMetrics);
        if (this.globalTopology != null) {
            MockConsumer<byte[], byte[]> globalConsumer = this.createGlobalConsumer();
            for (String topicName : this.globalTopology.sourceTopics()) {
                ArrayList<PartitionInfo> partitionInfos = new ArrayList<PartitionInfo>();
                partitionInfos.add(new PartitionInfo(topicName, 1, null, null, null));
                globalConsumer.updatePartitions(topicName, partitionInfos);
                TopicPartition partition = new TopicPartition(topicName, 1);
                globalConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
                this.globalPartitionsByTopic.put(topicName, partition);
                this.offsetsByTopicPartition.put(partition, new AtomicLong());
            }
            GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(this.globalTopology, globalConsumer, stateDirectory);
            this.globalStateTask = new GlobalStateUpdateTask(this.globalTopology, (InternalProcessorContext)new GlobalProcessorContextImpl(config, (StateManager)stateManager, (StreamsMetrics)streamsMetrics, cache), (GlobalStateManager)stateManager);
            this.globalStateTask.initialize();
        }
        if (!this.partitionsByTopic.isEmpty()) {
            this.task = new StreamTask(this.id, "test-driver-application", this.partitionsByTopic.values(), this.topology, this.consumer, this.restoreStateConsumer, config, (StreamsMetrics)streamsMetrics, stateDirectory, cache, (Time)new MockTime(), (RecordCollector)new RecordCollectorImpl(this.producer, "id"));
        }
    }

    public void process(String topicName, byte[] key, byte[] value) {
        TopicPartition tp = this.partitionsByTopic.get(topicName);
        if (tp != null) {
            long offset = this.offsetsByTopicPartition.get(tp).incrementAndGet();
            this.task.addRecords(tp, this.records((ConsumerRecord<byte[], byte[]>)new ConsumerRecord(tp.topic(), tp.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)key, (Object)value)));
            this.producer.clear();
            this.task.process();
            ((InternalProcessorContext)this.task.context()).setRecordContext((RecordContext)new ProcessorRecordContext(0L, offset, tp.partition(), topicName));
            this.task.commit();
            for (ProducerRecord record : this.producer.history()) {
                Queue<ProducerRecord<byte[], byte[]>> outputRecords = this.outputRecordsByTopic.get(record.topic());
                if (outputRecords == null) {
                    outputRecords = new LinkedList<ProducerRecord<byte[], byte[]>>();
                    this.outputRecordsByTopic.put(record.topic(), outputRecords);
                }
                outputRecords.add((ProducerRecord<byte[], byte[]>)record);
            }
        } else {
            TopicPartition global = this.globalPartitionsByTopic.get(topicName);
            if (global == null) {
                throw new IllegalArgumentException("Unexpected topic: " + topicName);
            }
            long offset = this.offsetsByTopicPartition.get(global).incrementAndGet();
            this.globalStateTask.update(new ConsumerRecord(global.topic(), global.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)key, (Object)value));
            this.globalStateTask.flushState();
        }
    }

    public <K, V> void process(String topicName, K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this.process(topicName, keySerializer.serialize(topicName, key), valueSerializer.serialize(topicName, value));
    }

    public ProducerRecord<byte[], byte[]> readOutput(String topic) {
        Queue<ProducerRecord<byte[], byte[]>> outputRecords = this.outputRecordsByTopic.get(topic);
        if (outputRecords == null) {
            return null;
        }
        return outputRecords.poll();
    }

    public <K, V> ProducerRecord<K, V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        ProducerRecord<byte[], byte[]> record = this.readOutput(topic);
        if (record == null) {
            return null;
        }
        Object key = keyDeserializer.deserialize(record.topic(), (byte[])record.key());
        Object value = valueDeserializer.deserialize(record.topic(), (byte[])record.value());
        return new ProducerRecord(record.topic(), record.partition(), key, value);
    }

    private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]> record) {
        return Collections.singleton(record);
    }

    public StateStore getStateStore(String name) {
        return ((ProcessorContextImpl)this.task.context()).getStateMgr().getStore(name);
    }

    public <K, V> KeyValueStore<K, V> getKeyValueStore(String name) {
        StateStore store = this.getStateStore(name);
        return store instanceof KeyValueStore ? (KeyValueStore)this.getStateStore(name) : null;
    }

    public void close() {
        if (this.task != null) {
            this.task.close();
        }
        if (this.globalStateTask != null) {
            try {
                this.globalStateTask.close();
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }
    }

    protected MockConsumer<byte[], byte[]> createRestoreConsumer(TaskId id, String ... storeNames) {
        MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST){

            public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
            }

            public synchronized void seekToBeginning(Collection<TopicPartition> partitions) {
            }

            public synchronized long position(TopicPartition partition) {
                return 0L;
            }
        };
        for (String storeName : storeNames) {
            String topicName = ProcessorStateManager.storeChangelogTopic((String)"test-driver-application", (String)storeName);
            ArrayList<PartitionInfo> partitionInfos = new ArrayList<PartitionInfo>();
            partitionInfos.add(new PartitionInfo(topicName, id.partition, null, null, null));
            consumer.updatePartitions(topicName, partitionInfos);
            consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, id.partition), 0L));
        }
        return consumer;
    }

    protected MockConsumer<byte[], byte[]> createGlobalConsumer() {
        MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST){

            public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
            }

            public synchronized void seekToBeginning(Collection<TopicPartition> partitions) {
            }

            public synchronized long position(TopicPartition partition) {
                return 0L;
            }
        };
        return consumer;
    }
}

