package org.apache.beam.sdk.io.kafka;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

/**
 * Unit tests for {@code WatchKafkaTopicPartitionDoFn}.
 *
 * <p>The Kafka {@link Consumer} and the Beam {@link Timer} are Mockito mocks; bag state and the
 * output receiver are replaced by the small in-memory fakes declared below. {@link Instant} is
 * prepared for PowerMock so that the {@code onTimer} tests can pin {@code Instant.now()}.
 */
@PrepareForTest({Instant.class})
@RunWith(PowerMockRunner.class)
public class WatchKafkaTopicPartitionDoFnTest {

    /** Check interval handed to the DoFn in every {@code processElement}/{@code onTimer} test. */
    private static final Duration CHECK_DURATION = Duration.millis(600L);

    /** Every partition of the default two-topic fixture. */
    private static final Set<TopicPartition> ALL_TOPIC_PARTITIONS = ImmutableSet.of(
            new TopicPartition("topic1", 0),
            new TopicPartition("topic1", 1),
            new TopicPartition("topic2", 0),
            new TopicPartition("topic2", 1));

    /** The default fixture minus {@code topic1-1}. */
    private static final Set<TopicPartition> ALL_BUT_TOPIC1_PARTITION1 = ImmutableSet.of(
            new TopicPartition("topic1", 0),
            new TopicPartition("topic2", 0),
            new TopicPartition("topic2", 1));

    /** Predicate passed as the DoFn's third constructor argument; true only for {@code topic1-1}. */
    private static final SerializableFunction<TopicPartition, Boolean> MATCHES_TOPIC1_PARTITION1 =
            topicPartition -> topicPartition.equals(new TopicPartition("topic1", 1));

    @Mock
    Consumer<byte[], byte[]> mockConsumer;

    @Mock
    Timer timer;

    /** Consumer factory handed to the DoFn; ignores the config and always yields the mock. */
    private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFn =
            config -> mockConsumer;

    /** In-memory {@link BagState} fake backed by a {@link HashSet}. */
    private static class MockBagState implements BagState<TopicPartition> {
        private final Set<TopicPartition> topicPartitions = new HashSet<>();

        MockBagState(List<TopicPartition> initial) {
            this.topicPartitions.addAll(initial);
        }

        /** Returns the live backing set (not a copy). */
        @Override
        public Iterable<TopicPartition> read() {
            return this.topicPartitions;
        }

        @Override
        public void add(TopicPartition topicPartition) {
            this.topicPartitions.add(topicPartition);
        }

        /** Stub: always returns {@code null}. */
        @Override
        public ReadableState<Boolean> isEmpty() {
            return null;
        }

        /** Stub: always returns {@code null}. */
        @Override
        public BagState<TopicPartition> readLater() {
            return null;
        }

        @Override
        public void clear() {
            this.topicPartitions.clear();
        }

        /** Exposes the backing set so tests can assert on the final state contents. */
        public Set<TopicPartition> getCurrentStates() {
            return this.topicPartitions;
        }
    }

    /** {@link DoFn.OutputReceiver} fake that records everything passed to {@link #output}. */
    private static class MockOutputReceiver implements DoFn.OutputReceiver<KafkaSourceDescriptor> {
        private final List<KafkaSourceDescriptor> outputs = new ArrayList<>();

        private MockOutputReceiver() {
        }

        @Override
        public void output(KafkaSourceDescriptor kafkaSourceDescriptor) {
            this.outputs.add(kafkaSourceDescriptor);
        }

        /** Intentionally a no-op: timestamped outputs are not recorded. */
        @Override
        public void outputWithTimestamp(KafkaSourceDescriptor kafkaSourceDescriptor, Instant instant) {
        }

        public List<KafkaSourceDescriptor> getOutputs() {
            return this.outputs;
        }
    }

