/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.test;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.TestUtils;

/**
 * Test helper that drives a single {@link StreamTask} built from a {@link TopologyBuilder}
 * using mock clients, so a processor topology can be exercised without a running cluster.
 *
 * <p>Input is fed one record at a time through {@link #process(String, byte[], byte[])}; every
 * record the topology sends to the mock producer while handling that input is queued per output
 * topic and can be drained with {@link #readOutput(String)}.
 *
 * <p>The driver keeps plain {@link HashMap}s and performs no synchronization of its own.
 */
public class ProcessorTopologyTestDriver {
    /** Value handed to the {@link ThreadCache} constructor (1024 * 1024). */
    private static final long THREAD_CACHE_SIZE = 1024L * 1024L;

    private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
    private final String applicationId = "test-driver-application";
    private final TaskId id;
    private final ProcessorTopology topology;
    private final StreamTask task;
    private final MockConsumer<byte[], byte[]> consumer;
    private final MockProducer<byte[], byte[]> producer;
    private final MockConsumer<byte[], byte[]> restoreStateConsumer;
    /** Source topic name to the single partition this driver assigns for it. */
    private final Map<String, TopicPartition> partitionsByTopic = new HashMap<String, TopicPartition>();
    /** Last offset handed out per source partition; incremented before each use. */
    private final Map<TopicPartition, AtomicLong> offsetsByTopicPartition = new HashMap<TopicPartition, AtomicLong>();
    /** Records captured from the mock producer, queued per output topic in send order. */
    private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic = new HashMap<String, Queue<ProducerRecord<byte[], byte[]>>>();

