/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.server.plugins.auth;

import io.confluent.kafka.multitenant.MultiTenantInterceptor;
import io.confluent.kafka.multitenant.MultiTenantPrincipal;
import io.confluent.kafka.multitenant.metrics.TenantMetrics;
import io.confluent.kafka.server.plugins.auth.MultiTenantSaslConfigEntry;
import io.confluent.kafka.server.plugins.auth.MultiTenantSaslSecrets;
import io.confluent.kafka.server.plugins.auth.MultiTenantSaslSecretsLoaderTest;
import io.confluent.kafka.server.plugins.auth.MultiTenantSaslSecretsStore;
import io.confluent.kafka.server.plugins.auth.PlainSaslAuthenticator;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import kafka.server.BrokerSession;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.PublicCredential;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

/**
 * Tests for {@link MultiTenantSaslSecretsStore}.
 *
 * <p>Each test builds a store on top of a mocked {@link KafkaBasedLog}, starts it for a single
 * dummy listener and then feeds hand-built {@link ConsumerRecord}s straight into
 * {@code store.read(...)}. Assertions inspect {@code store.getLastSequenceId()},
 * {@code store.load().entries()} and the metrics registered in the {@code "tenant-metrics"} group.
 */
public class MultiTenantSaslSecretsStoreTest {
    private static final String LISTENER_NAME = "listener-name";
    private static final String TOPIC = "confluent.cdc.api.keys.topic";
    /** Metric group used by every metric these tests look up. */
    private static final String TENANT_METRICS_GROUP = "tenant-metrics";
    public static final String JSON1 = MultiTenantSaslSecretsLoaderTest.JSON1;
    public static final String JSON2 = MultiTenantSaslSecretsLoaderTest.JSON2;
    public static final String JSON2_KEY = "key3";
    static final String JSON3 = MultiTenantSaslSecretsLoaderTest.JSON3;
    static final String MALFORMED_JSON = MultiTenantSaslSecretsLoaderTest.MALFORMED_JSON;
    static final String JSON_WITH_NULL = MultiTenantSaslSecretsLoaderTest.JSON_WITH_NULL;
    static final String JSON_WITH_MISSING_FIELDS = MultiTenantSaslSecretsLoaderTest.JSON_WITH_MISSING_FIELDS;
    public static final String JSON_WITH_USER_RESOURCE_ID_1 = MultiTenantSaslSecretsLoaderTest.JSON_WITH_USER_RESOURCE_ID_1;
    public static final String JSON_WITH_USER_RESOURCE_ID_2 = MultiTenantSaslSecretsLoaderTest.JSON_WITH_USER_RESOURCE_ID_2;
    public static final String KEY1_JSON_WITH_USER_RESOURCE_ID = "key1";
    public static final String KEY2_JSON_WITH_USER_RESOURCE_ID = "key2";
    private Metrics metrics;
    private MultiTenantSaslSecretsStore store;
    private KafkaBasedLog<String, String> secretsLog;

    /** Creates a fresh metrics registry, a mocked log and a store configured for {@link #LISTENER_NAME}. */
    @BeforeEach
    @SuppressWarnings("unchecked") // Mockito.mock(Class) can only hand back the raw KafkaBasedLog type.
    public void setUp() throws Exception {
        this.secretsLog = Mockito.mock(KafkaBasedLog.class);
        this.metrics = new Metrics();
        this.store = new MultiTenantSaslSecretsStore(new HashMap<>(), this.metrics);
        this.store.configure(this.secretsLog, Collections.singletonList(LISTENER_NAME));
    }

    /**
     * Closes the store, checks that no untagged tenant metric is left behind and closes the
     * metrics registry. The registry is closed in a {@code finally} so that a failing close or a
     * failing metrics check cannot leak it.
     */
    @AfterEach
    public void tearDown() {
        try {
            this.store.close();
            this.verifyMetricsRemoved();
        } finally {
            this.metrics.close();
        }
    }