    /** With no topic list, the partitions reported by {@code listTopics()} are returned. */
    @Test
    public void testGetAllTopicPartitions() throws Exception {
        Mockito.when(this.mockConsumer.listTopics()).thenReturn(twoTopicsWithTwoPartitionsEach());

        WatchKafkaTopicPartitionDoFn doFn = newDoFn(Duration.millis(1L), null, null, null);

        Assert.assertEquals(ALL_TOPIC_PARTITIONS, doFn.getAllTopicPartitions());
    }

    /** With an explicit topic list, {@code partitionsFor} is used and {@code listTopics} is not. */
    @Test
    public void testGetAllTopicPartitionsWithGivenTopics() throws Exception {
        List<String> givenTopics = ImmutableList.of("topic1", "topic2");
        Mockito.when(this.mockConsumer.partitionsFor("topic1")).thenReturn(partitionInfos("topic1", 0, 1));
        Mockito.when(this.mockConsumer.partitionsFor("topic2")).thenReturn(partitionInfos("topic2", 0, 1));
        WatchKafkaTopicPartitionDoFn doFn = newDoFn(Duration.millis(1L), null, null, givenTopics);

        Set<TopicPartition> actual = doFn.getAllTopicPartitions();

        Assert.assertEquals(ALL_TOPIC_PARTITIONS, actual);
        // Verify only after the call under test; checking beforehand could never fail.
        Mockito.verify(this.mockConsumer, Mockito.never()).listTopics();
    }

    /** No partitions available: the timer is armed, nothing is emitted, state stays empty. */
    @Test
    public void testProcessElementWhenNoAvailableTopicPartition() throws Exception {
        WatchKafkaTopicPartitionDoFn doFn = newDoFn(CHECK_DURATION, null, null, null);
        MockOutputReceiver receiver = new MockOutputReceiver();
        Mockito.when(this.mockConsumer.listTopics()).thenReturn(ImmutableMap.of());
        MockBagState state = new MockBagState(ImmutableList.of());
        Mockito.when(this.timer.offset(CHECK_DURATION)).thenReturn(this.timer);

        doFn.processElement(this.timer, state, receiver);

        Mockito.verify(this.timer, Mockito.times(1)).setRelative();
        Assert.assertTrue(receiver.getOutputs().isEmpty());
        Assert.assertTrue(state.getCurrentStates().isEmpty());
    }

    /** All four partitions are emitted as descriptors and recorded in state. */
    @Test
    public void testProcessElementWithAvailableTopicPartitions() throws Exception {
        Instant startReadTime = Instant.ofEpochMilli(1L);
        WatchKafkaTopicPartitionDoFn doFn = newDoFn(CHECK_DURATION, null, startReadTime, null);
        MockOutputReceiver receiver = new MockOutputReceiver();
        Mockito.when(this.mockConsumer.listTopics()).thenReturn(twoTopicsWithTwoPartitionsEach());
        MockBagState state = new MockBagState(ImmutableList.of());
        Mockito.when(this.timer.offset(CHECK_DURATION)).thenReturn(this.timer);

        doFn.processElement(this.timer, state, receiver);

        Mockito.verify(this.timer, Mockito.times(1)).setRelative();
        Assert.assertEquals(
                generateDescriptorsFromTopicPartitions(ALL_TOPIC_PARTITIONS, startReadTime),
                new HashSet<>(receiver.getOutputs()));
        Assert.assertEquals(ALL_TOPIC_PARTITIONS, state.getCurrentStates());
    }

