/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.kafka.KafkaSourceDescriptor;
import org.apache.beam.sdk.io.kafka.WatchKafkaTopicPartitionDoFn;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

/**
 * Unit tests for {@link WatchKafkaTopicPartitionDoFn}.
 *
 * <p>The Kafka {@link Consumer} and the Beam {@link Timer} are Mockito mocks; bag state and the
 * output receiver are small hand-written fakes (see {@link MockBagState} and
 * {@link MockOutputReceiver}). {@code Instant.now()} is stubbed through PowerMock in the
 * {@code onTimer} tests so the timer target can be asserted exactly.
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest({Instant.class})
public class WatchKafkaTopicPartitionDoFnTest {
    /** Re-check interval handed to the DoFn in the processElement/onTimer tests. */
    private static final Duration CHECK_DURATION = Duration.millis(600L);

    private static final TopicPartition TOPIC1_PARTITION0 = new TopicPartition("topic1", 0);
    private static final TopicPartition TOPIC1_PARTITION1 = new TopicPartition("topic1", 1);
    private static final TopicPartition TOPIC2_PARTITION0 = new TopicPartition("topic2", 0);
    private static final TopicPartition TOPIC2_PARTITION1 = new TopicPartition("topic2", 1);

    /** Every partition of the two-topic fixture returned by {@link #twoTopicsWithTwoPartitionsEach()}. */
    private static final Set<TopicPartition> ALL_TOPIC_PARTITIONS =
        ImmutableSet.of(TOPIC1_PARTITION0, TOPIC1_PARTITION1, TOPIC2_PARTITION0, TOPIC2_PARTITION1);

    /** The two-topic fixture without topic1/partition 1. */
    private static final Set<TopicPartition> ALL_BUT_TOPIC1_PARTITION1 =
        ImmutableSet.of(TOPIC1_PARTITION0, TOPIC2_PARTITION0, TOPIC2_PARTITION1);

    /** Stop-reading predicate that returns true only for topic1/partition 1. */
    private static final SerializableFunction<TopicPartition, Boolean> STOP_READING_TOPIC1_PARTITION1 =
        new SerializableFunction<TopicPartition, Boolean>() {
            @Override
            public Boolean apply(TopicPartition input) {
                return input.equals(TOPIC1_PARTITION1);
            }
        };

    @Mock
    Consumer<byte[], byte[]> mockConsumer;
    @Mock
    Timer timer;

    /** Consumer factory that ignores the supplied config and always returns {@link #mockConsumer}. */
    private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFn =
        new SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>() {
            @Override
            public Consumer<byte[], byte[]> apply(Map<String, Object> input) {
                return mockConsumer;
            }
        };

    /** With no topic list configured, partitions are taken from {@code listTopics()}. */
    @Test
    public void testGetAllTopicPartitions() throws Exception {
        Mockito.when(mockConsumer.listTopics()).thenReturn(twoTopicsWithTwoPartitionsEach());
        WatchKafkaTopicPartitionDoFn dofnInstance = newDoFn(Duration.millis(1L), null, null, null);

        Assert.assertEquals(ALL_TOPIC_PARTITIONS, dofnInstance.getAllTopicPartitions());
    }

    /** With an explicit topic list, {@code partitionsFor} is used and {@code listTopics()} is never called. */
    @Test
    public void testGetAllTopicPartitionsWithGivenTopics() throws Exception {
        List<String> givenTopics = ImmutableList.of("topic1", "topic2");
        Mockito.when(mockConsumer.partitionsFor("topic1")).thenReturn(partitionInfos("topic1", 0, 1));
        Mockito.when(mockConsumer.partitionsFor("topic2")).thenReturn(partitionInfos("topic2", 0, 1));
        WatchKafkaTopicPartitionDoFn dofnInstance =
            newDoFn(Duration.millis(1L), null, null, givenTopics);

        Assert.assertEquals(ALL_TOPIC_PARTITIONS, dofnInstance.getAllTopicPartitions());
        // Verify only AFTER the call under test: checking never() before getAllTopicPartitions()
        // has run is vacuous and could not detect a regression.
        Mockito.verify(mockConsumer, Mockito.never()).listTopics();
    }

    @Test
    public void testProcessElementWhenNoAvailableTopicPartition() throws Exception {
        WatchKafkaTopicPartitionDoFn dofnInstance = newDoFn(CHECK_DURATION, null, null, null);
        MockOutputReceiver outputReceiver = new MockOutputReceiver();
        Mockito.when(mockConsumer.listTopics()).thenReturn(noTopics());
        MockBagState bagState = new MockBagState(ImmutableList.<TopicPartition>of());
        Mockito.when(timer.offset(CHECK_DURATION)).thenReturn(timer);

        dofnInstance.processElement(timer, bagState, outputReceiver);

        Mockito.verify(timer, Mockito.times(1)).setRelative();
        Assert.assertTrue(outputReceiver.getOutputs().isEmpty());
        Assert.assertTrue(bagState.getCurrentStates().isEmpty());
    }

    @Test
    public void testProcessElementWithAvailableTopicPartitions() throws Exception {
        Instant startReadTime = Instant.ofEpochMilli(1L);
        WatchKafkaTopicPartitionDoFn dofnInstance = newDoFn(CHECK_DURATION, null, startReadTime, null);
        MockOutputReceiver outputReceiver = new MockOutputReceiver();
        Mockito.when(mockConsumer.listTopics()).thenReturn(twoTopicsWithTwoPartitionsEach());
        MockBagState bagState = new MockBagState(ImmutableList.<TopicPartition>of());
        Mockito.when(timer.offset(CHECK_DURATION)).thenReturn(timer);

        dofnInstance.processElement(timer, bagState, outputReceiver);

        Mockito.verify(timer, Mockito.times(1)).setRelative();
        Set<KafkaSourceDescriptor> expectedOutputDescriptor =
            generateDescriptorsFromTopicPartitions(ALL_TOPIC_PARTITIONS, startReadTime);
        Assert.assertEquals(expectedOutputDescriptor, new HashSet<>(outputReceiver.getOutputs()));
        Assert.assertEquals(ALL_TOPIC_PARTITIONS, bagState.getCurrentStates());
    }

    @Test
    public void testProcessElementWithStoppingReadingTopicPartition() throws Exception {
        Instant startReadTime = Instant.ofEpochMilli(1L);
        WatchKafkaTopicPartitionDoFn dofnInstance =
            newDoFn(CHECK_DURATION, STOP_READING_TOPIC1_PARTITION1, startReadTime, null);
        MockOutputReceiver outputReceiver = new MockOutputReceiver();
        Mockito.when(mockConsumer.listTopics()).thenReturn(twoTopicsWithTwoPartitionsEach());
        MockBagState bagState = new MockBagState(ImmutableList.<TopicPartition>of());
        Mockito.when(timer.offset(CHECK_DURATION)).thenReturn(timer);

        dofnInstance.processElement(timer, bagState, outputReceiver);

        Mockito.verify(timer, Mockito.times(1)).setRelative();
        // topic1/partition 1 is rejected by the stop-reading predicate, so it is neither
        // emitted nor remembered in state.
        Set<KafkaSourceDescriptor> expectedOutputDescriptor =
            generateDescriptorsFromTopicPartitions(ALL_BUT_TOPIC1_PARTITION1, startReadTime);
        Assert.assertEquals(expectedOutputDescriptor, new HashSet<>(outputReceiver.getOutputs()));
        Assert.assertEquals(ALL_BUT_TOPIC1_PARTITION1, bagState.getCurrentStates());
    }

    @Test
    public void testOnTimerWithNoAvailableTopicPartition() throws Exception {
        WatchKafkaTopicPartitionDoFn dofnInstance = newDoFn(CHECK_DURATION, null, null, null);
        MockOutputReceiver outputReceiver = new MockOutputReceiver();
        Mockito.when(mockConsumer.listTopics()).thenReturn(noTopics());
        MockBagState bagState = new MockBagState(ImmutableList.of(TOPIC1_PARTITION0));
        Instant now = freezeNowAtEpoch();

        dofnInstance.onTimer(timer, bagState, outputReceiver);

        Mockito.verify(timer, Mockito.times(1)).set(now.plus(CHECK_DURATION.getMillis()));
        Assert.assertTrue(outputReceiver.getOutputs().isEmpty());
        Assert.assertTrue(bagState.getCurrentStates().isEmpty());
    }

    @Test
    public void testOnTimerWithAdditionOnly() throws Exception {
        Instant startReadTime = Instant.ofEpochMilli(1L);
        WatchKafkaTopicPartitionDoFn dofnInstance = newDoFn(CHECK_DURATION, null, startReadTime, null);
        MockOutputReceiver outputReceiver = new MockOutputReceiver();
        Mockito.when(mockConsumer.listTopics()).thenReturn(twoTopicsWithTwoPartitionsEach());
        MockBagState bagState =
            new MockBagState(ImmutableList.of(TOPIC1_PARTITION0, TOPIC1_PARTITION1));
        Instant now = freezeNowAtEpoch();

        dofnInstance.onTimer(timer, bagState, outputReceiver);

        Mockito.verify(timer, Mockito.times(1)).set(now.plus(CHECK_DURATION.getMillis()));
        // Only the partitions that were not already in state are emitted.
        Set<TopicPartition> expectedOutputTopicPartitions =
            ImmutableSet.of(TOPIC2_PARTITION0, TOPIC2_PARTITION1);
        Set<KafkaSourceDescriptor> expectedOutputDescriptor =
            generateDescriptorsFromTopicPartitions(expectedOutputTopicPartitions, startReadTime);
        Assert.assertEquals(expectedOutputDescriptor, new HashSet<>(outputReceiver.getOutputs()));
        Assert.assertEquals(ALL_TOPIC_PARTITIONS, bagState.getCurrentStates());
    }

    @Test
    public void testOnTimerWithRemovalOnly() throws Exception {
        Instant startReadTime = Instant.ofEpochMilli(1L);
        WatchKafkaTopicPartitionDoFn dofnInstance = newDoFn(CHECK_DURATION, null, startReadTime, null);
        MockOutputReceiver outputReceiver = new MockOutputReceiver();
        // topic1/partition 1 has disappeared from the cluster metadata.
        Map<String, List<PartitionInfo>> remainingTopics =
            ImmutableMap.of(
                "topic1", partitionInfos("topic1", 0),
                "topic2", partitionInfos("topic2", 0, 1));
        Mockito.when(mockConsumer.listTopics()).thenReturn(remainingTopics);
        MockBagState bagState = new MockBagState(ImmutableList.copyOf(ALL_TOPIC_PARTITIONS));
        Instant now = freezeNowAtEpoch();

        dofnInstance.onTimer(timer, bagState, outputReceiver);

        Mockito.verify(timer, Mockito.times(1)).set(now.plus(CHECK_DURATION.getMillis()));
        Assert.assertTrue(outputReceiver.getOutputs().isEmpty());
        Assert.assertEquals(ALL_BUT_TOPIC1_PARTITION1, bagState.getCurrentStates());
    }

    @Test
    public void testOnTimerWithStoppedTopicPartitions() throws Exception {
        Instant startReadTime = Instant.ofEpochMilli(1L);
        WatchKafkaTopicPartitionDoFn dofnInstance =
            newDoFn(CHECK_DURATION, STOP_READING_TOPIC1_PARTITION1, startReadTime, null);
        MockOutputReceiver outputReceiver = new MockOutputReceiver();
        Mockito.when(mockConsumer.listTopics()).thenReturn(twoTopicsWithTwoPartitionsEach());
        MockBagState bagState = new MockBagState(ImmutableList.copyOf(ALL_BUT_TOPIC1_PARTITION1));
        Instant now = freezeNowAtEpoch();

        dofnInstance.onTimer(timer, bagState, outputReceiver);

        Mockito.verify(timer, Mockito.times(1)).set(now.plus(CHECK_DURATION.getMillis()));
        // The only partition missing from state is the one the predicate stops, so nothing new
        // is emitted and state is unchanged.
        Assert.assertTrue(outputReceiver.getOutputs().isEmpty());
        Assert.assertEquals(ALL_BUT_TOPIC1_PARTITION1, bagState.getCurrentStates());
    }

    /**
     * Builds the DoFn under test, wired to {@link #consumerFn} and an empty consumer config.
     *
     * @param checkDuration interval passed as the DoFn's check duration
     * @param checkStopReadingFn stop-reading predicate, or {@code null} for none
     * @param startReadTime start-read time passed to the DoFn, or {@code null}
     * @param topics explicit topic list, or {@code null}
     */
    private WatchKafkaTopicPartitionDoFn newDoFn(
            Duration checkDuration,
            SerializableFunction<TopicPartition, Boolean> checkStopReadingFn,
            Instant startReadTime,
            List<String> topics) {
        Map<String, Object> consumerConfig = ImmutableMap.of();
        return new WatchKafkaTopicPartitionDoFn(
            checkDuration, consumerFn, checkStopReadingFn, consumerConfig, startReadTime, topics);
    }

    /**
     * Makes {@code Instant.now()} return {@link Instant#EPOCH} for the rest of the test.
     *
     * <p>Must be called after any other static {@link Instant} factory the test needs (for example
     * {@code Instant.ofEpochMilli}), because {@code mockStatic} replaces all static methods of the
     * class with unstubbed mocks.
     *
     * @return the frozen "now"
     */
    private static Instant freezeNowAtEpoch() {
        Instant now = Instant.EPOCH;
        PowerMockito.mockStatic(Instant.class);
        Mockito.when(Instant.now()).thenReturn(now);
        return now;
    }

    /** Returns an empty {@code listTopics()} result. */
    private static Map<String, List<PartitionInfo>> noTopics() {
        return ImmutableMap.of();
    }

    /** Returns a {@code listTopics()} result with topic1 and topic2, each having partitions 0 and 1. */
    private static Map<String, List<PartitionInfo>> twoTopicsWithTwoPartitionsEach() {
        return ImmutableMap.of(
            "topic1", partitionInfos("topic1", 0, 1),
            "topic2", partitionInfos("topic2", 0, 1));
    }

    /** Builds one {@link PartitionInfo} per partition id; leader and replica fields are left null. */
    private static List<PartitionInfo> partitionInfos(String topic, int... partitions) {
        ImmutableList.Builder<PartitionInfo> infos = ImmutableList.builder();
        for (int partition : partitions) {
            infos.add(new PartitionInfo(topic, partition, null, null, null));
        }
        return infos.build();
    }

    /** Maps each topic partition to the descriptor built from it and {@code startReadTime}. */
    private Set<KafkaSourceDescriptor> generateDescriptorsFromTopicPartitions(
            Set<TopicPartition> topicPartitions, Instant startReadTime) {
        return topicPartitions.stream()
            .map(topicPartition -> KafkaSourceDescriptor.of(topicPartition, null, startReadTime, null))
            .collect(Collectors.toSet());
    }

    /** In-memory {@link BagState} fake backed by a set, so duplicate adds collapse. */
    private static class MockBagState implements BagState<TopicPartition> {
        private final Set<TopicPartition> topicPartitions = new HashSet<>();

        MockBagState(List<TopicPartition> readReturn) {
            this.topicPartitions.addAll(readReturn);
        }

        /** Returns the live backing set (not a snapshot). */
        @Override
        public Iterable<TopicPartition> read() {
            return this.topicPartitions;
        }

        @Override
        public void add(TopicPartition value) {
            this.topicPartitions.add(value);
        }

        /** Returns a live view instead of {@code null}, so callers can safely call {@code read()} on it. */
        @Override
        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>() {
                @Override
                public Boolean read() {
                    return MockBagState.this.topicPartitions.isEmpty();
                }

                @Override
                public ReadableState<Boolean> readLater() {
                    return this;
                }
            };
        }

        /** Prefetching is a no-op for the fake; returns {@code this} so calls can be chained. */
        @Override
        public BagState<TopicPartition> readLater() {
            return this;
        }

        @Override
        public void clear() {
            this.topicPartitions.clear();
        }

        /** Test-only accessor for the current contents of the bag. */
        public Set<TopicPartition> getCurrentStates() {
            return this.topicPartitions;
        }
    }

    /** {@link DoFn.OutputReceiver} fake that records every emitted descriptor in order. */
    private static class MockOutputReceiver implements DoFn.OutputReceiver<KafkaSourceDescriptor> {
        private final List<KafkaSourceDescriptor> outputs = new ArrayList<>();

        @Override
        public void output(KafkaSourceDescriptor output) {
            this.outputs.add(output);
        }

        /**
         * Records the element and ignores the timestamp, so outputs sent through the timestamped
         * path are not silently lost from {@link #getOutputs()}.
         */
        @Override
        public void outputWithTimestamp(KafkaSourceDescriptor output, Instant timestamp) {
            this.outputs.add(output);
        }

        public List<KafkaSourceDescriptor> getOutputs() {
            return this.outputs;
        }
    }
}