    /** Adds, updates and then deletes (null value) the entry for {@link #JSON2_KEY}. */
    @Test
    public void testRead() {
        this.storeStart();
        Assertions.assertNotNull(this.store.getLastSequenceId());
        Assertions.assertEquals(0, this.store.getLastSequenceId().size());

        final long firstSeqId = 1L;
        this.readKey3AndVerify(this.createConsumerRecord(firstSeqId, JSON2_KEY, JSON2), firstSeqId, true);

        Assertions.assertNotEquals(0, this.store.getLastSequenceId().size());
        this.readKey3AndVerify(this.createConsumerRecord(firstSeqId + 1L, JSON2_KEY, JSON3), firstSeqId + 1L, true);

        // A null value removes the entry again.
        this.readKey3AndVerify(this.createConsumerRecord(firstSeqId + 2L, JSON2_KEY, null), firstSeqId + 2L, false);
    }

    /**
     * Same add/update/delete cycle as {@link #testRead()} but with records built by
     * {@link #createConsumerRecordWithOldSeqId}, followed by a record from
     * {@link #createConsumerRecord} and a final old-style delete.
     */
    @Test
    public void testReadWithOldSeqId() {
        this.storeStart();
        Assertions.assertNotNull(this.store.getLastSequenceId());
        Assertions.assertEquals(0, this.store.getLastSequenceId().size());

        final long firstSeqId = 1L;
        this.readKey3AndVerify(this.createConsumerRecordWithOldSeqId(firstSeqId, JSON2_KEY, JSON2), firstSeqId, true);

        Assertions.assertNotEquals(0, this.store.getLastSequenceId().size());
        this.readKey3AndVerify(this.createConsumerRecordWithOldSeqId(firstSeqId + 1L, JSON2_KEY, JSON3), firstSeqId + 1L, true);

        this.readKey3AndVerify(this.createConsumerRecordWithOldSeqId(firstSeqId + 2L, JSON2_KEY, null), firstSeqId + 2L, false);

        // Mix in one record built by the other factory, then delete with the old-style one.
        Assertions.assertNotEquals(0, this.store.getLastSequenceId().size());
        this.readKey3AndVerify(this.createConsumerRecord(firstSeqId + 3L, JSON2_KEY, JSON3), firstSeqId + 3L, true);

        this.readKey3AndVerify(this.createConsumerRecordWithOldSeqId(firstSeqId + 4L, JSON2_KEY, null), firstSeqId + 4L, false);
    }

    /** Records whose key is not {@link #JSON2_KEY} must leave the loaded entries empty. */
    @Test
    public void testInvalidEntries() {
        this.storeStart();
        Assertions.assertNotNull(this.store.getLastSequenceId());
        Assertions.assertEquals(0, this.store.getLastSequenceId().size());

        final ConsumerRecord<String, String> firstRecord = this.createConsumerRecord(1L, "notkey3", JSON2);
        this.store.read(firstRecord);
        Assertions.assertEquals(Collections.emptyMap(), this.store.load().entries());
        // The sequence id is still tracked even though no entry was stored.
        Assertions.assertEquals(1, this.store.getLastSequenceId().size());

        final ConsumerRecord<String, String> secondRecord = this.createConsumerRecord(2L, "dontcare", JSON1);
        // The first record is read a second time before the new one.
        this.store.read(firstRecord);
        this.store.read(secondRecord);
        Assertions.assertEquals(Collections.emptyMap(), this.store.load().entries());
    }

    /** A record built without any headers must not add a sequence id. */
    @Test
    public void testReadRecordWithMissingHeader() {
        final String key = "dontcare";
        this.storeStart();
        Assertions.assertNotNull(this.store.getLastSequenceId());
        final ConsumerRecord<String, String> headerless = new ConsumerRecord<>(TOPIC, 0, 0L, key, JSON1);
        Assertions.assertEquals(0, this.store.getLastSequenceId().size());
        this.store.read(headerless);
        Assertions.assertEquals(0, this.store.getLastSequenceId().size());
    }

    /** Starting an already started store must throw {@link IllegalStateException}. */
    @Test
    public void testMultipleStarts() throws InterruptedException {
        this.storeStart();
        Assertions.assertThrows(IllegalStateException.class, this::storeStart, "Starting the store twice should throw");
    }