    /**
     * Builds the topology and wires it into a {@link StreamTask} backed by mock clients.
     *
     * @param config     configuration passed through to the {@link StreamTask}
     * @param builder    builder whose topology is to be driven; its application id is set to
     *                   {@code "ProcessorTopologyTestDriver"} before building
     * @param storeNames names of the state stores for which the restore consumer is primed with
     *                   a changelog partition and an end offset of 0
     */
    public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder, String... storeNames) {
        id = new TaskId(0, 0);
        topology = builder.setApplicationId("ProcessorTopologyTestDriver").build(null);

        consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST);
        producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) {
            // Report exactly one partition (0) for whatever topic is asked about.
            @Override
            public List<PartitionInfo> partitionsFor(String topic) {
                return Collections.singletonList(new PartitionInfo(topic, 0, null, null, null));
            }
        };
        // NOTE: createRestoreConsumer is protected and therefore overridable; it runs before
        // this constructor has finished, so overrides must not rely on subclass state.
        restoreStateConsumer = createRestoreConsumer(id, storeNames);

        // Every source topic is mapped to partition 1 and gets its own offset counter.
        for (String topic : topology.sourceTopics()) {
            TopicPartition tp = new TopicPartition(topic, 1);
            partitionsByTopic.put(topic, tp);
            offsetsByTopicPartition.put(tp, new AtomicLong());
        }
        consumer.assign(offsetsByTopicPartition.keySet());

        StreamsMetrics noOpMetrics = new StreamsMetrics() {
            @Override
            public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
                return null;
            }

            @Override
            public void recordLatency(Sensor sensor, long startNs, long endNs) {
                // metrics are intentionally not recorded by the driver
            }
        };
        StateDirectory stateDirectory = new StateDirectory(applicationId, TestUtils.tempDirectory().getPath());
        task = new StreamTask(id, applicationId, partitionsByTopic.values(), topology, consumer, producer,
                restoreStateConsumer, config, noOpMetrics, stateDirectory, new ThreadCache(THREAD_CACHE_SIZE));
    }

    /**
     * Sends one record into the topology, processes and commits it, and captures everything the
     * topology produced in response.
     *
     * <p>The record is given the next offset for its partition (the first record gets offset 1)
     * and a timestamp of 0.
     *
     * @param topicName name of a source topic of the topology
     * @param key       serialized record key
     * @param value     serialized record value
     * @throws IllegalArgumentException if {@code topicName} is not a source topic of the topology
     */
    public void process(String topicName, byte[] key, byte[] value) {
        TopicPartition tp = partitionsByTopic.get(topicName);
        if (tp == null) {
            throw new IllegalArgumentException("Unexpected topic: " + topicName);
        }

        long offset = offsetsByTopicPartition.get(tp).incrementAndGet();
        ConsumerRecord<byte[], byte[]> input = new ConsumerRecord<byte[], byte[]>(
                tp.topic(), tp.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, value);
        task.addRecords(tp, records(input));

        // Drop the producer's earlier history so only output caused by this record is captured.
        producer.clear();
        task.process();
        ((InternalProcessorContext) task.context())
                .setRecordContext(new ProcessorRecordContext(0L, offset, tp.partition(), topicName));
        task.commit();

        for (ProducerRecord<byte[], byte[]> record : producer.history()) {
            Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(record.topic());
            if (outputRecords == null) {
                outputRecords = new LinkedList<ProducerRecord<byte[], byte[]>>();
                outputRecordsByTopic.put(record.topic(), outputRecords);
            }
            outputRecords.add(record);
        }
    }

    /**
     * Serializes the key and value with the given serializers and delegates to
     * {@link #process(String, byte[], byte[])}.
     *
     * @param topicName       name of a source topic of the topology
     * @param key             record key
     * @param value           record value
     * @param keySerializer   serializer applied to {@code key}
     * @param valueSerializer serializer applied to {@code value}
     * @throws IllegalArgumentException if {@code topicName} is not a source topic of the topology
     */
    public <K, V> void process(String topicName, K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        process(topicName, keySerializer.serialize(topicName, key), valueSerializer.serialize(topicName, value));
    }

    /**
     * Removes and returns the oldest captured record for an output topic.
     *
     * @param topic name of the output topic
     * @return the next record, or {@code null} if nothing is queued for {@code topic}
     */
    public ProducerRecord<byte[], byte[]> readOutput(String topic) {
        Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topic);
        if (outputRecords == null) {
            return null;
        }
        return outputRecords.poll();
    }

    /**
     * Removes the oldest captured record for an output topic and deserializes its key and value.
     *
     * <p>The returned record carries only topic, partition, key and value of the captured one.
     *
     * @param topic             name of the output topic
     * @param keyDeserializer   deserializer applied to the record key
     * @param valueDeserializer deserializer applied to the record value
     * @return the next record with typed key and value, or {@code null} if nothing is queued
     */
    public <K, V> ProducerRecord<K, V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        ProducerRecord<byte[], byte[]> record = readOutput(topic);
        if (record == null) {
            return null;
        }
        K key = keyDeserializer.deserialize(record.topic(), record.key());
        V value = valueDeserializer.deserialize(record.topic(), record.value());
        return new ProducerRecord<K, V>(record.topic(), record.partition(), key, value);
    }

    /** Wraps a single record in the {@link Iterable} expected by {@code StreamTask.addRecords}. */
    private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]> record) {
        return Collections.singleton(record);
    }

    /**
     * Looks up a state store through the task's state manager.
     *
     * @param name store name
     * @return whatever the state manager returns for {@code name}
     */
    public StateStore getStateStore(String name) {
        return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
    }

    /**
     * Looks up a state store and returns it as a {@link KeyValueStore}.
     *
     * @param name store name
     * @return the store, or {@code null} if the lookup result is not a {@link KeyValueStore}
     */
    @SuppressWarnings("unchecked") // key/value types are chosen by the caller and cannot be checked here
    public <K, V> KeyValueStore<K, V> getKeyValueStore(String name) {
        StateStore store = getStateStore(name);
        return store instanceof KeyValueStore ? (KeyValueStore<K, V>) store : null;
    }

    /** Closes the underlying {@link StreamTask}. */
    public void close() {
        task.close();
    }

    /**
     * Creates the mock consumer handed to the task for state restoration.
     *
     * <p>Seeking is a no-op and the reported position is always 0. For each store name the
     * changelog topic gets a single partition ({@code id.partition}) with an end offset of 0.
     *
     * @param id         task id whose partition number is used for the changelog partitions
     * @param storeNames names of the stores to register changelog topics for
     * @return the configured mock consumer
     */
    protected MockConsumer<byte[], byte[]> createRestoreConsumer(TaskId id, String... storeNames) {
        MockConsumer<byte[], byte[]> restoreConsumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) {
            @Override
            public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
                // seeking is deliberately ignored
            }

            @Override
            public synchronized void seekToBeginning(Collection<TopicPartition> partitions) {
                // seeking is deliberately ignored
            }

            @Override
            public synchronized long position(TopicPartition partition) {
                return 0L;
            }
        };

        for (String storeName : storeNames) {
            String topicName = ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
            List<PartitionInfo> partitionInfos = new ArrayList<PartitionInfo>();
            partitionInfos.add(new PartitionInfo(topicName, id.partition, null, null, null));
            restoreConsumer.updatePartitions(topicName, partitionInfos);
            restoreConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, id.partition), 0L));
        }
        return restoreConsumer;
    }
}

