/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaCommitOffset;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaSourceDescriptor;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class KafkaCommitOffsetTest {
    private final TopicPartition partition = new TopicPartition("topic", 0);
    @Rule
    public ExpectedLogs expectedLogs = ExpectedLogs.none(KafkaCommitOffset.CommitOffsetDoFn.class);
    private final KafkaCommitOffsetMockConsumer consumer = new KafkaCommitOffsetMockConsumer(null, false);
    private final KafkaCommitOffsetMockConsumer errorConsumer = new KafkaCommitOffsetMockConsumer(null, true);

    @Test
    public void testCommitOffsetDoFn() {
        HashMap<String, String> configMap = new HashMap<String, String>();
        configMap.put("group.id", "group1");
        KafkaIO.ReadSourceDescriptors descriptors = KafkaIO.ReadSourceDescriptors.read().withBootstrapServers("bootstrap_server").withConsumerConfigUpdates(configMap).withConsumerFactoryFn((SerializableFunction)new SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>(){

            public Consumer<byte[], byte[]> apply(Map<String, Object> input) {
                Assert.assertEquals((Object)"group1", (Object)input.get("group.id"));
                return KafkaCommitOffsetTest.this.consumer;
            }
        });
        KafkaCommitOffset.CommitOffsetDoFn doFn = new KafkaCommitOffset.CommitOffsetDoFn(descriptors);
        doFn.processElement(KV.of((Object)KafkaSourceDescriptor.of((TopicPartition)this.partition, null, null, null, null, null), (Object)1L));
        Assert.assertEquals((long)2L, (long)this.consumer.commit.get(this.partition).offset());
    }

    @Test
    public void testCommitOffsetError() {
        HashMap<String, String> configMap = new HashMap<String, String>();
        configMap.put("group.id", "group1");
        KafkaIO.ReadSourceDescriptors descriptors = KafkaIO.ReadSourceDescriptors.read().withBootstrapServers("bootstrap_server").withConsumerConfigUpdates(configMap).withConsumerFactoryFn((SerializableFunction)new SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>(){

            public Consumer<byte[], byte[]> apply(Map<String, Object> input) {
                Assert.assertEquals((Object)"group1", (Object)input.get("group.id"));
                return KafkaCommitOffsetTest.this.errorConsumer;
            }
        });
        KafkaCommitOffset.CommitOffsetDoFn doFn = new KafkaCommitOffset.CommitOffsetDoFn(descriptors);
        doFn.processElement(KV.of((Object)KafkaSourceDescriptor.of((TopicPartition)this.partition, null, null, null, null, null), (Object)1L));
        this.expectedLogs.verifyWarn("Getting exception when committing offset: Test Exception");
    }

    private static class KafkaCommitOffsetMockConsumer
    extends MockConsumer<byte[], byte[]> {
        public Map<TopicPartition, OffsetAndMetadata> commit;
        private boolean throwException;

        public KafkaCommitOffsetMockConsumer(OffsetResetStrategy offsetResetStrategy, boolean throwException) {
            super(offsetResetStrategy);
            this.throwException = throwException;
        }

        public synchronized void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
            if (this.throwException) {
                throw new RuntimeException("Test Exception");
            }
            this.commitAsync(offsets, null);
            this.commit = offsets;
        }
    }
}