    /** Unparseable or incomplete values advance the sequence id but leave the secrets untouched. */
    @Test
    public void testRecordMessageInvalid() {
        this.verifyInvalidMessagesLeaveSecretsUnchanged(false);
    }

    /** Variant of {@link #testRecordMessageInvalid()} using {@link #createConsumerRecordWithOldSeqId}. */
    @Test
    public void testRecordMessageInvalidWithOldSeqId() {
        this.verifyInvalidMessagesLeaveSecretsUnchanged(true);
    }

    /** Reads {@link #JSON2} and checks every field of the resulting entry. */
    @Test
    public void testLoad() {
        this.storeStart();
        final long seqId = 123L;
        this.verifyKey3EntryLoaded(this.createConsumerRecord(seqId, JSON2_KEY, JSON2), seqId);
    }

    /** Variant of {@link #testLoad()} using {@link #createConsumerRecordWithOldSeqId}. */
    @Test
    public void testLoadWithOldSeqId() {
        this.storeStart();
        final long seqId = 123L;
        this.verifyKey3EntryLoaded(this.createConsumerRecordWithOldSeqId(seqId, JSON2_KEY, JSON2), seqId);
    }

    /**
     * Checks that the per-credential metrics for {@link #JSON2_KEY} appear once its record has been
     * read, start at zero, and follow connection/disconnection callbacks issued through two
     * {@link MultiTenantInterceptor} instances.
     */
    @Test
    public void testApiKeysMetrics() {
        final String authTotal = "successful-authentication-by-credential-total";
        final String connectionCount = "active-authenticated-connection-by-credential-count";
        final String[] metricNames = {authTotal, connectionCount};

        final Map<String, MetricName> key3MetricNames = new HashMap<>();
        for (String metric : metricNames) {
            key3MetricNames.put(metric, this.metrics.metricName(metric, TENANT_METRICS_GROUP,
                    TenantMetrics.ApiKeyMetricsContext.metricTags(JSON2_KEY, "user3")));
        }
        key3MetricNames.forEach((name, metricName) ->
                Assertions.assertNull(this.metrics.metric(metricName), "Metric " + name + " exists!"));

        this.storeStart();
        long seqId = 123L;
        Assertions.assertEquals(0, this.store.getLastSequenceId().size());
        this.store.read(this.createConsumerRecord(seqId, JSON2_KEY, JSON2));
        ++seqId;
        key3MetricNames.forEach((name, metricName) ->
                Assertions.assertNotNull(this.metrics.metric(metricName), "Metric " + name + " doesn't exist!"));

        for (String json : new String[]{JSON1, JSON2}) {
            this.store.read(this.createConsumerRecord(seqId, "dontcare", json));
            ++seqId;
        }

        // Every loaded entry must have both metrics registered and still at zero.
        this.store.load().entries().forEach((key, entry) -> {
            for (String metric : metricNames) {
                MetricName metricName = this.metrics.metricName(metric, TENANT_METRICS_GROUP,
                        TenantMetrics.ApiKeyMetricsContext.metricTags(key, entry.userId()));
                KafkaMetric registered = this.metrics.metric(metricName);
                Assertions.assertNotNull(registered, "Metric " + metric + " doesn't exist!");
                Assertions.assertEquals(0.0, (double) (Double) registered.metricValue(), 0.1, "Metric " + metric);
            }
        });

        final BiConsumer<String, Double> checkMetric = (name, expected) -> Assertions.assertEquals(
                (double) expected,
                (double) (Double) this.metrics.metric(key3MetricNames.get(name)).metricValue(),
                0.1,
                "Metric " + name);

        final KafkaPrincipal principal = (KafkaPrincipal) PlainSaslAuthenticator.multiTenantPrincipal(
                JSON2_KEY, this.store.load().entries().get(JSON2_KEY));
        final InetAddress clientAddress = localClientAddress();

        final MultiTenantInterceptor interceptor = new MultiTenantInterceptor();
        interceptor.onAuthenticatedConnection("dontcare", clientAddress, principal, this.metrics);
        checkMetric.accept(connectionCount, 1.0);
        checkMetric.accept(authTotal, 1.0);

        final MultiTenantInterceptor interceptor2 = new MultiTenantInterceptor();
        interceptor2.onAuthenticatedConnection("dontcare", clientAddress, principal, this.metrics);
        checkMetric.accept(connectionCount, 2.0);
        checkMetric.accept(authTotal, 2.0);

        // Disconnections lower the active count; the authentication total stays where it was.
        interceptor.onAuthenticatedDisconnection("dontcare", clientAddress, principal, this.metrics);
        checkMetric.accept(connectionCount, 1.0);
        checkMetric.accept(authTotal, 2.0);

        interceptor2.onAuthenticatedDisconnection("dontcare", clientAddress, principal, this.metrics);
        checkMetric.accept(connectionCount, 0.0);
        checkMetric.accept(authTotal, 2.0);
    }