    /** The partition matched by the predicate is neither emitted nor stored. */
    @Test
    public void testProcessElementWithStoppingReadingTopicPartition() throws Exception {
        Instant startReadTime = Instant.ofEpochMilli(1L);
        WatchKafkaTopicPartitionDoFn doFn =
                newDoFn(CHECK_DURATION, MATCHES_TOPIC1_PARTITION1, startReadTime, null);
        MockOutputReceiver receiver = new MockOutputReceiver();
        Mockito.when(this.mockConsumer.listTopics()).thenReturn(twoTopicsWithTwoPartitionsEach());
        MockBagState state = new MockBagState(ImmutableList.of());
        Mockito.when(this.timer.offset(CHECK_DURATION)).thenReturn(this.timer);

        doFn.processElement(this.timer, state, receiver);

        Mockito.verify(this.timer, Mockito.times(1)).setRelative();
        Assert.assertEquals(
                generateDescriptorsFromTopicPartitions(ALL_BUT_TOPIC1_PARTITION1, startReadTime),
                new HashSet<>(receiver.getOutputs()));
        Assert.assertEquals(ALL_BUT_TOPIC1_PARTITION1, state.getCurrentStates());
    }

    /** Kafka reports nothing: previously stored partitions are dropped and nothing is emitted. */
    @Test
    public void testOnTimerWithNoAvailableTopicPartition() throws Exception {
        WatchKafkaTopicPartitionDoFn doFn = newDoFn(CHECK_DURATION, null, null, null);
        MockOutputReceiver receiver = new MockOutputReceiver();
        Mockito.when(this.mockConsumer.listTopics()).thenReturn(ImmutableMap.of());
        MockBagState state = new MockBagState(ImmutableList.of(new TopicPartition("topic1", 0)));
        Instant now = freezeNowAtEpoch();

        doFn.onTimer(this.timer, state, receiver);

        Mockito.verify(this.timer, Mockito.times(1)).set(now.plus(CHECK_DURATION));
        Assert.assertTrue(receiver.getOutputs().isEmpty());
        Assert.assertTrue(state.getCurrentStates().isEmpty());
    }

    /** Only the partitions missing from state are emitted; state ends up with all four. */
    @Test
    public void testOnTimerWithAdditionOnly() throws Exception {
        Instant startReadTime = Instant.ofEpochMilli(1L);
        WatchKafkaTopicPartitionDoFn doFn = newDoFn(CHECK_DURATION, null, startReadTime, null);
        MockOutputReceiver receiver = new MockOutputReceiver();
        Mockito.when(this.mockConsumer.listTopics()).thenReturn(twoTopicsWithTwoPartitionsEach());
        MockBagState state = new MockBagState(
                ImmutableList.of(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1)));
        Instant now = freezeNowAtEpoch();

        doFn.onTimer(this.timer, state, receiver);

