package org.apache.kafka.connect.integration;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.SinkUtils;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Integration tests that read connector offsets back through
 * {@link EmbeddedConnectCluster#connectorOffsets(String)} for sink and source connectors.
 *
 * <p>Each test starts a fresh embedded Connect cluster with {@value #NUM_WORKERS} workers in
 * {@link #setup()} and stops it in {@link #tearDown()}.
 */
@Category({IntegrationTest.class})
public class OffsetsApiIntegrationTest {
    private static final String CONNECTOR_NAME = "test-connector";
    private static final String TOPIC = "test-topic";
    private static final int NUM_TASKS = 2;
    private static final int NUM_WORKERS = 3;
    /** Number of partitions created for {@link #TOPIC} in the sink connector tests. */
    private static final int NUM_TOPIC_PARTITIONS = 5;
    /** Number of records written to each partition per produce round in the sink connector tests. */
    private static final int NUM_RECORDS_PER_PARTITION = 10;
    private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1);

    private EmbeddedConnectCluster connect;

    /** Builds and starts the embedded Connect cluster with a one second offset flush interval. */
    @Before
    public void setup() {
        Map<String, String> workerProps = new HashMap<>();
        workerProps.put("offset.flush.interval.ms", String.valueOf(OFFSET_COMMIT_INTERVAL_MS));

        connect = new EmbeddedConnectCluster.Builder()
                .name("connect-cluster")
                .numWorkers(NUM_WORKERS)
                .workerProps(workerProps)
                .build();
        connect.start();
    }

    /** Stops the embedded Connect cluster started by {@link #setup()}. */
    @After
    public void tearDown() {
        connect.stop();
    }

    /** Requesting offsets for a connector that was never created must fail with error code 404. */
    @Test
    public void testGetNonExistentConnectorOffsets() {
        ConnectRestException e = Assert.assertThrows(ConnectRestException.class,
            () -> connect.connectorOffsets("non-existent-connector"));
        Assert.assertEquals(404L, e.errorCode());
    }

    /** Verifies sink connector offsets with the default sink connector configuration. */
    @Test
    public void testGetSinkConnectorOffsets() throws Exception {
        getAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka());
    }

    /**
     * Verifies sink connector offsets when the consumer group ID is overridden, then checks that
     * only the overridden group (and not the default group for the connector) is listed by the
     * Kafka cluster.
     */
    @Test
    public void testGetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception {
        String overriddenGroupId = "overridden-group-id";
        Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
        connectorConfigs.put("consumer.override.group.id", overriddenGroupId);
        getAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka());

        // try-with-resources guarantees the admin client is closed exactly once,
        // whether or not the assertions below fail.
        try (Admin admin = connect.kafka().createAdminClient()) {
            Set<String> groupIds = admin.listConsumerGroups().all().get().stream()
                    .map(listing -> listing.groupId())
                    .collect(Collectors.toSet());
            Assert.assertTrue(groupIds.contains(overriddenGroupId));
            Assert.assertFalse(groupIds.contains(SinkUtils.consumerGroupId(CONNECTOR_NAME)));
        }
    }

    /**
     * Verifies sink connector offsets when the connector's consumer and admin clients are pointed
     * at a separate, single-broker Kafka cluster.
     */
    @Test
    public void testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
        EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());

        // The method reference adapts stop() to AutoCloseable so the extra cluster is always stopped.
        try (AutoCloseable ignored = kafkaCluster::stop) {
            kafkaCluster.start();

            Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
            connectorConfigs.put("consumer.override.bootstrap.servers", kafkaCluster.bootstrapServers());
            connectorConfigs.put("admin.override.bootstrap.servers", kafkaCluster.bootstrapServers());

            getAndVerifySinkConnectorOffsets(connectorConfigs, kafkaCluster);
        }
    }

    /**
     * Creates {@link #TOPIC} on the given Kafka cluster, produces records to it, starts a sink
     * connector and waits for the reported offsets to reach the expected value; then produces a
     * second round of records and waits again.
     *
     * @param connectorConfigs configuration used to create the sink connector
     * @param kafkaCluster     Kafka cluster on which the topic is created and records are produced
     * @throws Exception if the connector does not start or the offsets do not catch up in time
     */
    private void getAndVerifySinkConnectorOffsets(Map<String, String> connectorConfigs,
                                                  EmbeddedKafkaCluster kafkaCluster) throws Exception {
        kafkaCluster.createTopic(TOPIC, NUM_TOPIC_PARTITIONS);
        produceRecordsToEachPartition(kafkaCluster);

        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
            "Connector tasks did not start in time.");

        waitForSinkConnectorOffsets(NUM_RECORDS_PER_PARTITION);

        // A second round of records should move every partition's offset forward by the same amount.
        produceRecordsToEachPartition(kafkaCluster);
        waitForSinkConnectorOffsets(2 * NUM_RECORDS_PER_PARTITION);
    }

    /** Produces {@link #NUM_RECORDS_PER_PARTITION} records to every partition of {@link #TOPIC}. */
    private void produceRecordsToEachPartition(EmbeddedKafkaCluster kafkaCluster) throws Exception {
        for (int partition = 0; partition < NUM_TOPIC_PARTITIONS; partition++) {
            for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) {
                kafkaCluster.produce(TOPIC, partition, "key", "value");
            }
        }
    }

    /**
     * Waits until the connector reports one offset per topic partition and each of them has
     * {@code kafka_offset} equal to {@code expectedOffset}.
     *
     * @param expectedOffset value every partition's {@code kafka_offset} must reach
     * @throws Exception if the condition is not met in time
     */
    private void waitForSinkConnectorOffsets(int expectedOffset) throws Exception {
        TestUtils.waitForCondition(() -> {
            ConnectorOffsets reported = connect.connectorOffsets(CONNECTOR_NAME);
            if (reported.offsets().size() != NUM_TOPIC_PARTITIONS) {
                return false;
            }
            for (ConnectorOffset entry : reported.offsets()) {
                Assert.assertEquals(TOPIC, entry.partition().get("kafka_topic"));
                int committed = (Integer) entry.offset().get("kafka_offset");
                if (committed != expectedOffset) {
                    return false;
                }
            }
            return true;
        }, "Sink connector consumer group offsets should catch up to the topic end offsets");
    }

    /** Verifies source connector offsets with the default source connector configuration. */
    @Test
    public void testGetSourceConnectorOffsets() throws Exception {
        getAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs());
    }

    /** Verifies source connector offsets when the connector uses a custom offsets topic. */
    @Test
    public void testGetSourceConnectorOffsetsCustomOffsetsTopic() throws Exception {
        Map<String, String> connectorConfigs = baseSourceConnectorConfigs();
        connectorConfigs.put("offsets.storage.topic", "custom-offsets-topic");
        getAndVerifySourceConnectorOffsets(connectorConfigs);
    }

    /**
     * Verifies source connector offsets when the connector's producer and admin clients are
     * pointed at a separate, single-broker Kafka cluster.
     */
    @Test
    public void testGetSourceConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
        EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());

        // The method reference adapts stop() to AutoCloseable so the extra cluster is always stopped.
        try (AutoCloseable ignored = kafkaCluster::stop) {
            kafkaCluster.start();

            Map<String, String> connectorConfigs = baseSourceConnectorConfigs();
            connectorConfigs.put("producer.override.bootstrap.servers", kafkaCluster.bootstrapServers());
            connectorConfigs.put("admin.override.bootstrap.servers", kafkaCluster.bootstrapServers());

            getAndVerifySourceConnectorOffsets(connectorConfigs);
        }
    }

    /**
     * Starts a source connector with the given configuration and waits for every reported offset
     * to reach 10; then raises {@link MonitorableSourceConnector#MAX_MESSAGES_PRODUCED_CONFIG} to
     * 20, reconfigures the connector and waits for every reported offset to reach 20.
     *
     * <p>Note that {@code connectorConfigs} is modified in place by the reconfiguration step.
     *
     * @param connectorConfigs mutable configuration used to create the source connector
     * @throws Exception if the connector does not start or the offsets do not catch up in time
     */
    private void getAndVerifySourceConnectorOffsets(Map<String, String> connectorConfigs) throws Exception {
        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
            "Connector tasks did not start in time.");

        // 10 is the MAX_MESSAGES_PRODUCED_CONFIG value set by baseSourceConnectorConfigs().
        waitForSourceConnectorOffsets(10);

        connectorConfigs.put(MonitorableSourceConnector.MAX_MESSAGES_PRODUCED_CONFIG, "20");
        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);

        waitForSourceConnectorOffsets(20);
    }

    /**
     * Waits until the connector reports one offset per task and each of them has {@code saved}
     * equal to {@code expectedSaved}.
     *
     * @param expectedSaved value every reported {@code saved} offset must reach
     * @throws Exception if the condition is not met in time
     */
    private void waitForSourceConnectorOffsets(int expectedSaved) throws Exception {
        TestUtils.waitForCondition(() -> {
            ConnectorOffsets reported = connect.connectorOffsets(CONNECTOR_NAME);
            if (reported.offsets().size() != NUM_TASKS) {
                return false;
            }
            for (ConnectorOffset entry : reported.offsets()) {
                String taskId = (String) entry.partition().get("task.id");
                Assert.assertTrue(taskId.startsWith(CONNECTOR_NAME));
                int saved = (Integer) entry.offset().get("saved");
                if (saved != expectedSaved) {
                    return false;
                }
            }
            return true;
        }, "Source connector offsets should reflect the expected number of records produced");
    }

    /**
     * Builds a fresh, mutable configuration for a {@link MonitorableSinkConnector} that reads
     * {@link #TOPIC} with {@link #NUM_TASKS} tasks and string converters.
     */
    private Map<String, String> baseSinkConnectorConfigs() {
        Map<String, String> configs = new HashMap<>();
        configs.put("connector.class", MonitorableSinkConnector.class.getSimpleName());
        configs.put("tasks.max", String.valueOf(NUM_TASKS));
        configs.put("topics", TOPIC);
        configs.put("key.converter", StringConverter.class.getName());
        configs.put("value.converter", StringConverter.class.getName());
        return configs;
    }

    /**
     * Builds a fresh, mutable configuration for a {@link MonitorableSourceConnector} that writes
     * to {@link #TOPIC} with {@link #NUM_TASKS} tasks, string converters and default topic
     * creation settings of one partition and replication factor one.
     */
    private Map<String, String> baseSourceConnectorConfigs() {
        Map<String, String> configs = new HashMap<>();
        configs.put("connector.class", MonitorableSourceConnector.class.getSimpleName());
        configs.put("tasks.max", String.valueOf(NUM_TASKS));
        configs.put(MonitorableSourceConnector.TOPIC_CONFIG, TOPIC);
        configs.put(MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG, "3");
        configs.put(MonitorableSourceConnector.MAX_MESSAGES_PRODUCED_CONFIG, "10");
        configs.put("key.converter", StringConverter.class.getName());
        configs.put("value.converter", StringConverter.class.getName());
        configs.put("topic.creation.default.replication.factor", "1");
        configs.put("topic.creation.default.partitions", "1");
        return configs;
    }
}