    /**
     * A record with a lower sequence id than the last one seen for the same key is ignored, while
     * the same sequence id is still accepted for a different key.
     */
    @Test
    public void testIgnoreOldMessage() {
        this.storeStart();
        final long newerSeqId = 123L;
        Assertions.assertEquals(0, this.store.getLastSequenceId().size());
        this.store.read(this.createConsumerRecord(newerSeqId, JSON2_KEY, JSON2));
        Assertions.assertEquals(newerSeqId, this.lastSequenceId(JSON2_KEY));
        final MultiTenantSaslConfigEntry entryBefore = this.store.load().entries().get(JSON2_KEY);

        final long olderSeqId = newerSeqId - 1L;
        final ConsumerRecord<String, String> staleRecord = this.createConsumerRecord(olderSeqId, JSON2_KEY, JSON3);
        Assertions.assertEquals(newerSeqId, this.lastSequenceId(JSON2_KEY));
        this.store.read(staleRecord);
        Assertions.assertEquals(newerSeqId, this.lastSequenceId(JSON2_KEY));
        final MultiTenantSaslConfigEntry entryAfter = this.store.load().entries().get(JSON2_KEY);
        Assertions.assertEquals(entryAfter, entryBefore);

        // Sequence ids are tracked per key, so the lower id is fine for a key not seen before.
        final String otherKey = "notkey3";
        this.store.read(this.createConsumerRecord(olderSeqId, otherKey, JSON2.replaceAll(JSON2_KEY, otherKey)));
        Assertions.assertEquals(olderSeqId, this.lastSequenceId(otherKey));
        Assertions.assertNotNull(this.store.load().entries().get(otherKey));
        Mockito.verify(this.secretsLog, Mockito.times(1)).start();
    }

    @Test
    public void testApiKeyDeleteWithConnectionTerminationEnabled() throws Exception {
        this.verifyApiKeyDelete(true);
    }

    @Test
    public void testApiKeyDeleteWithConnectionTerminationDisabled() throws Exception {
        this.verifyApiKeyDelete(false);
    }

