/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
import org.junit.Assert;
import org.junit.Test;

public class StreamThreadTest {
    // Identifiers shared by the tests in this class.
    private final String clientId = "clientId";
    private final String applicationId = "stream-thread-test";
    private final UUID processId = UUID.randomUUID();

    // Partitions 1 and 2 of each of the three test topics.
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t1p2 = new TopicPartition("topic1", 2);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private final TopicPartition t2p2 = new TopicPartition("topic2", 2);
    private final TopicPartition t3p1 = new TopicPartition("topic3", 1);
    private final TopicPartition t3p2 = new TopicPartition("topic3", 2);

    // Partition metadata for topic1..topic3, partitions 0..2 each. Every entry uses
    // Node.noNode() and empty node arrays.
    private final List<PartitionInfo> infos = Arrays.asList(
        new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
        new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
        new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
        new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
        new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
        new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
        new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
        new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
        new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]));

    // Cluster built from the partition metadata above; passed to the assignor in initPartitionGrouper.
    private final Cluster metadata = new Cluster("cluster", Arrays.asList(Node.noNode()), this.infos, Collections.emptySet(), Collections.emptySet());

    // Subscription to all three topics, carrying the payload produced by subscriptionUserData().
    private final PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(Arrays.asList("topic1", "topic2", "topic3"), this.subscriptionUserData());

    // Task ids the tests look up in StreamThread.tasks().
    private final TaskId task1 = new TaskId(0, 1);
    private final TaskId task2 = new TaskId(0, 2);
    private final TaskId task3 = new TaskId(0, 3);
    private final TaskId task4 = new TaskId(1, 1);
    private final TaskId task5 = new TaskId(1, 2);

    /**
     * Builds the user-data payload attached to the test subscription.
     *
     * <p>The 28-byte buffer contains, in order: the int {@code 1}, the most and least
     * significant halves of a freshly generated random UUID, and two int zeros. The buffer is
     * rewound before it is returned so that reading starts at the first byte.
     *
     * <p>NOTE(review): the layout presumably mirrors what StreamPartitionAssignor expects to
     * decode (a version, a process id and two empty collections) - confirm against the decoder.
     *
     * @return the rewound payload buffer
     */
    private ByteBuffer subscriptionUserData() {
        final UUID randomId = UUID.randomUUID();
        // 4 (int) + 8 + 8 (UUID halves) + 4 (int) + 4 (int) = 28 bytes
        final ByteBuffer payload = ByteBuffer.allocate(4 + 8 + 8 + 4 + 4);
        payload.putInt(1)
            .putLong(randomId.getMostSignificantBits())
            .putLong(randomId.getLeastSignificantBits())
            .putInt(0)
            .putInt(0);
        payload.rewind();
        return payload;
    }

    /**
     * Creates the baseline configuration used by every test in this class.
     *
     * <p>Sets the application id, a bootstrap server address, the per-partition record buffer
     * size and the timestamp extractor class.
     *
     * @return a new, mutable {@link Properties} instance; callers may add further settings
     */
    private Properties configProps() {
        // A plain Properties object replaces the former double-brace initialisation, which
        // generated an anonymous subclass holding a hidden reference to this test instance.
        final Properties props = new Properties();
        props.setProperty("application.id", this.applicationId);
        props.setProperty("bootstrap.servers", "localhost:2171");
        props.setProperty("buffered.records.per.partition", "3");
        props.setProperty("timestamp.extractor", MockTimestampExtractor.class.getName());
        return props;
    }

    /**
     * Verifies that the thread's rebalance listener replaces the set of tasks so that it
     * matches each new partition assignment.
     *
     * <p>The topology has one stand-alone source on topic1 and a processor fed by the sources on
     * topic2 and topic3. The assertions expect topic1 partitions to map to tasks 0_x and the
     * topic2/topic3 partitions with the same partition number to be grouped into tasks 1_x.
     */
    @Test
    public void testPartitionAssignmentChange() throws Exception {
        StreamsConfig config = new StreamsConfig(configProps());

        TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
        builder.addSource("source1", "topic1");
        builder.addSource("source2", "topic2");
        builder.addSource("source3", "topic3");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");

        StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId, processId, new Metrics(), new SystemTime(), new StreamsMetadataState(builder)) {

            // Create TestStreamTask instances instead of regular tasks.
            @Override
            protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                ProcessorTopology topology = this.builder.build(id.topicGroupId);
                return new TestStreamTask(id, this.applicationId, partitionsForTask, topology, this.consumer, this.producer, this.restoreConsumer, this.config, this.stateDirectory);
            }
        };
        initPartitionGrouper(config, thread);

        ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
        Assert.assertTrue(thread.tasks().isEmpty());

        // Typed as TopicPartition lists so they can be passed to the listener callbacks.
        List<TopicPartition> revokedPartitions;
        List<TopicPartition> assignedPartitions;
        HashSet<TopicPartition> expectedGroup1;
        HashSet<TopicPartition> expectedGroup2;

        // 1) nothing -> {topic1-1}: only task1 exists
        revokedPartitions = Collections.emptyList();
        assignedPartitions = Collections.singletonList(t1p1);
        expectedGroup1 = new HashSet<TopicPartition>(Arrays.asList(t1p1));

        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        rebalanceListener.onPartitionsAssigned(assignedPartitions);

        Assert.assertTrue(thread.tasks().containsKey(task1));
        Assert.assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
        Assert.assertEquals(1, thread.tasks().size());

        // 2) {topic1-1} -> {topic1-2}: task1 is replaced by task2
        revokedPartitions = assignedPartitions;
        assignedPartitions = Collections.singletonList(t1p2);
        expectedGroup2 = new HashSet<TopicPartition>(Arrays.asList(t1p2));

        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        rebalanceListener.onPartitionsAssigned(assignedPartitions);

        Assert.assertTrue(thread.tasks().containsKey(task2));
        Assert.assertEquals(expectedGroup2, thread.tasks().get(task2).partitions());
        Assert.assertEquals(1, thread.tasks().size());

        // 3) {topic1-2} -> {topic1-1, topic1-2}: task1 and task2 both exist
        revokedPartitions = assignedPartitions;
        assignedPartitions = Arrays.asList(t1p1, t1p2);
        expectedGroup1 = new HashSet<TopicPartition>(Collections.singleton(t1p1));
        expectedGroup2 = new HashSet<TopicPartition>(Collections.singleton(t1p2));

        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        rebalanceListener.onPartitionsAssigned(assignedPartitions);

        Assert.assertTrue(thread.tasks().containsKey(task1));
        Assert.assertTrue(thread.tasks().containsKey(task2));
        Assert.assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
        Assert.assertEquals(expectedGroup2, thread.tasks().get(task2).partitions());
        Assert.assertEquals(2, thread.tasks().size());

        // 4) switch to topic2/topic3 partitions 1 and 2: task4 and task5, grouped by partition
        revokedPartitions = assignedPartitions;
        assignedPartitions = Arrays.asList(t2p1, t2p2, t3p1, t3p2);
        expectedGroup1 = new HashSet<TopicPartition>(Arrays.asList(t2p1, t3p1));
        expectedGroup2 = new HashSet<TopicPartition>(Arrays.asList(t2p2, t3p2));

        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        rebalanceListener.onPartitionsAssigned(assignedPartitions);

        Assert.assertTrue(thread.tasks().containsKey(task4));
        Assert.assertTrue(thread.tasks().containsKey(task5));
        Assert.assertEquals(expectedGroup1, thread.tasks().get(task4).partitions());
        Assert.assertEquals(expectedGroup2, thread.tasks().get(task5).partitions());
        Assert.assertEquals(2, thread.tasks().size());

        // 5) partition 1 of all three topics: task1 (topic1) and task4 (topic2 + topic3)
        revokedPartitions = assignedPartitions;
        assignedPartitions = Arrays.asList(t1p1, t2p1, t3p1);
        expectedGroup1 = new HashSet<TopicPartition>(Arrays.asList(t1p1));
        expectedGroup2 = new HashSet<TopicPartition>(Arrays.asList(t2p1, t3p1));

        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        rebalanceListener.onPartitionsAssigned(assignedPartitions);

        Assert.assertTrue(thread.tasks().containsKey(task1));
        Assert.assertTrue(thread.tasks().containsKey(task4));
        Assert.assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
        Assert.assertEquals(expectedGroup2, thread.tasks().get(task4).partitions());
        Assert.assertEquals(2, thread.tasks().size());

        // 6) everything revoked, nothing assigned: no tasks remain
        revokedPartitions = assignedPartitions;
        assignedPartitions = Collections.emptyList();

        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        rebalanceListener.onPartitionsAssigned(assignedPartitions);

        Assert.assertTrue(thread.tasks().isEmpty());
    }

    /**
     * Verifies that {@code maybeClean()} deletes task state directories only when the task is
     * not currently assigned to the thread and only after the configured cleanup delay has
     * passed on the mock clock.
     *
     * <p>A directory named "X" is created next to the task directories and is expected to
     * survive every cleanup pass.
     */
    @Test
    public void testMaybeClean() throws Exception {
        File baseDir = Files.createTempDirectory("test").toFile();
        try {
            final long cleanupDelay = 1000L;
            Properties props = configProps();
            props.setProperty("state.cleanup.delay.ms", Long.toString(cleanupDelay));
            props.setProperty("state.dir", baseDir.getCanonicalPath());
            StreamsConfig config = new StreamsConfig(props);

            // Lay out <baseDir>/<applicationId>/{0_1-style task dirs, X} before the thread exists.
            File applicationDir = new File(baseDir, applicationId);
            applicationDir.mkdir();
            File stateDir1 = new File(applicationDir, task1.toString());
            File stateDir2 = new File(applicationDir, task2.toString());
            File stateDir3 = new File(applicationDir, task3.toString());
            File extraDir = new File(applicationDir, "X");
            stateDir1.mkdir();
            stateDir2.mkdir();
            stateDir3.mkdir();
            extraDir.mkdir();

            MockTime mockTime = new MockTime();

            TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
            builder.addSource("source1", "topic1");

            StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId, processId, new Metrics(), mockTime, new StreamsMetadataState(builder)) {

                // Re-declared as public and delegating to the superclass.
                // NOTE(review): presumably only to expose the method to the test - confirm.
                @Override
                public void maybeClean() {
                    super.maybeClean();
                }

                // Create TestStreamTask instances so the test can observe commits.
                @Override
                protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                    ProcessorTopology topology = this.builder.build(id.topicGroupId);
                    return new TestStreamTask(id, this.applicationId, partitionsForTask, topology, this.consumer, this.producer, this.restoreConsumer, this.config, this.stateDirectory);
                }
            };
            initPartitionGrouper(config, thread);

            ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;

            Assert.assertTrue(thread.tasks().isEmpty());
            mockTime.sleep(cleanupDelay);

            // maybeClean() has not been called yet, so every directory must still be present.
            Assert.assertTrue(stateDir1.exists());
            Assert.assertTrue(stateDir2.exists());
            Assert.assertTrue(stateDir3.exists());
            Assert.assertTrue(extraDir.exists());

            // Assign topic1 partitions 1 and 2; two tasks are expected afterwards.
            List<TopicPartition> revokedPartitions = Collections.emptyList();
            List<TopicPartition> assignedPartitions = Arrays.asList(t1p1, t1p2);
            Map<TaskId, StreamTask> prevTasks = new HashMap<TaskId, StreamTask>(thread.tasks());

            rebalanceListener.onPartitionsRevoked(revokedPartitions);
            rebalanceListener.onPartitionsAssigned(assignedPartitions);

            // There were no tasks before the first assignment.
            Assert.assertTrue(prevTasks.isEmpty());
            Assert.assertEquals(2, thread.tasks().size());

            // Just short of the delay: nothing may be removed.
            mockTime.sleep(cleanupDelay - 10L);
            thread.maybeClean();

            Assert.assertTrue(stateDir1.exists());
            Assert.assertTrue(stateDir2.exists());
            Assert.assertTrue(stateDir3.exists());
            Assert.assertTrue(extraDir.exists());

            // Past the delay: only the directory of the unassigned task3 is removed.
            mockTime.sleep(11L);
            thread.maybeClean();

            Assert.assertTrue(stateDir1.exists());
            Assert.assertTrue(stateDir2.exists());
            Assert.assertFalse(stateDir3.exists());
            Assert.assertTrue(extraDir.exists());

            // Revoke everything; remember the tasks that existed before the rebalance.
            revokedPartitions = assignedPartitions;
            assignedPartitions = Collections.emptyList();
            prevTasks = new HashMap<TaskId, StreamTask>(thread.tasks());

            rebalanceListener.onPartitionsRevoked(revokedPartitions);
            rebalanceListener.onPartitionsAssigned(assignedPartitions);

            // Both previous tasks must have been committed during the rebalance.
            Assert.assertEquals(2, prevTasks.size());
            for (StreamTask task : prevTasks.values()) {
                Assert.assertTrue(((TestStreamTask) task).committed);
                ((TestStreamTask) task).committed = false;
            }

            Assert.assertTrue(thread.tasks().isEmpty());

            // Just short of the delay again: task1/task2 directories are still kept.
            mockTime.sleep(cleanupDelay - 10L);
            thread.maybeClean();

            Assert.assertTrue(stateDir1.exists());
            Assert.assertTrue(stateDir2.exists());
            Assert.assertFalse(stateDir3.exists());
            Assert.assertTrue(extraDir.exists());

            // Past the delay: all task directories are gone, the extra directory remains.
            mockTime.sleep(11L);
            thread.maybeClean();

            Assert.assertFalse(stateDir1.exists());
            Assert.assertFalse(stateDir2.exists());
            Assert.assertFalse(stateDir3.exists());
            Assert.assertTrue(extraDir.exists());
        } finally {
            Utils.delete(baseDir);
        }
    }

    /**
     * Verifies that {@code maybeCommit()} commits the thread's tasks only once the configured
     * commit interval has elapsed on the mock clock, and does so again after the next interval.
     */
    @Test
    public void testMaybeCommit() throws Exception {
        File baseDir = Files.createTempDirectory("test").toFile();
        try {
            final long commitInterval = 1000L;
            Properties props = configProps();
            props.setProperty("state.dir", baseDir.getCanonicalPath());
            props.setProperty("commit.interval.ms", Long.toString(commitInterval));
            StreamsConfig config = new StreamsConfig(props);

            MockTime mockTime = new MockTime();

            TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
            builder.addSource("source1", "topic1");

            StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId, processId, new Metrics(), mockTime, new StreamsMetadataState(builder)) {

                // Re-declared as public and delegating to the superclass.
                // NOTE(review): presumably only to expose the method to the test - confirm.
                @Override
                public void maybeCommit() {
                    super.maybeCommit();
                }

                // Create TestStreamTask instances so the test can observe commits.
                @Override
                protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                    ProcessorTopology topology = this.builder.build(id.topicGroupId);
                    return new TestStreamTask(id, this.applicationId, partitionsForTask, topology, this.consumer, this.producer, this.restoreConsumer, this.config, this.stateDirectory);
                }
            };
            initPartitionGrouper(config, thread);

            ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;

            List<TopicPartition> revokedPartitions = Collections.emptyList();
            List<TopicPartition> assignedPartitions = Arrays.asList(t1p1, t1p2);

            rebalanceListener.onPartitionsRevoked(revokedPartitions);
            rebalanceListener.onPartitionsAssigned(assignedPartitions);

            Assert.assertEquals(2, thread.tasks().size());

            // Just short of the interval: no task may have been committed.
            mockTime.sleep(commitInterval - 10L);
            thread.maybeCommit();
            for (StreamTask task : thread.tasks().values()) {
                Assert.assertFalse(((TestStreamTask) task).committed);
            }

            // Past the interval: every task is committed; reset the flags for the next round.
            mockTime.sleep(11L);
            thread.maybeCommit();
            for (StreamTask task : thread.tasks().values()) {
                Assert.assertTrue(((TestStreamTask) task).committed);
                ((TestStreamTask) task).committed = false;
            }

            // Second round, just short of the interval: still uncommitted.
            mockTime.sleep(commitInterval - 10L);
            thread.maybeCommit();
            for (StreamTask task : thread.tasks().values()) {
                Assert.assertFalse(((TestStreamTask) task).committed);
            }

            // Second round, past the interval: committed again.
            mockTime.sleep(11L);
            thread.maybeCommit();
            for (StreamTask task : thread.tasks().values()) {
                Assert.assertTrue(((TestStreamTask) task).committed);
                ((TestStreamTask) task).committed = false;
            }
        } finally {
            Utils.delete(baseDir);
        }
    }

    /**
     * Verifies that a {@link StreamThread} uses exactly the producer, consumer and restore
     * consumer instances exposed by the client supplier it was constructed with.
     */
    @Test
    public void testInjectClients() {
        TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
        StreamsConfig config = new StreamsConfig(configProps());
        MockClientSupplier clientSupplier = new MockClientSupplier();

        StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId, clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder));

        // Identity, not equality: the thread must hold the supplier's own instances.
        Assert.assertSame(clientSupplier.producer, thread.producer);
        Assert.assertSame(clientSupplier.consumer, thread.consumer);
        Assert.assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer);
    }

    /**
     * Runs one assignment round through a fresh {@link StreamPartitionAssignor} configured for
     * the given thread.
     *
     * <p>The assignor is configured with the consumer configs derived from {@code config} for
     * {@code thread}, asked to assign the test cluster metadata to a single member named
     * "client", and then handed that member's assignment via {@code onAssignment}.
     *
     * <p>NOTE(review): presumably this primes the thread so that the rebalance-listener
     * callbacks can map partitions to tasks - confirm against StreamPartitionAssignor.
     *
     * @param config the streams configuration the thread was built with
     * @param thread the thread whose application id and client id are used for the consumer configs
     */
    private void initPartitionGrouper(StreamsConfig config, StreamThread thread) {
        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
        partitionAssignor.configure(config.getConsumerConfigs(thread, thread.applicationId, thread.clientId));

        // Parameterised map: no raw type and no downcast when reading the assignment back.
        Map<String, PartitionAssignor.Assignment> assignments =
            partitionAssignor.assign(this.metadata, Collections.singletonMap("client", this.subscription));

        partitionAssignor.onAssignment(assignments.get("client"));
    }

    private static class TestStreamTask
    extends StreamTask {
        public boolean committed = false;

        public TestStreamTask(TaskId id, String applicationId, Collection<TopicPartition> partitions, ProcessorTopology topology, Consumer<byte[], byte[]> consumer, Producer<byte[], byte[]> producer, Consumer<byte[], byte[]> restoreConsumer, StreamsConfig config, StateDirectory stateDirectory) {
            super(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, null, stateDirectory, null);
        }

        public void commit() {
            super.commit();
            this.committed = true;
        }

        public void commitOffsets() {
            super.commitOffsets();
            this.committed = true;
        }

        protected void initializeOffsetLimits() {
        }
    }
}

