package io.confluent.kafkarest.integration;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaJsonSerializer;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.ScalaConsumersContext;
import io.confluent.kafkarest.TestUtils;
import io.confluent.kafkarest.entities.BinaryConsumerRecord;
import io.confluent.kafkarest.entities.ConsumerInstanceConfig;
import io.confluent.kafkarest.entities.ConsumerRecord;
import io.confluent.kafkarest.entities.CreateConsumerInstanceResponse;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.Response;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.Assert;

/* loaded from: input_file:io/confluent/kafkarest/integration/AbstractConsumerTest.class */
public class AbstractConsumerTest extends ClusterTestHarness {

    /**
     * Hook for transforming a produced key or value before it is compared with what a
     * consumer returned.
     *
     * <p>Within this class it is only used by {@code assertEqualsMessages}, which applies it to
     * every produced key and value when a non-null converter is supplied.
     *
     * <p>NOTE(review): the decompiler reports that this type was originally declared
     * {@code protected}; it is kept {@code public} here.
     */
    public interface Converter {
        /**
         * Converts one produced key or value.
         *
         * @param obj the produced key or value; NOTE(review): may presumably be {@code null} for
         *     records without a key — confirm against implementations
         * @return the object to use in place of {@code obj} when comparing
         */
        Object convert(Object obj);
    }

    /** Creates the test using the no-argument {@code ClusterTestHarness} constructor. */
    public AbstractConsumerTest() {
        super();
    }

    /**
     * Creates the test, forwarding both arguments unchanged to the matching
     * {@code ClusterTestHarness} constructor.
     *
     * @param i forwarded as the first harness argument; NOTE(review): presumably the number of
     *     brokers to start — confirm against ClusterTestHarness
     * @param z forwarded as the second harness argument; NOTE(review): presumably whether to
     *     start a schema registry — confirm against ClusterTestHarness
     */
    public AbstractConsumerTest(int i, boolean z) {
        super(i, z);
    }