    /**
     * Registers a broker session whose callback collects deleted credentials, then adds, updates
     * and deletes {@link #JSON2_KEY}. Only the delete may reach the callback, and only when
     * {@code closeConnectionsOnCredentialDelete} is {@code true}.
     *
     * @param closeConnectionsOnCredentialDelete value the mocked {@link KafkaConfig} reports for
     *                                           {@code closeConnectionsOnCredentialDelete()}
     */
    private void verifyApiKeyDelete(boolean closeConnectionsOnCredentialDelete) throws Exception {
        final String sessionUuid = "1";
        final Set<Object> deletedCredentials = new HashSet<>();
        final KafkaConfig config = Mockito.mock(KafkaConfig.class);
        // The accessor return types are not visible from this file; the Object casts keep the
        // stubbing independent of them.
        Mockito.when((Object) config.brokerSessionUuid()).thenReturn((Object) sessionUuid);
        Mockito.when((Object) config.closeConnectionsOnCredentialDelete()).thenReturn((Object) closeConnectionsOnCredentialDelete);
        // NOTE(review): the session registered here is never removed by this test - confirm
        // whether BrokerSession keeps it around for later tests.
        BrokerSession.addSession(config, deletedCredentials::add);
        TestUtils.setFieldValueInSuperClass(this.store, "sessionUuid", sessionUuid);

        this.storeStart();
        Assertions.assertNotNull(this.store.getLastSequenceId());

        this.store.read(this.createConsumerRecord(1L, JSON2_KEY, JSON2));
        Assertions.assertTrue(this.store.load().entries().containsKey(JSON2_KEY));
        Assertions.assertEquals(Collections.emptySet(), deletedCredentials);

        this.store.read(this.createConsumerRecord(2L, JSON2_KEY, JSON3));
        Assertions.assertTrue(this.store.load().entries().containsKey(JSON2_KEY));
        Assertions.assertEquals(Collections.emptySet(), deletedCredentials);

        this.store.read(this.createConsumerRecord(3L, JSON2_KEY, null));
        Assertions.assertFalse(this.store.load().entries().containsKey(JSON2_KEY));
        if (closeConnectionsOnCredentialDelete) {
            final PublicCredential deleted = PublicCredential.saslCredential(JSON2_KEY, "PLAIN");
            Assertions.assertEquals(Collections.singleton(deleted), deletedCredentials);
        } else {
            Assertions.assertEquals(Collections.emptySet(), deletedCredentials);
        }
    }

    /**
     * Reads {@code record} and asserts the resulting state for {@link #JSON2_KEY}.
     *
     * @param record        record to feed into the store
     * @param expectedSeqId sequence id expected for {@link #JSON2_KEY} after the read
     * @param expectStored  {@code true} if exactly the {@link #JSON2_KEY} entry must be present,
     *                      {@code false} if the store must be empty
     */
    private void readKey3AndVerify(ConsumerRecord<String, String> record, long expectedSeqId, boolean expectStored) {
        this.store.read(record);
        Assertions.assertEquals(expectedSeqId, this.lastSequenceId(JSON2_KEY));
        if (expectStored) {
            this.verifyExpectedSecretsSize(1L);
            Assertions.assertTrue(this.store.load().entries().containsKey(JSON2_KEY));
        } else {
            this.verifyExpectedSecretsSize(0L);
            Assertions.assertEquals(Collections.emptyMap(), this.store.load().entries());
        }
    }

    /**
     * Starts the store and reads four invalid values under sequence ids 123..126, asserting after
     * each that the sequence id advanced while {@code store.load()} still equals the initial secrets.
     *
     * @param oldSeqIdRecords {@code true} to build records with
     *                        {@link #createConsumerRecordWithOldSeqId}, {@code false} for
     *                        {@link #createConsumerRecord}
     */
    private void verifyInvalidMessagesLeaveSecretsUnchanged(boolean oldSeqIdRecords) {
        final String key = "dontcare";
        this.storeStart();
        Assertions.assertEquals(0, this.store.getLastSequenceId().size());
        final MultiTenantSaslSecrets startSecrets = this.store.load();

        final String[] invalidValues = {"invalid-msg", MALFORMED_JSON, JSON_WITH_NULL, JSON_WITH_MISSING_FIELDS};
        long seqId = 123L;
        for (String value : invalidValues) {
            final ConsumerRecord<String, String> record = oldSeqIdRecords
                    ? this.createConsumerRecordWithOldSeqId(seqId, key, value)
                    : this.createConsumerRecord(seqId, key, value);
            this.store.read(record);
            Assertions.assertEquals(seqId, this.lastSequenceId(key));
            Assertions.assertEquals(startSecrets, this.store.load());
            ++seqId;
        }
    }

