package io.confluent.kafkarest.integration;

import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.v2.JsonConsumerRecord;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.core.GenericType;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafkarest/integration/ConsumerJsonTest.class */
public class ConsumerJsonTest extends AbstractConsumerTest {
    private static final String topicName = "topic1";
    private static final String groupName = "testconsumergroup";
    private final List<ProducerRecord<Object, Object>> recordsWithKeys = Arrays.asList(new ProducerRecord(topicName, "key", "value"), new ProducerRecord(topicName, "key", (Object) null), new ProducerRecord(topicName, "key", Double.valueOf(43.2d)), new ProducerRecord(topicName, "key", 999), new ProducerRecord(topicName, "key", exampleMapValue()), new ProducerRecord(topicName, "key", exampleListValue()));
    private final List<ProducerRecord<Object, Object>> recordsOnlyValues = Arrays.asList(new ProducerRecord(topicName, "value"), new ProducerRecord(topicName, (Object) null), new ProducerRecord(topicName, Double.valueOf(43.2d)), new ProducerRecord(topicName, 999), new ProducerRecord(topicName, exampleMapValue()), new ProducerRecord(topicName, exampleListValue()));
    private static final Logger log = LoggerFactory.getLogger(ConsumerJsonTest.class);
    private static final GenericType<List<JsonConsumerRecord>> jsonConsumerRecordType = new GenericType<List<JsonConsumerRecord>>() { // from class: io.confluent.kafkarest.integration.ConsumerJsonTest.1
    };

    private Map<String, Object> exampleMapValue() {
        HashMap hashMap = new HashMap();
        hashMap.put("foo", "bar");
        hashMap.put("bar", null);
        hashMap.put("baz", Double.valueOf(53.4d));
        hashMap.put("taz", 45);
        return hashMap;
    }

    private List<Object> exampleListValue() {
        ArrayList arrayList = new ArrayList();
        arrayList.add("foo");
        arrayList.add(null);
        arrayList.add(Double.valueOf(53.4d));
        arrayList.add(45);
        return arrayList;
    }