    /**
     * Synchronously produces the given binary records to the brokers in {@code brokerList}.
     *
     * <p>Records are sent one at a time with {@code acks=all}; each send is awaited before the
     * next one starts. The producer is always closed, even when a send fails.
     *
     * @param list records with raw {@code byte[]} keys and values, produced in list order
     * @throws AssertionError if any record cannot be produced; the underlying failure is
     *     attached as the cause
     */
    public void produceBinaryMessages(List<ProducerRecord<byte[], byte[]>> list) {
        Properties properties = new Properties();
        properties.put("key.serializer", ByteArraySerializer.class);
        properties.put("value.serializer", ByteArraySerializer.class);
        properties.setProperty("bootstrap.servers", this.brokerList);
        properties.setProperty("acks", "all");
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(properties);
        try {
            for (ProducerRecord<byte[], byte[]> record : list) {
                try {
                    // Block on the returned future so failures surface here, per record.
                    producer.send(record).get();
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // Keep the interrupt visible to whoever runs the test.
                        Thread.currentThread().interrupt();
                    }
                    // Same error type and message as Assert.fail, but keeps the cause's stack trace.
                    throw new AssertionError("Consumer test couldn't produce input messages to Kafka: " + e, e);
                }
            }
        } finally {
            // Without this the producer leaked whenever a send failed.
            producer.close();
        }
    }

    /**
     * Builds the consumers context used by the harness from the supplied REST configuration.
     *
     * @param kafkaRestConfig configuration handed straight to the new context
     * @return a freshly constructed {@code ScalaConsumersContext}
     */
    @Override // io.confluent.kafkarest.integration.ClusterTestHarness
    protected ScalaConsumersContext getScalaConsumersContext(KafkaRestConfig kafkaRestConfig) {
        final ScalaConsumersContext consumersContext = new ScalaConsumersContext(kafkaRestConfig);
        return consumersContext;
    }

    /**
     * Synchronously produces the given records to the brokers in {@code brokerList}, serializing
     * keys and values with {@code KafkaJsonSerializer}.
     *
     * <p>Records are sent one at a time with {@code acks=all}; each send is awaited before the
     * next one starts. The producer is always closed, even when a send fails.
     *
     * @param list records to produce, in list order
     * @throws AssertionError if any record cannot be produced; the underlying failure is
     *     attached as the cause
     */
    public void produceJsonMessages(List<ProducerRecord<Object, Object>> list) {
        Properties properties = new Properties();
        properties.put("key.serializer", KafkaJsonSerializer.class);
        properties.put("value.serializer", KafkaJsonSerializer.class);
        properties.setProperty("bootstrap.servers", this.brokerList);
        properties.setProperty("acks", "all");
        KafkaProducer<Object, Object> producer = new KafkaProducer<Object, Object>(properties);
        try {
            for (ProducerRecord<Object, Object> record : list) {
                try {
                    // Block on the returned future so failures surface here, per record.
                    producer.send(record).get();
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // Keep the interrupt visible to whoever runs the test.
                        Thread.currentThread().interrupt();
                    }
                    // Same error type and message as Assert.fail, but keeps the cause's stack trace.
                    throw new AssertionError("Consumer test couldn't produce input messages to Kafka: " + e, e);
                }
            }
        } finally {
            // Without this the producer leaked whenever a send failed.
            producer.close();
        }
    }

    /**
     * Synchronously produces the given records to the brokers in {@code brokerList}, serializing
     * keys and values with {@code KafkaAvroSerializer} instances configured against
     * {@code schemaRegConnect}.
     *
     * <p>Records are sent one at a time with {@code acks=all}; each send is awaited before the
     * next one starts. The producer is always closed, even when a send fails.
     *
     * @param list records to produce, in list order
     * @throws AssertionError if any record cannot be produced; the underlying failure is
     *     attached as the cause
     */
    public void produceAvroMessages(List<ProducerRecord<Object, Object>> list) {
        HashMap<String, Object> serializerConfig = new HashMap<String, Object>();
        serializerConfig.put("schema.registry.url", this.schemaRegConnect);
        // Separate instances: one configured as the key serializer, one as the value serializer.
        KafkaAvroSerializer keySerializer = new KafkaAvroSerializer();
        keySerializer.configure(serializerConfig, true);
        KafkaAvroSerializer valueSerializer = new KafkaAvroSerializer();
        valueSerializer.configure(serializerConfig, false);

        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.brokerList);
        properties.put("acks", "all");
        // NOTE(review): the serializer classes below are presumably placeholders only, since
        // explicit serializer instances are passed to the constructor — confirm.
        properties.put("key.serializer", ByteArraySerializer.class);
        properties.put("value.serializer", ByteArraySerializer.class);

        KafkaProducer<Object, Object> producer = new KafkaProducer<Object, Object>(properties, keySerializer, valueSerializer);
        try {
            for (ProducerRecord<Object, Object> record : list) {
                try {
                    // Block on the returned future so failures surface here, per record.
                    producer.send(record).get();
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // Keep the interrupt visible to whoever runs the test.
                        Thread.currentThread().interrupt();
                    }
                    // Same error type and message as Assert.fail, but keeps the cause's stack trace.
                    throw new AssertionError("Consumer test couldn't produce input messages to Kafka: " + e, e);
                }
            }
        } finally {
            // Without this the producer leaked whenever a send failed.
            producer.close();
        }
    }

    /**
     * POSTs a create-consumer-instance request to {@code /consumers/<str>}.
     *
     * <p>When {@code str2}, {@code str3} and {@code embeddedFormat} are all {@code null} the
     * request entity is {@code null}; otherwise a {@code ConsumerInstanceConfig} is built from
     * them, with all remaining constructor arguments left {@code null}.
     *
     * @param str path segment appended to {@code /consumers/}; NOTE(review): presumably the
     *     consumer group name — confirm
     * @param str2 first {@code ConsumerInstanceConfig} argument, may be {@code null};
     *     NOTE(review): presumably the instance id — confirm
     * @param str3 second {@code ConsumerInstanceConfig} argument, may be {@code null};
     *     NOTE(review): presumably the instance name — confirm
     * @param embeddedFormat format whose {@code toString()} is passed to the config, or
     *     {@code null}
     * @return the raw response of the POST; no assertions are made on it here
     */
    public Response createConsumerInstance(String str, String str2, String str3, EmbeddedFormat embeddedFormat) {
        final boolean nothingConfigured = str2 == null && str3 == null && embeddedFormat == null;
        final ConsumerInstanceConfig requestBody;
        if (nothingConfigured) {
            requestBody = null;
        } else {
            String formatName = null;
            if (embeddedFormat != null) {
                formatName = embeddedFormat.toString();
            }
            // The casts pick the constructor overload when passing bare nulls.
            requestBody = new ConsumerInstanceConfig(str2, str3, formatName, (String) null, (String) null, (Integer) null, (Integer) null);
        }
        Entity<ConsumerInstanceConfig> entity = Entity.entity(requestBody, "application/vnd.kafka.v1+json");
        return request("/consumers/" + str).post(entity);
    }

    /**
     * Extracts the last path segment of a consumer instance URL.
     *
     * <p>Trailing slashes are ignored, because {@link String#split(String)} drops trailing empty
     * segments. A URL with an empty path yields the empty string.
     *
     * @param str absolute URL of the consumer instance
     * @return the last non-empty segment of the URL's path
     * @throws RuntimeException wrapping a {@link MalformedURLException} if {@code str} is not a
     *     valid URL
     * @throws IllegalArgumentException if the URL's path consists only of slashes and so has no
     *     segment to return
     */
    public String consumerNameFromInstanceUrl(String str) {
        final String path;
        try {
            path = new URL(str).getPath();
        } catch (MalformedURLException e) {
            throw new RuntimeException(e);
        }
        String[] segments = path.split("/");
        if (segments.length == 0) {
            // A path such as "/" splits to an empty array; indexing it used to fail with an
            // ArrayIndexOutOfBoundsException for index -1.
            throw new IllegalArgumentException("Instance URL has no path segments: " + str);
        }
        return segments[segments.length - 1];
    }

    /**
     * Convenience overload of
     * {@code startConsumeMessages(String, String, EmbeddedFormat, String, boolean)} that passes
     * {@code false} for the final flag, i.e. expects the initial read to succeed.
     *
     * @param str forwarded unchanged
     * @param str2 forwarded unchanged
     * @param embeddedFormat forwarded unchanged
     * @param str3 forwarded unchanged
     * @return whatever the five-argument overload returns
     */
    public String startConsumeMessages(String str, String str2, EmbeddedFormat embeddedFormat, String str3) {
        final boolean expectTopicNotFound = false;
        return startConsumeMessages(str, str2, embeddedFormat, str3, expectTopicNotFound);
    }

    /**
     * Creates a consumer instance and performs one initial read on a topic.
     *
     * <p>The create response must be OK, carry a non-empty instance id, and have a base URI
     * that contains that id. The initial read is then issued against
     * {@code <baseUri>/topics/<str2>}.
     *
     * @param str passed to {@code createConsumerInstance} as its first argument
     * @param str2 topic name appended to the instance's base URI
     * @param embeddedFormat format passed to {@code createConsumerInstance}, may be {@code null}
     * @param str3 media type sent in the {@code Accept} header and expected on the response
     * @param z when {@code true}, the read must fail with 404 / error code 40401
     *     ("Topic not found."); when {@code false}, it must succeed with an empty record list
     * @return the base URI of the created consumer instance
     */
    public String startConsumeMessages(String str, String str2, EmbeddedFormat embeddedFormat, String str3, boolean z) {
        Response createResponse = createConsumerInstance(str, null, null, embeddedFormat);
        TestUtils.assertOKResponse(createResponse, "application/vnd.kafka.v1+json");
        CreateConsumerInstanceResponse instance = (CreateConsumerInstanceResponse) TestUtils.tryReadEntityOrLog(createResponse, CreateConsumerInstanceResponse.class);

        String instanceId = instance.getInstanceId();
        Assert.assertNotNull(instanceId);
        Assert.assertTrue(instanceId.length() > 0);
        String baseUri = instance.getBaseUri();
        Assert.assertTrue("Base URI should contain the consumer instance ID", baseUri.contains(instanceId));

        Response readResponse = request(baseUri + "/topics/" + str2).accept(str3).get();
        if (!z) {
            TestUtils.assertOKResponse(readResponse, str3);
            // Nothing has been produced for this consumer yet, so the first read must be empty.
            List<?> records = (List) TestUtils.tryReadEntityOrLog(readResponse, new GenericType<List<BinaryConsumerRecord>>() {
            });
            Assert.assertEquals(0L, records.size());
        } else {
            TestUtils.assertErrorResponse(Response.Status.NOT_FOUND, readResponse, 40401, "Topic not found.", str3);
        }
        return baseUri;
    }

    /**
     * Asserts that the consumed records carry the same keys and values as the produced ones.
     *
     * <p>Both sides are reduced to an occurrence count per comparable object (via
     * {@code TestUtils.encodeComparable}) and the two count maps are compared. Keys and values
     * share one map per side, so neither record order nor key/value pairing is checked.
     *
     * @param list the records that were produced
     * @param list2 the records returned by the consumer
     * @param converter applied to each produced key and value before encoding; skipped when
     *     {@code null}
     */
    protected <KafkaK, KafkaV, ClientK, ClientV, RecordType extends ConsumerRecord<ClientK, ClientV>> void assertEqualsMessages(List<ProducerRecord<KafkaK, KafkaV>> list, List<RecordType> list2, Converter converter) {
        HashMap<Object, Integer> producedCounts = new HashMap<Object, Integer>();
        for (ProducerRecord<KafkaK, KafkaV> produced : list) {
            Object key = produced.key();
            if (converter != null) {
                key = converter.convert(key);
            }
            Object comparableKey = TestUtils.encodeComparable(key);

            Object value = produced.value();
            if (converter != null) {
                value = converter.convert(value);
            }
            Object comparableValue = TestUtils.encodeComparable(value);

            countOccurrence(producedCounts, comparableKey);
            countOccurrence(producedCounts, comparableValue);
        }

        HashMap<Object, Integer> consumedCounts = new HashMap<Object, Integer>();
        for (RecordType consumed : list2) {
            Object comparableKey = TestUtils.encodeComparable(consumed.getKey());
            Object comparableValue = TestUtils.encodeComparable(consumed.getValue());
            countOccurrence(consumedCounts, comparableKey);
            countOccurrence(consumedCounts, comparableValue);
        }

        Assert.assertEquals(producedCounts, consumedCounts);
    }

    /** Adds one to the occurrence count stored for {@code item}, starting from zero if absent. */
    private static void countOccurrence(HashMap<Object, Integer> counts, Object item) {
        Integer previous = counts.get(item);
        int base = previous == null ? 0 : previous.intValue();
        counts.put(item, Integer.valueOf(base + 1));
    }

    /**
     * Reads messages from partition 0 of a topic through the
     * {@code /topics/<topic>/partitions/0/messages} endpoint and checks them against the
     * produced records.
     *
     * @param str topic name
     * @param i value of the {@code offset} query parameter
     * @param num value of the {@code count} query parameter; the parameter is omitted when
     *     {@code null}
     * @param list records the response is expected to contain
     * @param str2 media type sent in the {@code Accept} header
     * @param str3 media type expected on the response
     * @param genericType entity type used to deserialize the response body
     * @param converter passed through to {@code assertEqualsMessages}; may be {@code null}
     */
    public <KafkaK, KafkaV, ClientK, ClientV, RecordType extends ConsumerRecord<ClientK, ClientV>> void simpleConsumeMessages(String str, int i, Integer num, List<ProducerRecord<KafkaK, KafkaV>> list, String str2, String str3, GenericType<List<RecordType>> genericType, Converter converter) {
        HashMap<String, String> queryParams = new HashMap<String, String>();
        queryParams.put("offset", String.valueOf(i));
        if (num != null) {
            queryParams.put("count", num.toString());
        }

        String path = "/topics/" + str + "/partitions/0/messages";
        Response response = request(path, queryParams).accept(str2).get();
        TestUtils.assertOKResponse(response, str3);

        // Raw cast kept from the original; the element type is fixed by genericType.
        List<RecordType> consumed = (List) TestUtils.tryReadEntityOrLog(response, genericType);
        Assert.assertEquals(list.size(), consumed.size());
        assertEqualsMessages(list, consumed, converter);
    }

    /**
     * Reads a topic through an existing consumer instance and checks the result against the
     * produced records.
     *
     * @param str base URI of the consumer instance
     * @param str2 topic name appended as {@code /topics/<topic>}
     * @param list records the response is expected to contain
     * @param str3 media type sent in the {@code Accept} header
     * @param str4 media type expected on the response
     * @param genericType entity type used to deserialize the response body
     * @param converter passed through to {@code assertEqualsMessages}; may be {@code null}
     */
    public <KafkaK, KafkaV, ClientK, ClientV, RecordType extends ConsumerRecord<ClientK, ClientV>> void consumeMessages(String str, String str2, List<ProducerRecord<KafkaK, KafkaV>> list, String str3, String str4, GenericType<List<RecordType>> genericType, Converter converter) {
        String readUrl = str + "/topics/" + str2;
        Response response = request(readUrl).accept(str3).get();
        TestUtils.assertOKResponse(response, str4);

        // Raw cast kept from the original; the element type is fixed by genericType.
        List<RecordType> consumed = (List) TestUtils.tryReadEntityOrLog(response, genericType);
        Assert.assertEquals(list.size(), consumed.size());
        assertEqualsMessages(list, consumed, converter);
    }

    /**
     * Reads a topic that has no data and checks that the request returns an empty result only
     * after the configured request timeout has passed, and not much later.
     *
     * <p>The request must take longer than {@code consumer.request.timeout.ms}, and must
     * overshoot it by less than {@code consumer.iterator.backoff.ms} +
     * {@code consumer.iterator.timeout.ms} + 500 ms. Elapsed time is measured with
     * {@link System#nanoTime()}, which is unaffected by wall-clock adjustments.
     *
     * @param str base URI of the consumer instance
     * @param str2 topic name appended as {@code /topics/<topic>}
     * @param str3 media type sent in the {@code Accept} header
     * @param str4 media type expected on the response
     * @param genericType entity type used to deserialize the response body
     */
    public <K, V, RecordType extends ConsumerRecord<K, V>> void consumeForTimeout(String str, String str2, String str3, String str4, GenericType<List<RecordType>> genericType) {
        final long nanosPerMilli = 1000000L;

        long startNanos = System.nanoTime();
        Response response = request(str + "/topics/" + str2).accept(new String[]{str3}).get();
        long elapsedNanos = System.nanoTime() - startNanos;

        TestUtils.assertOKResponse(response, str4);
        Assert.assertEquals(0L, ((List) TestUtils.tryReadEntityOrLog(response, genericType)).size());

        long timeoutNanos = this.restConfig.getInt("consumer.request.timeout.ms").intValue() * nanosPerMilli;
        // Allowed overshoot: iterator backoff plus iterator timeout plus a fixed 500 ms margin.
        long slackMillis = (long) this.restConfig.getInt("consumer.iterator.backoff.ms").intValue()
                + this.restConfig.getInt("consumer.iterator.timeout.ms").intValue()
                + 500L;
        long slackNanos = slackMillis * nanosPerMilli;

        Assert.assertTrue("Consumer request should not return before the timeout when no data is available", elapsedNanos > timeoutNanos);
        Assert.assertTrue("Consumer request should timeout approximately within the request timeout period", elapsedNanos - timeoutNanos < slackNanos);
    }

    /**
     * POSTs an empty body to {@code <str>/offsets/} and asserts an OK response with the v1 JSON
     * media type.
     *
     * @param str base URI of the consumer instance
     */
    public void commitOffsets(String str) {
        final String mediaType = "application/vnd.kafka.v1+json";
        String offsetsUrl = str + "/offsets/";
        Response commitResponse = request(offsetsUrl).post(Entity.entity((Object) null, mediaType));
        TestUtils.assertOKResponse(commitResponse, mediaType);
    }

    /**
     * Reads a topic through a consumer instance and asserts the request fails with 404 and
     * error code 40403 ("Consumer instance not found.").
     *
     * @param str base URI of the consumer instance
     * @param str2 topic name appended as {@code /topics/<topic>}
     */
    public void consumeForNotFoundError(String str, String str2) {
        String readUrl = str + "/topics/" + str2;
        Response readResponse = request(readUrl).get();
        TestUtils.assertErrorResponse(Response.Status.NOT_FOUND, readResponse, 40403, "Consumer instance not found.", "application/vnd.kafka.binary.v1+json");
    }

    /**
     * Issues a DELETE against the consumer instance and checks the response for status
     * 204 No Content via {@code TestUtils.assertErrorResponse}, passing error code 0 and a
     * {@code null} message.
     *
     * @param str base URI of the consumer instance
     */
    public void deleteConsumer(String str) {
        Response deleteResponse = request(str).delete();
        TestUtils.assertErrorResponse(Response.Status.NO_CONTENT, deleteResponse, 0, null, "application/vnd.kafka.v1+json");
    }
}
