/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockProcessorNode;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class StreamTaskTest {
    private final Serializer<Integer> intSerializer = new IntegerSerializer();
    private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
    private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
    private final String[] topic1 = new String[]{"topic1"};
    private final String[] topic2 = new String[]{"topic2"};
    private final TopicPartition partition1 = new TopicPartition(this.topic1[0], 1);
    private final TopicPartition partition2 = new TopicPartition(this.topic2[0], 1);
    private final Set<TopicPartition> partitions = Utils.mkSet((Object[])new TopicPartition[]{this.partition1, this.partition2});
    private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<Integer, Integer>(this.topic1, this.intDeserializer, this.intDeserializer);
    private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<Integer, Integer>(this.topic2, this.intDeserializer, this.intDeserializer);
    private final MockProcessorNode<Integer, Integer> processor = new MockProcessorNode(10L);
    private final ProcessorTopology topology = new ProcessorTopology(Arrays.asList(new ProcessorNode[]{this.source1, this.source2, this.processor}), (Map)new HashMap<String, SourceNode>(){
        {
            this.put(StreamTaskTest.this.topic1[0], StreamTaskTest.this.source1);
            this.put(StreamTaskTest.this.topic2[0], StreamTaskTest.this.source2);
        }
    }, Collections.emptyMap(), Collections.emptyList(), Collections.emptyMap(), Collections.emptyMap());
    private File baseDir;
    private StateDirectory stateDirectory;
    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
    private final MockProducer<byte[], byte[]> producer = new MockProducer(false, this.bytesSerializer, this.bytesSerializer);
    private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
    private final byte[] recordValue = this.intSerializer.serialize(null, (Object)10);
    private final byte[] recordKey = this.intSerializer.serialize(null, (Object)1);

    private StreamsConfig createConfig(final File baseDir) throws Exception {
        return new StreamsConfig((Map)new Properties(){
            {
                this.setProperty("application.id", "stream-task-test");
                this.setProperty("bootstrap.servers", "localhost:2171");
                this.setProperty("buffered.records.per.partition", "3");
                this.setProperty("state.dir", baseDir.getCanonicalPath());
                this.setProperty("timestamp.extractor", MockTimestampExtractor.class.getName());
            }
        });
    }

    @Before
    public void setup() {
        this.consumer.assign(Arrays.asList(this.partition1, this.partition2));
        this.source1.addChild(this.processor);
        this.source2.addChild(this.processor);
        this.baseDir = TestUtils.tempDirectory();
        this.stateDirectory = new StateDirectory("applicationId", this.baseDir.getPath());
    }

    @After
    public void cleanup() {
        Utils.delete((File)this.baseDir);
    }

    @Test
    public void testProcessOrder() throws Exception {
        StreamsConfig config = this.createConfig(this.baseDir);
        StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", this.partitions, this.topology, this.consumer, this.producer, this.restoreStateConsumer, config, null, this.stateDirectory, null);
        task.addRecords(this.partition1, this.records(new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 10L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 20L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 30L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        task.addRecords(this.partition2, this.records(new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 25L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 35L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 45L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        Assert.assertEquals((long)5L, (long)task.process());
        Assert.assertEquals((long)1L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertEquals((long)4L, (long)task.process());
        Assert.assertEquals((long)2L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertEquals((long)3L, (long)task.process());
        Assert.assertEquals((long)2L, (long)this.source1.numReceived);
        Assert.assertEquals((long)1L, (long)this.source2.numReceived);
        Assert.assertEquals((long)2L, (long)task.process());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)1L, (long)this.source2.numReceived);
        Assert.assertEquals((long)1L, (long)task.process());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)2L, (long)this.source2.numReceived);
        Assert.assertEquals((long)0L, (long)task.process());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)3L, (long)this.source2.numReceived);
        task.close();
    }

    @Test
    public void testPauseResume() throws Exception {
        StreamsConfig config = this.createConfig(this.baseDir);
        StreamTask task = new StreamTask(new TaskId(1, 1), "applicationId", this.partitions, this.topology, this.consumer, this.producer, this.restoreStateConsumer, config, null, this.stateDirectory, null);
        task.addRecords(this.partition1, this.records(new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 10L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 20L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        task.addRecords(this.partition2, this.records(new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 35L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 45L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 55L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 65L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        Assert.assertEquals((long)5L, (long)task.process());
        Assert.assertEquals((long)1L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertEquals((long)1L, (long)this.consumer.paused().size());
        Assert.assertTrue((boolean)this.consumer.paused().contains(this.partition2));
        task.addRecords(this.partition1, this.records(new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 30L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 40L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 50L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        Assert.assertEquals((long)2L, (long)this.consumer.paused().size());
        Assert.assertTrue((boolean)this.consumer.paused().contains(this.partition1));
        Assert.assertTrue((boolean)this.consumer.paused().contains(this.partition2));
        Assert.assertEquals((long)7L, (long)task.process());
        Assert.assertEquals((long)2L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertEquals((long)1L, (long)this.consumer.paused().size());
        Assert.assertTrue((boolean)this.consumer.paused().contains(this.partition2));
        Assert.assertEquals((long)6L, (long)task.process());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertEquals((long)1L, (long)this.consumer.paused().size());
        Assert.assertTrue((boolean)this.consumer.paused().contains(this.partition2));
        Assert.assertEquals((long)5L, (long)task.process());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)1L, (long)this.source2.numReceived);
        Assert.assertEquals((long)0L, (long)this.consumer.paused().size());
        task.close();
    }

    @Test
    public void testMaybePunctuate() throws Exception {
        StreamsConfig config = this.createConfig(this.baseDir);
        StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", this.partitions, this.topology, this.consumer, this.producer, this.restoreStateConsumer, config, null, this.stateDirectory, null);
        task.addRecords(this.partition1, this.records(new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 20L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 30L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 40L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        task.addRecords(this.partition2, this.records(new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 25L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 35L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 45L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        Assert.assertTrue((boolean)task.maybePunctuate());
        Assert.assertEquals((long)5L, (long)task.process());
        Assert.assertEquals((long)1L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)task.maybePunctuate());
        Assert.assertEquals((long)4L, (long)task.process());
        Assert.assertEquals((long)1L, (long)this.source1.numReceived);
        Assert.assertEquals((long)1L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)task.maybePunctuate());
        Assert.assertEquals((long)3L, (long)task.process());
        Assert.assertEquals((long)2L, (long)this.source1.numReceived);
        Assert.assertEquals((long)1L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)task.maybePunctuate());
        Assert.assertEquals((long)2L, (long)task.process());
        Assert.assertEquals((long)2L, (long)this.source1.numReceived);
        Assert.assertEquals((long)2L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)task.maybePunctuate());
        Assert.assertEquals((long)1L, (long)task.process());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)2L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)task.maybePunctuate());
        Assert.assertEquals((long)0L, (long)task.process());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)3L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)task.maybePunctuate());
        this.processor.supplier.checkAndClearPunctuateResult(20L, 30L, 40L);
        task.close();
    }

    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() throws Exception {
        StreamsConfig config = this.createConfig(this.baseDir);
        MockSourceNode processorNode = new MockSourceNode(this.topic1, this.intDeserializer, this.intDeserializer){

            public void process(Object key, Object value) {
                throw new KafkaException("KABOOM!");
            }
        };
        List<3> processorNodes = Collections.singletonList(processorNode);
        Map<String, 3> sourceNodes = Collections.singletonMap(this.topic1[0], processorNode);
        ProcessorTopology topology = new ProcessorTopology(processorNodes, sourceNodes, Collections.emptyMap(), Collections.emptyList(), Collections.emptyMap(), Collections.emptyMap());
        StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", this.partitions, topology, this.consumer, this.producer, this.restoreStateConsumer, config, null, this.stateDirectory, new ThreadCache(0L));
        int offset = 20;
        streamTask.addRecords(this.partition1, Collections.singletonList(new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 20L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        try {
            streamTask.process();
            Assert.fail((String)"Should've thrown StreamsException");
        }
        catch (StreamsException e) {
            String message = e.getMessage();
            Assert.assertTrue((String)("message=" + message + " should contain topic"), (boolean)message.contains("topic=" + this.topic1[0]));
            Assert.assertTrue((String)("message=" + message + " should contain partition"), (boolean)message.contains("partition=" + this.partition1.partition()));
            Assert.assertTrue((String)("message=" + message + " should contain offset"), (boolean)message.contains("offset=20"));
            Assert.assertTrue((String)("message=" + message + " should contain processor"), (boolean)message.contains("processor=" + processorNode.name()));
        }
    }

    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuating() throws Exception {
        StreamsConfig config = this.createConfig(this.baseDir);
        ProcessorNode punctuator = new ProcessorNode("test", (Processor)new AbstractProcessor(){

            public void init(ProcessorContext context) {
                context.schedule(1L);
            }

            public void process(Object key, Object value) {
            }

            public void punctuate(long timestamp) {
                throw new KafkaException("KABOOM!");
            }
        }, Collections.emptySet());
        List<ProcessorNode> processorNodes = Collections.singletonList(punctuator);
        ProcessorTopology topology = new ProcessorTopology(processorNodes, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyList(), Collections.emptyMap(), Collections.emptyMap());
        StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", this.partitions, topology, this.consumer, this.producer, this.restoreStateConsumer, config, null, this.stateDirectory, new ThreadCache(0L));
        try {
            streamTask.punctuate(punctuator, 1L);
            Assert.fail((String)"Should've thrown StreamsException");
        }
        catch (StreamsException e) {
            String message = e.getMessage();
            Assert.assertTrue((String)("message=" + message + " should contain processor"), (boolean)message.contains("processor=test"));
        }
    }

    private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]> ... recs) {
        return Arrays.asList(recs);
    }
}