        Mockito.verify(this.timer, Mockito.times(1)).set(now.plus(CHECK_DURATION));
        Set<TopicPartition> added =
                ImmutableSet.of(new TopicPartition("topic2", 0), new TopicPartition("topic2", 1));
        Assert.assertEquals(
                generateDescriptorsFromTopicPartitions(added, startReadTime),
                new HashSet<>(receiver.getOutputs()));
        Assert.assertEquals(ALL_TOPIC_PARTITIONS, state.getCurrentStates());
    }

    /** A partition no longer reported by Kafka is removed from state; nothing is emitted. */
    @Test
    public void testOnTimerWithRemovalOnly() throws Exception {
        WatchKafkaTopicPartitionDoFn doFn =
                newDoFn(CHECK_DURATION, null, Instant.ofEpochMilli(1L), null);
        MockOutputReceiver receiver = new MockOutputReceiver();
        Mockito.when(this.mockConsumer.listTopics()).thenReturn(ImmutableMap.of(
                "topic1", partitionInfos("topic1", 0),
                "topic2", partitionInfos("topic2", 0, 1)));
        MockBagState state = new MockBagState(ImmutableList.copyOf(ALL_TOPIC_PARTITIONS));
        Instant now = freezeNowAtEpoch();

        doFn.onTimer(this.timer, state, receiver);

        Mockito.verify(this.timer, Mockito.times(1)).set(now.plus(CHECK_DURATION));
        Assert.assertTrue(receiver.getOutputs().isEmpty());
        Assert.assertEquals(ALL_BUT_TOPIC1_PARTITION1, state.getCurrentStates());
    }

    /** A partition matched by the predicate is not (re-)added even though Kafka reports it. */
    @Test
    public void testOnTimerWithStoppedTopicPartitions() throws Exception {
        Instant startReadTime = Instant.ofEpochMilli(1L);
        WatchKafkaTopicPartitionDoFn doFn =
                newDoFn(CHECK_DURATION, MATCHES_TOPIC1_PARTITION1, startReadTime, null);
        MockOutputReceiver receiver = new MockOutputReceiver();
        Mockito.when(this.mockConsumer.listTopics()).thenReturn(twoTopicsWithTwoPartitionsEach());
        MockBagState state = new MockBagState(ImmutableList.copyOf(ALL_BUT_TOPIC1_PARTITION1));
        Instant now = freezeNowAtEpoch();

        doFn.onTimer(this.timer, state, receiver);

        Mockito.verify(this.timer, Mockito.times(1)).set(now.plus(CHECK_DURATION));
        Assert.assertTrue(receiver.getOutputs().isEmpty());
        Assert.assertEquals(ALL_BUT_TOPIC1_PARTITION1, state.getCurrentStates());
    }

    /**
     * Builds the DoFn under test with the mock-backed consumer factory, an empty consumer config
     * and a {@code null} sixth constructor argument.
     *
     * @param checkDuration first constructor argument
     * @param checkStopReadingFn third constructor argument, may be {@code null}
     * @param startReadTime fifth constructor argument, may be {@code null}
     * @param topics seventh constructor argument, may be {@code null}
     */
    private WatchKafkaTopicPartitionDoFn newDoFn(
            Duration checkDuration,
            SerializableFunction<TopicPartition, Boolean> checkStopReadingFn,
            Instant startReadTime,
            List<String> topics) {
        return new WatchKafkaTopicPartitionDoFn(
                checkDuration, this.consumerFn, checkStopReadingFn, ImmutableMap.of(), startReadTime, null, topics);
    }

    /** Returns one {@link PartitionInfo} per given partition id, with null leader/replica data. */
    private static List<PartitionInfo> partitionInfos(String topic, int... partitions) {
        ImmutableList.Builder<PartitionInfo> infos = ImmutableList.builder();
        for (int partition : partitions) {
            infos.add(new PartitionInfo(topic, partition, null, null, null));
        }
        return infos.build();
    }

    /** Default {@code listTopics()} fixture: topic1 and topic2, partitions 0 and 1 each. */
    private static Map<String, List<PartitionInfo>> twoTopicsWithTwoPartitionsEach() {
        return ImmutableMap.of(
                "topic1", partitionInfos("topic1", 0, 1),
                "topic2", partitionInfos("topic2", 0, 1));
    }

    /**
     * Statically mocks {@link Instant} and pins {@code Instant.now()} to the epoch.
     *
     * <p>{@code Instant.EPOCH} is read before {@code mockStatic} is called, mirroring the order
     * the tests have always used.
     *
     * @return the instant that {@code Instant.now()} will report
     */
    private static Instant freezeNowAtEpoch() {
        Instant now = Instant.EPOCH;
        PowerMockito.mockStatic(Instant.class);
        Mockito.when(Instant.now()).thenReturn(now);
        return now;
    }

    /**
     * Maps each topic partition to the descriptor the tests expect the DoFn to emit: the given
     * instant as third argument of {@code KafkaSourceDescriptor.of} and {@code null} elsewhere.
     */
    private Set<KafkaSourceDescriptor> generateDescriptorsFromTopicPartitions(
            Set<TopicPartition> topicPartitions, Instant startReadTime) {
        return topicPartitions.stream()
                .map(topicPartition ->
                        KafkaSourceDescriptor.of(topicPartition, null, startReadTime, null, null, null))
                .collect(Collectors.toSet());
    }
}
