/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.junit.Assert;
import org.junit.Test;

public class RecordCollectorTest {
    private List<PartitionInfo> infos = Arrays.asList(new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]));
    private Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), this.infos, Collections.emptySet(), Collections.emptySet());
    private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
    private final StringSerializer stringSerializer = new StringSerializer();
    private final StreamPartitioner<String, String> streamPartitioner = new StreamPartitioner<String, String>(){

        public Integer partition(String key, String value, int numPartitions) {
            return Integer.parseInt(key) % numPartitions;
        }
    };

    @Test
    public void testSpecificPartition() {
        RecordCollector collector = new RecordCollector((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer), "RecordCollectorTest-TestSpecificPartition");
        collector.send(new ProducerRecord("topic1", Integer.valueOf(0), (Object)"999", (Object)"0"), (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send(new ProducerRecord("topic1", Integer.valueOf(0), (Object)"999", (Object)"0"), (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send(new ProducerRecord("topic1", Integer.valueOf(0), (Object)"999", (Object)"0"), (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send(new ProducerRecord("topic1", Integer.valueOf(1), (Object)"999", (Object)"0"), (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send(new ProducerRecord("topic1", Integer.valueOf(1), (Object)"999", (Object)"0"), (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send(new ProducerRecord("topic1", Integer.valueOf(2), (Object)"999", (Object)"0"), (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        Map offsets = collector.offsets();
        Assert.assertEquals((Object)2L, offsets.get(new TopicPartition("topic1", 0)));
        Assert.assertEquals((Object)1L, offsets.get(new TopicPartition("topic1", 1)));
        Assert.assertEquals((Object)0L, offsets.get(new TopicPartition("topic1", 2)));
        collector.send(new ProducerRecord("topic1", Integer.valueOf(0), (Object)"999", (Object)"0"), (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send(new ProducerRecord("topic1", Integer.valueOf(1), (Object)"999", (Object)"0"), (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send(new ProducerRecord("topic1", Integer.valueOf(2), (Object)"999", (Object)"0"), (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        Assert.assertEquals((Object)3L, offsets.get(new TopicPartition("topic1", 0)));
        Assert.assertEquals((Object)2L, offsets.get(new TopicPartition("topic1", 1)));
        Assert.assertEquals((Object)1L, offsets.get(new TopicPartition("topic1", 2)));
    }

    @Test
    public void testStreamPartitioner() {
        RecordCollector collector = new RecordCollector((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer), "RecordCollectorTest-TestStreamPartitioner");
        collector.send(new ProducerRecord("topic1", (Object)"3", (Object)"0"), (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send(new ProducerRecord("topic1", (Object)"9", (Object)"0"), (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send(new ProducerRecord("topic1", (Object)"27", (Object)"0"), (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send(new ProducerRecord("topic1", (Object)"81", (Object)"0"), (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send(new ProducerRecord("topic1", (Object)"243", (Object)"0"), (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send(new ProducerRecord("topic1", (Object)"28", (Object)"0"), (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send(new ProducerRecord("topic1", (Object)"82", (Object)"0"), (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send(new ProducerRecord("topic1", (Object)"244", (Object)"0"), (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send(new ProducerRecord("topic1", (Object)"245", (Object)"0"), (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        Map offsets = collector.offsets();
        Assert.assertEquals((Object)4L, offsets.get(new TopicPartition("topic1", 0)));
        Assert.assertEquals((Object)2L, offsets.get(new TopicPartition("topic1", 1)));
        Assert.assertEquals((Object)0L, offsets.get(new TopicPartition("topic1", 2)));
    }

    @Test
    public void shouldRetryWhenTimeoutExceptionOccursOnSend() throws Exception {
        final AtomicInteger attempt = new AtomicInteger(0);
        RecordCollector collector = new RecordCollector((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
                if (attempt.getAndIncrement() == 0) {
                    throw new TimeoutException();
                }
                return super.send(record, callback);
            }
        }, "test");
        collector.send(new ProducerRecord("topic1", (Object)"3", (Object)"0"), (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        Long offset = (Long)collector.offsets().get(new TopicPartition("topic1", 0));
        Assert.assertEquals((Object)0L, (Object)offset);
    }

    @Test(expected=StreamsException.class)
    public void shouldThrowStreamsExceptionAfterMaxAttempts() throws Exception {
        RecordCollector collector = new RecordCollector((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
                throw new TimeoutException();
            }
        }, "test");
        collector.send(new ProducerRecord("topic1", (Object)"3", (Object)"0"), (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
    }
}

