package io.confluent.kafka.schemaregistry.validator.integration;

import io.confluent.kafka.schemaregistry.ClusterTestHarness;
import io.confluent.kafka.schemaregistry.RestApp;
import io.confluent.kafka.schemaregistry.avro.AvroUtils;
import io.confluent.kafka.schemaregistry.validator.RecordSchemaValidator;
import io.confluent.kafka.serializers.subject.RecordNameStrategy;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import kafka.server.KafkaConfig;
import kafka.utils.TestUtils;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.junit.jupiter.api.Assertions;
import scala.Option;
import scala.collection.JavaConverters;

/* loaded from: input_file:io/confluent/kafka/schemaregistry/validator/integration/RecordSchemaValidatorIntegrationTestHarness.class */
/**
 * Shared base for RecordSchemaValidator integration tests.
 *
 * <p>Builds on {@link ClusterTestHarness} and adds helpers to configure a broker with a
 * schema registry URL, create producers and Avro records, register a subject through the
 * REST client, create topics through an admin client, and check that the rejected-record
 * interceptor metric is present.
 */
public class RecordSchemaValidatorIntegrationTestHarness extends ClusterTestHarness {
    protected static final String TEST_TOPIC = "test";
    protected static final String TEST_SUBJECT = "testSubject";
    protected static final String TEST_SCHEMA_STRING = "{\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"User\",\"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}";

    /** Admin client created by {@link #createTopicThroughAdminClient(NewTopic)}; null until then. */
    protected Admin adminClient;

    /** Sample record built from {@link #TEST_SCHEMA_STRING} with {@code name = "testUser"}. */
    protected IndexedRecord mockAvroRecord;

    /**
     * Forwards all three arguments unchanged to the {@link ClusterTestHarness} constructor and
     * builds the sample Avro record.
     *
     * @param i   first argument of the superclass constructor
     * @param z   second argument of the superclass constructor
     * @param str third argument of the superclass constructor
     */
    public RecordSchemaValidatorIntegrationTestHarness(int i, boolean z, String str) {
        super(i, z, str);
        // NOTE(review): createAvroRecord is overridable; a subclass override would run here
        // before the subclass's own fields are initialized.
        this.mockAvroRecord = createAvroRecord(TEST_SCHEMA_STRING, Collections.singletonMap("name", "testUser"));
    }

    /**
     * Picks a schema registry port via {@code choosePort()}, stores it in
     * {@code schemaRegistryPort}, and writes the registry URL and a single-partition default
     * into the given broker properties.
     *
     * @param properties broker properties to mutate in place
     */
    public void injectProperties(Properties properties) {
        this.schemaRegistryPort = Integer.valueOf(choosePort());
        properties.setProperty("confluent.schema.registry.url", getSchemaRegistryProtocol() + "://0.0.0.0:" + this.schemaRegistryPort);
        properties.setProperty("num.partitions", "1");
    }

    /**
     * Builds a broker configuration from {@code TestUtils.createBrokerConfig} and then applies
     * {@link #injectProperties(Properties)} on top of it.
     *
     * @param i passed as the first argument of {@code createBrokerConfig} (presumably the broker id)
     * @return the resulting {@link KafkaConfig}
     */
    protected KafkaConfig getKafkaConfig(int i) {
        Option apply = Option.apply((Object) null);
        Properties createBrokerConfig = TestUtils.createBrokerConfig(i, this.zkConnect, false, false, TestUtils.RandomPort(), Option.apply((Object) null), apply, EMPTY_SASL_PROPERTIES, true, false, TestUtils.RandomPort(), false, TestUtils.RandomPort(), false, TestUtils.RandomPort(), Option.empty(), 1, false, 1, (short) 1, false);
        injectProperties(createBrokerConfig);
        return KafkaConfig.fromProps(createBrokerConfig);
    }

    /** Parses {@code str} as an Avro schema and registers its canonical form under subject {@code str2}. */
    private void registerTestSubject(RestApp restApp, String str, String str2) throws Exception {
        restApp.restClient.registerSchema(AvroUtils.parseSchema(str).canonicalString(), str2);
    }

