package io.confluent.kafkarest.integration.v2;

import com.fasterxml.jackson.databind.JsonNode;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.v2.ConsumerSubscriptionRecord;
import io.confluent.kafkarest.entities.v2.CreateConsumerInstanceRequest;
import io.confluent.kafkarest.entities.v2.CreateConsumerInstanceResponse;
import io.confluent.kafkarest.entities.v2.ProduceResponse;
import io.confluent.kafkarest.entities.v2.SchemaConsumerRecord;
import io.confluent.kafkarest.entities.v2.SchemaTopicProduceRequest;
import io.confluent.kafkarest.integration.ClusterTestHarness;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import javax.validation.ConstraintViolationException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.Response;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag("IntegrationTest")
/* loaded from: input_file:io/confluent/kafkarest/integration/v2/SchemaProduceConsumeTest.class */
public abstract class SchemaProduceConsumeTest extends ClusterTestHarness {
    private static final String TOPIC = "topic-1";
    private static final String CONSUMER_GROUP = "group-1";
    private static final Logger log = LoggerFactory.getLogger(SchemaProduceConsumeTest.class);

    protected abstract EmbeddedFormat getFormat();

    protected abstract String getContentType();

    protected abstract ParsedSchema getKeySchema();

    protected abstract ParsedSchema getValueSchema();

    protected abstract List<SchemaTopicProduceRequest.SchemaTopicProduceRecord> getProduceRecords();

    public SchemaProduceConsumeTest() {
        super(1, true);
    }

    @Test
    public void produceThenConsume_returnsExactlyProduced() throws Exception {
        createTopic(TOPIC, 1, (short) 1);
        Response post = request(String.format("/consumers/%s", CONSUMER_GROUP)).post(Entity.entity(new CreateConsumerInstanceRequest((String) null, (String) null, getFormat() != null ? getFormat().name() : null, (String) null, (String) null, (Integer) null, (Integer) null), "application/vnd.kafka.v2+json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        CreateConsumerInstanceResponse createConsumerInstanceResponse = (CreateConsumerInstanceResponse) post.readEntity(CreateConsumerInstanceResponse.class);
        Assertions.assertEquals(Response.Status.NO_CONTENT.getStatusCode(), request(String.format("/consumers/%s/instances/%s/subscription", CONSUMER_GROUP, createConsumerInstanceResponse.getInstanceId())).post(Entity.entity(new ConsumerSubscriptionRecord(Collections.singletonList(TOPIC), (String) null), "application/vnd.kafka.v2+json")).getStatus());
        request(String.format("/consumers/%s/instances/%s/records", CONSUMER_GROUP, createConsumerInstanceResponse.getInstanceId())).accept(new String[]{getContentType()}).get();
        Response post2 = request(String.format("/topics/%s", TOPIC)).post(Entity.entity(new SchemaTopicProduceRequest(getProduceRecords(), getKeySchema().canonicalString(), (Integer) null, getValueSchema().canonicalString(), (Integer) null), getContentType()));
        try {
            Assertions.assertEquals(Response.Status.OK, ((ProduceResponse) post2.readEntity(ProduceResponse.class)).getRequestStatus());
            Response response = request(String.format("/consumers/%s/instances/%s/records", CONSUMER_GROUP, createConsumerInstanceResponse.getInstanceId())).accept(new String[]{getContentType()}).get();
            Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
            assertMapEquals(producedToMap(getProduceRecords()), consumedToMap((List) response.readEntity(new GenericType<List<SchemaConsumerRecord>>() { // from class: io.confluent.kafkarest.integration.v2.SchemaProduceConsumeTest.1
            })));
        } catch (ConstraintViolationException e) {
            log.error("Can't parse produce response class: {} status: {} ", post2.getClass(), Integer.valueOf(post2.getStatus()));
            log.error("Reading entity using actual class: {}", post2.readEntity(post2.getClass()));
            e.printStackTrace();
            throw e;
        }
    }

    private static Map<JsonNode, JsonNode> producedToMap(List<SchemaTopicProduceRequest.SchemaTopicProduceRecord> list) {
        HashMap hashMap = new HashMap();
        for (SchemaTopicProduceRequest.SchemaTopicProduceRecord schemaTopicProduceRecord : list) {
            hashMap.put(schemaTopicProduceRecord.getKey(), schemaTopicProduceRecord.getValue());
        }
        return Collections.unmodifiableMap(hashMap);
    }

    private static Map<JsonNode, JsonNode> consumedToMap(List<SchemaConsumerRecord> list) {
        HashMap hashMap = new HashMap();
        for (SchemaConsumerRecord schemaConsumerRecord : list) {
            hashMap.put(schemaConsumerRecord.getKey(), schemaConsumerRecord.getValue());
        }
        return Collections.unmodifiableMap(hashMap);
    }

    private static <K, V> void assertMapEquals(Map<K, V> map, Map<K, V> map2) {
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        HashSet hashSet3 = new HashSet();
        for (Map.Entry<K, V> entry : map.entrySet()) {
            if (!map2.containsKey(entry.getKey())) {
                hashSet2.add(entry.getKey());
            } else if (!map2.get(entry.getKey()).equals(entry.getValue())) {
                hashSet3.add(entry.getKey());
            }
        }
        for (K k : map2.keySet()) {
            if (!map.containsKey(k)) {
                hashSet.add(k);
            }
        }
        if (hashSet.isEmpty() && hashSet2.isEmpty() && hashSet3.isEmpty()) {
            return;
        }
        Assertions.fail(String.format("Expected and actual are not equal. Extra: %s, Missing: %s, Different: %s", hashSet, hashSet2, hashSet3));
    }
}