    /**
     * Reads {@code record} into an already started, still empty store and checks the fields of the
     * entry stored under {@link #JSON2_KEY} as well as that the mocked log was started exactly once.
     *
     * @param record record for {@link #JSON2_KEY} carrying {@link #JSON2}
     * @param seqId  sequence id the record was built with
     */
    private void verifyKey3EntryLoaded(ConsumerRecord<String, String> record, long seqId) {
        Assertions.assertEquals(0, this.store.getLastSequenceId().size());
        this.store.read(record);
        Assertions.assertEquals(seqId, this.lastSequenceId(JSON2_KEY));

        final MultiTenantSaslConfigEntry entry = this.store.load().entries().get(JSON2_KEY);
        Assertions.assertNotNull(entry);
        Assertions.assertEquals("user3", entry.userId());
        Assertions.assertEquals("myCluster3", entry.logicalClusterId());
        Assertions.assertEquals("PLAIN", entry.saslMechanism());
        Assertions.assertEquals("no hash", entry.hashedSecret());
        Assertions.assertEquals("none3", entry.hashFunction());
        Mockito.verify(this.secretsLog, Mockito.times(1)).start();
    }

    /** Asserts both the number of loaded entries and the {@code active-api-key-count} metric. */
    private void verifyExpectedSecretsSize(long expectedSize) {
        Assertions.assertEquals(expectedSize, this.store.load().entries().size(), "Expected numbers of secrets to match");
        Assertions.assertEquals((double) expectedSize, this.metricValue("active-api-key-count"), 0.1, "Expected active secrets metric to match");
    }

    /** Asserts that no metric without tags remains registered in the tenant metrics group. */
    private void verifyMetricsRemoved() {
        final Set<MetricName> untaggedTenantMetrics = this.metrics.metrics().keySet().stream()
                .filter(metricName -> TENANT_METRICS_GROUP.equals(metricName.group()) && metricName.tags().isEmpty())
                .collect(Collectors.toSet());
        Assertions.assertEquals(Collections.emptySet(), untaggedTenantMetrics);
    }

    /** Starts the store for one dummy endpoint and waits for that endpoint's future to complete. */
    private void storeStart() {
        final Endpoint dummyEndpoint = new Endpoint(LISTENER_NAME, SecurityProtocol.SASL_PLAINTEXT, "localhost", 0);
        final Map<?, ?> endpointFutures = this.store.start(Collections.singletonList(dummyEndpoint));
        Assertions.assertEquals(1, endpointFutures.size(), "More endpoints futures returned than expected!");
        ((CompletableFuture<?>) endpointFutures.get(dummyEndpoint)).join();
    }

    /** Builds a record whose headers come from {@code createGoodSequenceIdRecordHeaders(seqId)}. */
    private ConsumerRecord<String, String> createConsumerRecord(long seqId, String key, String value) {
        return recordWithHeaders(key, value, KafkaTestUtils.createGoodSequenceIdRecordHeaders(seqId));
    }

    /** Builds a record whose headers come from {@code createGoodSequenceIdRecordHeaders(seqId, false)}. */
    private ConsumerRecord<String, String> createConsumerRecordWithOldSeqId(long seqId, String key, String value) {
        return recordWithHeaders(key, value, KafkaTestUtils.createGoodSequenceIdRecordHeaders(seqId, false));
    }

    /** Builds a record on {@link #TOPIC}, partition 0, offset 0 with no timestamp and no leader epoch. */
    private static ConsumerRecord<String, String> recordWithHeaders(String key, String value, Headers headers) {
        return new ConsumerRecord<>(TOPIC, 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, key, value, headers, Optional.empty());
    }

    /**
     * Returns the last sequence id the store recorded for {@code key}.
     *
     * @throws NullPointerException if the store has no sequence id for {@code key}
     */
    private long lastSequenceId(String key) {
        final Long seqId = (Long) this.store.getLastSequenceId().get(key);
        return seqId;
    }

    /** Returns 127.0.0.1, failing the test (with the cause attached) if it cannot be created. */
    private static InetAddress localClientAddress() {
        try {
            return InetAddress.getByAddress(new byte[]{127, 0, 0, 1});
        } catch (IOException e) {
            return Assertions.fail("Creating InetAddress must not fail", e);
        }
    }

    /** Returns the current value of the untagged tenant metric called {@code name}. */
    private double metricValue(String name) {
        final MetricName metricName = this.metrics.metricName(name, TENANT_METRICS_GROUP);
        final KafkaMetric metric = this.metrics.metric(metricName);
        return (Double) metric.metricValue();
    }
}