    @Override // io.confluent.kafkarest.integration.ClusterTestHarness
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        createTopic(topicName, 3, (short) 1);
    }

    @Test
    public void testConsumeWithKeys() {
        String startConsumeMessages = startConsumeMessages(groupName, topicName, EmbeddedFormat.JSON, "application/vnd.kafka.json.v2+json", "earliest");
        produceJsonMessages(this.recordsWithKeys);
        consumeMessages(startConsumeMessages, this.recordsWithKeys, "application/vnd.kafka.json.v2+json", "application/vnd.kafka.json.v2+json", jsonConsumerRecordType, null, (v0) -> {
            return v0.toConsumerRecord();
        });
        commitOffsets(startConsumeMessages);
    }

    @Test
    public void testConsumeOnlyValues() {
        String startConsumeMessages = startConsumeMessages(groupName, topicName, EmbeddedFormat.JSON, "application/vnd.kafka.json.v2+json", "earliest");
        produceJsonMessages(this.recordsOnlyValues);
        consumeMessages(startConsumeMessages, this.recordsOnlyValues, "application/vnd.kafka.json.v2+json", "application/vnd.kafka.json.v2+json", jsonConsumerRecordType, null, (v0) -> {
            return v0.toConsumerRecord();
        });
        commitOffsets(startConsumeMessages);
    }

    @Disabled("This test doesn't verify produce records and is flaky, to be fixed in KREST-10370")
    @Test
    public void testConsumeWithMultipleParallelConsumers() throws InterruptedException {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        CountDownLatch countDownLatch3 = new CountDownLatch(1);
        CountDownLatch countDownLatch4 = new CountDownLatch(1);
        newFixedThreadPool.submit(new Callable<Void>(0, countDownLatch, countDownLatch3, countDownLatch4) { // from class: io.confluent.kafkarest.integration.ConsumerJsonTest.1ConsumerTask
            private int index;
            private CountDownLatch doneLatch;
            private CountDownLatch readyForConsumeLoop;
            private CountDownLatch otherConsumerReadyForConsumeLoop;

            {
                this.index = 0;
                this.index = r5;
                this.doneLatch = countDownLatch;
                this.readyForConsumeLoop = countDownLatch3;
                this.otherConsumerReadyForConsumeLoop = countDownLatch4;
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Void call() throws InterruptedException {
                ConsumerJsonTest.log.info("Consumer instance {}, begin.", Integer.valueOf(this.index));
                String startConsumeMessages = ConsumerJsonTest.this.startConsumeMessages(ConsumerJsonTest.groupName, ConsumerJsonTest.topicName, EmbeddedFormat.JSON, "application/vnd.kafka.json.v2+json", "earliest");
                this.readyForConsumeLoop.countDown();
                ConsumerJsonTest.log.info("Consumer instance {}, subscribed, ready to consume in a loop.", Integer.valueOf(this.index));
                ConsumerJsonTest.log.info("Consumer instance {}, wait for other consumer to be ready to consume in a loop.");
                this.otherConsumerReadyForConsumeLoop.await();
                ConsumerJsonTest.log.info("Consumer instance {}, other consumer ready, will consume 20 time more.", Integer.valueOf(this.index));
                for (int i = 0; i < 20; i++) {
                    ConsumerJsonTest.this.consumeMessages(startConsumeMessages, new ArrayList(), "application/vnd.kafka.json.v2+json", "application/vnd.kafka.json.v2+json", ConsumerJsonTest.jsonConsumerRecordType, null, (v0) -> {
                        return v0.toConsumerRecord();
                    });
                }
                this.doneLatch.countDown();
                ConsumerJsonTest.log.info("Consumer instance {}, done.", Integer.valueOf(this.index));
                return null;
            }
        });
        newFixedThreadPool.submit(new Callable<Void>(1, countDownLatch2, countDownLatch4, countDownLatch3) { // from class: io.confluent.kafkarest.integration.ConsumerJsonTest.1ConsumerTask
            private int index;
            private CountDownLatch doneLatch;
            private CountDownLatch readyForConsumeLoop;
            private CountDownLatch otherConsumerReadyForConsumeLoop;

            {
                this.index = 0;
                this.index = r5;
                this.doneLatch = countDownLatch2;
                this.readyForConsumeLoop = countDownLatch4;
                this.otherConsumerReadyForConsumeLoop = countDownLatch3;
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Void call() throws InterruptedException {
                ConsumerJsonTest.log.info("Consumer instance {}, begin.", Integer.valueOf(this.index));
                String startConsumeMessages = ConsumerJsonTest.this.startConsumeMessages(ConsumerJsonTest.groupName, ConsumerJsonTest.topicName, EmbeddedFormat.JSON, "application/vnd.kafka.json.v2+json", "earliest");
                this.readyForConsumeLoop.countDown();
                ConsumerJsonTest.log.info("Consumer instance {}, subscribed, ready to consume in a loop.", Integer.valueOf(this.index));
                ConsumerJsonTest.log.info("Consumer instance {}, wait for other consumer to be ready to consume in a loop.");
                this.otherConsumerReadyForConsumeLoop.await();
                ConsumerJsonTest.log.info("Consumer instance {}, other consumer ready, will consume 20 time more.", Integer.valueOf(this.index));
                for (int i = 0; i < 20; i++) {
                    ConsumerJsonTest.this.consumeMessages(startConsumeMessages, new ArrayList(), "application/vnd.kafka.json.v2+json", "application/vnd.kafka.json.v2+json", ConsumerJsonTest.jsonConsumerRecordType, null, (v0) -> {
                        return v0.toConsumerRecord();
                    });
                }
                this.doneLatch.countDown();
                ConsumerJsonTest.log.info("Consumer instance {}, done.", Integer.valueOf(this.index));
                return null;
            }
        });
        Assertions.assertTrue(countDownLatch.await(120L, TimeUnit.SECONDS) && countDownLatch2.await(120L, TimeUnit.SECONDS), "test timed out waiting for consumers to finish.");
    }
}