    /**
     * Returns the base producer properties: bootstrap servers, {@code acks=-1}, the schema
     * registry URL of {@code restApp}, and {@link RecordNameStrategy} as the value subject
     * name strategy.
     *
     * @return a fresh, mutable {@link Properties} instance
     */
    public Properties getProducerProps() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.brokerList);
        properties.put("acks", "-1");
        properties.put("schema.registry.url", this.restApp.restConnect);
        properties.put("confluent.value.subject.name.strategy", RecordNameStrategy.class);
        return properties;
    }

    /**
     * Creates a producer from {@link #getProducerProps()} with the given serializers.
     *
     * @param cls  key serializer class
     * @param cls2 value serializer class
     * @return a new producer; the caller is responsible for closing it
     */
    public KafkaProducer<Object, Object> createProducer(Class<?> cls, Class<?> cls2) {
        return newProducer(getProducerProps(), cls, cls2);
    }

    /**
     * Creates a producer from {@link #getProducerProps()} with the given serializers, replacing
     * the schema registry URL with {@code str}.
     *
     * @param cls  key serializer class
     * @param cls2 value serializer class
     * @param str  schema registry URL that overrides the default one
     * @return a new producer; the caller is responsible for closing it
     */
    protected KafkaProducer<Object, Object> createProducer(Class<?> cls, Class<?> cls2, String str) {
        Properties producerProps = getProducerProps();
        producerProps.put("schema.registry.url", str);
        return newProducer(producerProps, cls, cls2);
    }

    /** Sets the key/value serializers on {@code producerProps} and instantiates the producer. */
    private KafkaProducer<Object, Object> newProducer(Properties producerProps, Class<?> keySerializer, Class<?> valueSerializer) {
        producerProps.put("key.serializer", keySerializer);
        producerProps.put("value.serializer", valueSerializer);
        return new KafkaProducer<>(producerProps);
    }

    /**
     * Parses {@code str} as an Avro schema and returns a generic record populated from {@code map}.
     *
     * @param str Avro schema JSON
     * @param map field name to field value; every entry is put into the record
     * @return the populated record
     */
    protected IndexedRecord createAvroRecord(String str, Map<String, Object> map) {
        GenericData.Record record = new GenericData.Record(new Schema.Parser().parse(str));
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            record.put(entry.getKey(), entry.getValue());
        }
        return record;
    }

    /**
     * Returns a 10-byte payload that is all zeros except for the value 2 at index 4.
     *
     * <p>NOTE(review): presumably this mimics the Confluent wire format (magic byte 0 followed
     * by a 4-byte big-endian schema id, here 2) with an id that is not registered — confirm.
     *
     * @return the payload bytes
     */
    public byte[] messageContentWithInvalidSchemaId() {
        byte[] bArr = new byte[10];
        bArr[4] = 2;
        return bArr;
    }

    /**
     * Registers schema {@code str} under subject {@code str2} through the REST client and
     * asserts that the subject then has exactly one version.
     *
     * @param restApp REST application whose client is used
     * @param str     Avro schema JSON
     * @param str2    subject name
     * @throws Exception if parsing or any REST call fails
     */
    public void checkRegisterSubjectThroughRestClient(RestApp restApp, String str, String str2) throws Exception {
        registerTestSubject(restApp, str, str2);
        Assertions.assertEquals(1, restApp.restClient.getAllVersions(str2).size());
    }

    /**
     * Creates an admin client connected to {@code brokerList}.
     *
     * @return a new admin client; the caller is responsible for closing it
     */
    protected AdminClient createAdminClient() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.brokerList);
        return AdminClient.create(properties);
    }

    /**
     * Creates {@code newTopic} through a freshly created admin client (stored in
     * {@link #adminClient}) and waits up to 30 seconds for metadata of partition 0.
     *
     * @param newTopic topic to create
     * @throws RuntimeException wrapping the cause if topic creation fails or the thread is
     *                          interrupted while waiting
     */
    public void createTopicThroughAdminClient(NewTopic newTopic) {
        // NOTE(review): any client previously stored in adminClient is overwritten without being
        // closed here; presumably teardown elsewhere closes it — confirm.
        this.adminClient = createAdminClient();
        try {
            this.adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
            TestUtils.waitForPartitionMetadata(JavaConverters.asScalaBuffer(this.servers), newTopic.name(), 0, 30000L);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Asserts that exactly one metric in the default Yammer registry has an MBean name starting
     * with the TotalRejectedRecordsPerSec interceptor metric for {@link #TEST_TOPIC} and
     * {@link RecordSchemaValidator}.
     */
    public void checkMetricsAreRecorded() {
        // Built once rather than per metric; TEST_TOPIC keeps the name in sync with the constant.
        final String expectedPrefix = "kafka.log:type=InterceptorMetrics,name=TotalRejectedRecordsPerSec,topic=" + TEST_TOPIC
                + ",interceptorClassName=" + RecordSchemaValidator.class.getName();
        long matching = KafkaYammerMetrics.defaultRegistry().allMetrics().keySet().stream()
                .filter(metricName -> metricName.getMBeanName().startsWith(expectedPrefix))
                .count();
        Assertions.assertEquals(1L, matching);
    }
}
