package org.apache.kafka.connect.storage;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TestFuture;
import org.apache.kafka.connect.util.TopicAdmin;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;

@PrepareForTest({KafkaConfigBackingStore.class, WorkerConfig.class})
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
/* loaded from: input_file:org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.class */
public class KafkaConfigBackingStoreTest {
    // Passed as a constructor argument to the store under test in createStore().
    private static final String CLIENT_ID_BASE = "test-client-id-";
    // Name of the config topic used by the tests (handed to setupAndCreateKafkaBasedLog and used for consumer records).
    private static final String TOPIC = "connect-configs";
    // NOTE(review): testStartStop asserts that the created topic has replication factor 5; how this
    // constant reaches the worker config is not visible in this part of the file.
    private static final short TOPIC_REPLICATION_FACTOR = 5;
    // Base worker properties; each test instance copies them into its own mutable `props` map.
    private static final Map<String, String> DEFAULT_CONFIG_STORAGE_PROPS = new HashMap();

    // The blank static finals below are assigned elsewhere in the class (a static initializer,
    // which is outside the visible portion of the file), so their contents are not documented here.
    private static final List<String> CONNECTOR_IDS;
    private static final List<String> CONNECTOR_CONFIG_KEYS;
    private static final List<String> COMMIT_TASKS_CONFIG_KEYS;
    private static final List<String> TARGET_STATE_KEYS;
    private static final List<String> CONNECTOR_TASK_COUNT_RECORD_KEYS;
    // Connector names passed to putTaskConfigs by the task-config tests.
    private static final String CONNECTOR_1_NAME = "connector1";
    private static final String CONNECTOR_2_NAME = "connector2";
    private static final List<String> RESTART_CONNECTOR_KEYS;
    private static final List<ConnectorTaskId> TASK_IDS;
    private static final List<String> TASK_CONFIG_KEYS;
    private static final List<Map<String, String>> SAMPLE_CONFIGS;
    private static final List<Struct> CONNECTOR_CONFIG_STRUCTS;
    private static final List<Struct> TASK_CONFIG_STRUCTS;
    private static final List<Struct> CONNECTOR_TASK_COUNT_RECORD_STRUCTS;
    private static final Struct TARGET_STATE_STARTED;
    private static final Struct TARGET_STATE_PAUSED_LEGACY;
    private static final Struct TARGET_STATE_PAUSED;
    private static final Struct TARGET_STATE_STOPPED;
    private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR;
    private static final Struct TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR;
    private static final Struct ONLY_FAILED_MISSING_STRUCT;
    // NOTE(review): "INLUDE" looks like a typo for "INCLUDE"; the identifier is left untouched
    // because it is presumably referenced elsewhere in the file.
    private static final Struct INLUDE_TASKS_MISSING_STRUCT;
    private static final List<Struct> RESTART_REQUEST_STRUCTS;
    // Serialized record values used as stand-ins for converter output/input in the tests.
    private static final List<byte[]> CONFIGS_SERIALIZED;

    // Mocked converter handed to the store's constructor in createStore().
    @Mock
    private Converter converter;

    // Mocked listener registered on the store via setUpdateListener(); tests record the callbacks they expect.
    @Mock
    private ConfigBackingStore.UpdateListener configUpdateListener;
    // Partial mock of the worker config, rebuilt from `props` by createStore().
    private DistributedConfig config;

    // Mocked log injected into the store's "configLog" field by createStore().
    @Mock
    KafkaBasedLog<String, byte[]> storeLog;

    // Mocked transactional producer used by testWritePrivileges.
    @Mock
    Producer<String, byte[]> fencableProducer;

    // Mocked future returned from storeLog.sendWithReceipt(...) in the producer-related tests.
    @Mock
    Future<RecordMetadata> producerFuture;
    // Store under test (a partial mock created in createStore()).
    private KafkaConfigBackingStore configStorage;
    // Per-test mutable copy of the default worker properties; tests tweak it and then call createStore() again.
    private Map<String, String> props = new HashMap(DEFAULT_CONFIG_STORAGE_PROPS);
    // Captures inspected by testStartStop after setupAndCreateKafkaBasedLog();
    // NOTE(review): they are presumably wired up by expectConfigure(), which is outside the visible portion.
    private final Capture<String> capturedTopic = EasyMock.newCapture();
    private final Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
    private final Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
    private final Capture<Supplier<TopicAdmin>> capturedAdminSupplier = EasyMock.newCapture();
    private final Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
    private final Capture<Callback<ConsumerRecord<String, byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
    // Mock clock passed to the store; testRemoveConnectorConfigSlowProducer advances it via sleep().
    private final MockTime time = new MockTime();
    // NOTE(review): testRestoreTargetState sets this to the number of pre-existing records before
    // calling expectStart(); how the helpers consume it is not visible here.
    private long logOffset = 0;

    /**
     * (Re)builds the worker config and the store under test from the current {@code props}.
     *
     * <p>The config is a partial mock whose {@code kafkaClusterId()} always answers
     * {@code "test-cluster"}; it is put into replay state only while the store is being
     * constructed and is reset afterwards, with the same expectation recorded again, so that a
     * later replay of all mocks can activate it. The store is a partial mock with
     * {@code createKafkaBasedLog} and {@code createFencableProducer} mocked out, and it has the
     * mocked log and update listener injected.
     */
    private void createStore() {
        final String clusterId = "test-cluster";
        final String[] mockedConfigMethods = {"kafkaClusterId"};
        final String[] mockedStoreMethods = {"createKafkaBasedLog", "createFencableProducer"};

        config = PowerMock.createPartialMock(DistributedConfig.class, mockedConfigMethods, props);
        EasyMock.expect(config.kafkaClusterId()).andReturn(clusterId).anyTimes();
        EasyMock.replay(config);

        configStorage = PowerMock.createPartialMock(
                KafkaConfigBackingStore.class,
                mockedStoreMethods,
                converter, config, null, null, CLIENT_ID_BASE, time);
        Whitebox.setInternalState(configStorage, "configLog", storeLog);
        configStorage.setUpdateListener(configUpdateListener);

        // Back to record state; the expectation has to be recorded again after the reset.
        EasyMock.reset(config);
        EasyMock.expect(config.kafkaClusterId()).andReturn(clusterId).anyTimes();
    }

    /** Builds a fresh config and store from the default properties before every test. */
    @Before
    public void setUp() {
        this.createStore();
    }

    /**
     * Starts and stops the store with custom topic settings and checks what was captured while
     * the backing log was being set up: the topic name, the producer and consumer
     * (de)serializer settings, and the {@link NewTopic} description.
     */
    @Test
    public void testStartStop() throws Exception {
        props.put("config.storage.min.insync.replicas", "3");
        props.put("config.storage.max.message.bytes", "1001");
        createStore();

        expectConfigure();
        expectStart(Collections.emptyList(), Collections.emptyMap());
        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll();

        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);

        Assert.assertEquals(TOPIC, capturedTopic.getValue());

        // Producer settings.
        final Map<String, Object> producerProps = capturedProducerProps.getValue();
        Assert.assertEquals("false", producerProps.get("enable.idempotence"));
        Assert.assertEquals("org.apache.kafka.common.serialization.StringSerializer", producerProps.get("key.serializer"));
        Assert.assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", producerProps.get("value.serializer"));

        // Consumer settings.
        final Map<String, Object> consumerProps = capturedConsumerProps.getValue();
        Assert.assertEquals("org.apache.kafka.common.serialization.StringDeserializer", consumerProps.get("key.deserializer"));
        Assert.assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", consumerProps.get("value.deserializer"));

        // Topic description.
        final NewTopic topicDescription = capturedNewTopic.getValue();
        Assert.assertEquals(TOPIC, topicDescription.name());
        Assert.assertEquals(1L, topicDescription.numPartitions());
        Assert.assertEquals(5L, topicDescription.replicationFactor());
        Assert.assertEquals("3", topicDescription.configs().get("min.insync.replicas"));
        Assert.assertEquals("1001", topicDescription.configs().get("max.message.bytes"));

        configStorage.start();
        configStorage.stop();

        PowerMock.verifyAll();
    }

    /**
     * Checks that none of the collections exposed by a snapshot is the very same instance as the
     * corresponding collection held inside the store.
     */
    @Test
    public void testSnapshotCannotMutateInternalState() throws Exception {
        props.put("config.storage.min.insync.replicas", "3");
        props.put("config.storage.max.message.bytes", "1001");
        createStore();

        expectConfigure();
        expectStart(Collections.emptyList(), Collections.emptyMap());
        expectPartitionCount(1);
        PowerMock.replayAll();

        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
        configStorage.start();

        final ClusterConfigState state = configStorage.snapshot();
        final KafkaConfigBackingStore store = configStorage;
        Assert.assertNotSame(state.connectorTaskCounts, store.connectorTaskCounts);
        Assert.assertNotSame(state.connectorConfigs, store.connectorConfigs);
        Assert.assertNotSame(state.connectorTargetStates, store.connectorTargetStates);
        Assert.assertNotSame(state.taskConfigs, store.taskConfigs);
        Assert.assertNotSame(state.connectorTaskCountRecords, store.connectorTaskCountRecords);
        Assert.assertNotSame(state.connectorTaskConfigGenerations, store.connectorTaskConfigGenerations);
        Assert.assertNotSame(state.connectorsPendingFencing, store.connectorsPendingFencing);
        Assert.assertNotSame(state.inconsistentConnectors, store.inconsistent);

        PowerMock.verifyAll();
    }

    /**
     * Writes two connector configs and then removes the second one, checking the snapshot
     * (offset, stored configs, target state) and the listener callbacks after every step.
     */
    @Test
    public void testPutConnectorConfig() throws Exception {
        final String firstConnector = CONNECTOR_IDS.get(0);
        final String secondConnector = CONNECTOR_IDS.get(1);
        final Map<String, String> firstConfig = SAMPLE_CONFIGS.get(0);
        final Map<String, String> secondConfig = SAMPLE_CONFIGS.get(1);

        expectConfigure();
        expectStart(Collections.emptyList(), Collections.emptyMap());

        // First connector is written and read back.
        expectConvertWriteAndRead(CONNECTOR_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0), "properties", firstConfig);
        configUpdateListener.onConnectorConfigUpdate(firstConnector);
        EasyMock.expectLastCall();

        // Second connector is written and read back.
        expectConvertWriteAndRead(CONNECTOR_CONFIG_KEYS.get(1), KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1), "properties", secondConfig);
        configUpdateListener.onConnectorConfigUpdate(secondConnector);
        EasyMock.expectLastCall();

        // Second connector is removed again.
        expectConnectorRemoval(CONNECTOR_CONFIG_KEYS.get(1), TARGET_STATE_KEYS.get(1));
        configUpdateListener.onConnectorConfigRemove(secondConnector);
        EasyMock.expectLastCall();

        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll();

        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
        configStorage.start();

        // Nothing has been written yet.
        final ClusterConfigState initial = configStorage.snapshot();
        Assert.assertEquals(-1L, initial.offset());
        Assert.assertNull(initial.connectorConfig(firstConnector));
        Assert.assertNull(initial.connectorConfig(secondConnector));

        configStorage.putConnectorConfig(firstConnector, firstConfig);
        final ClusterConfigState afterFirstPut = configStorage.snapshot();
        Assert.assertEquals(1L, afterFirstPut.offset());
        Assert.assertEquals(firstConfig, afterFirstPut.connectorConfig(firstConnector));
        Assert.assertNull(afterFirstPut.connectorConfig(secondConnector));

        configStorage.putConnectorConfig(secondConnector, secondConfig);
        final ClusterConfigState afterSecondPut = configStorage.snapshot();
        Assert.assertEquals(2L, afterSecondPut.offset());
        Assert.assertEquals(firstConfig, afterSecondPut.connectorConfig(firstConnector));
        Assert.assertEquals(secondConfig, afterSecondPut.connectorConfig(secondConnector));

        configStorage.removeConnectorConfig(secondConnector);
        final ClusterConfigState afterRemoval = configStorage.snapshot();
        Assert.assertEquals(4L, afterRemoval.offset());
        Assert.assertEquals(firstConfig, afterRemoval.connectorConfig(firstConnector));
        Assert.assertNull(afterRemoval.connectorConfig(secondConnector));
        Assert.assertNull(afterRemoval.targetState(secondConnector));

        configStorage.stop();

        PowerMock.verifyAll();
    }

    /**
     * Verifies that a failure reported by the producer while writing a connector config is
     * surfaced to the caller as a {@link ConnectException}.
     *
     * <p>The mocked log hands back {@code producerFuture}, whose {@code get(...)} throws an
     * {@link ExecutionException} wrapping a {@link TopicAuthorizationException}; the test expects
     * {@code putConnectorConfig} to fail with a message containing
     * {@code "Error writing connector configuration to Kafka"}.
     */
    @Test
    public void testPutConnectorConfigProducerError() throws Exception {
        expectConfigure();
        expectStart(Collections.emptyList(), Collections.emptyMap());
        expectPartitionCount(1);
        expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(0));

        // The write itself is accepted, but waiting on its receipt fails.
        this.storeLog.sendWithReceipt(EasyMock.anyObject(), EasyMock.anyObject());
        EasyMock.expectLastCall().andReturn(this.producerFuture);
        this.producerFuture.get(EasyMock.anyLong(), (TimeUnit) EasyMock.anyObject());
        EasyMock.expectLastCall().andThrow(new ExecutionException((Throwable) new TopicAuthorizationException(Collections.singleton("test"))));

        expectStop();
        PowerMock.replayAll(new Object[0]);

        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        this.configStorage.start();

        // Take a single snapshot and run both checks against it (the previous code referenced an
        // undeclared variable for the second check and did not compile).
        ClusterConfigState configState = this.configStorage.snapshot();
        Assert.assertEquals(-1L, configState.offset());
        Assert.assertEquals(0L, configState.connectors().size());

        ConnectException failure = Assert.assertThrows(ConnectException.class, () -> {
            this.configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0));
        });
        Assert.assertTrue(failure.getMessage().contains("Error writing connector configuration to Kafka"));

        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    /**
     * Exercises {@code removeConnectorConfig} against a producer whose acknowledgements are slow,
     * checking the timeout passed to each blocking wait.
     *
     * <p>Expected sequence, as recorded below: two {@code sendWithReceipt} calls with a null
     * value; a wait of 30000 on the first receipt, during which the mock clock advances by 29000;
     * a wait of only 1000 on the second receipt, during which the clock advances by another 1000;
     * and finally a wait of 0 on the {@code readToEnd} future.
     */
    @Test
    public void testRemoveConnectorConfigSlowProducer() throws Exception {
        expectConfigure();
        expectStart(Collections.emptyList(), Collections.emptyMap());
        expectPartitionCount(1);
        // Two writes with a null value are expected, each returning its own receipt future.
        // NOTE(review): presumably these are the tombstones for the connector config and its target state.
        Future future = (Future) PowerMock.createMock(Future.class);
        this.storeLog.sendWithReceipt(EasyMock.anyObject(), EasyMock.isNull());
        EasyMock.expectLastCall().andReturn(future);
        Future future2 = (Future) PowerMock.createMock(Future.class);
        this.storeLog.sendWithReceipt(EasyMock.anyObject(), EasyMock.isNull());
        EasyMock.expectLastCall().andReturn(future2);
        // First receipt: waited on with a timeout of 30000; answering advances the mock clock by 29000.
        future.get(EasyMock.eq(30000L), (TimeUnit) EasyMock.anyObject());
        EasyMock.expectLastCall().andAnswer(() -> {
            this.time.sleep(29000L);
            return null;
        });
        // Second receipt: only 1000 is expected as the timeout; answering advances the clock by that amount.
        future2.get(EasyMock.eq(1000L), (TimeUnit) EasyMock.anyObject());
        EasyMock.expectLastCall().andAnswer(() -> {
            this.time.sleep(1000L);
            return null;
        });
        // The read-to-end future is expected to be polled with a timeout of 0.
        Future future3 = (Future) PowerMock.createMock(Future.class);
        EasyMock.expect(this.storeLog.readToEnd()).andAnswer(() -> {
            return future3;
        });
        EasyMock.expect(future3.get(EasyMock.eq(0L), (TimeUnit) EasyMock.anyObject())).andReturn((Object) null);
        expectStop();
        PowerMock.replayAll(new Object[0]);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        this.configStorage.start();
        this.configStorage.removeConnectorConfig("test-connector");
        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    /**
     * Walks through claiming, losing and re-claiming write privileges with
     * {@code exactly.once.source.support} set to {@code "preparing"}.
     *
     * <p>The exercise phase at the bottom of the method asserts, in order: a task count write
     * before any claim fails with {@link IllegalStateException}; after a claim the same write
     * succeeds; a connector config write whose transaction commit throws
     * {@link ProducerFencedException} fails with {@code PrivilegedWriteException}; a retry without
     * re-claiming fails with {@link IllegalStateException}; a target state write still succeeds
     * (it is expected on {@code storeLog}, not on the fencable producer); and after a second claim
     * the connector config write succeeds.
     */
    @Test
    public void testWritePrivileges() throws Exception {
        this.props.put("exactly.once.source.support", "preparing");
        createStore();
        expectConfigure();
        expectStart(Collections.emptyList(), Collections.emptyMap());
        // NOTE(review): this first conversion presumably belongs to the task count write that is
        // rejected before privileges are claimed — confirm against putTaskCountRecord.
        expectConvert(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0, CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0), CONFIGS_SERIALIZED.get(0));
        // First claim of write privileges, followed by a task count write inside a committed transaction.
        expectFencableProducer();
        expectConvert(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0, CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0), CONFIGS_SERIALIZED.get(0));
        this.fencableProducer.beginTransaction();
        EasyMock.expectLastCall();
        EasyMock.expect(this.fencableProducer.send((ProducerRecord) EasyMock.anyObject())).andReturn((Object) null);
        this.fencableProducer.commitTransaction();
        EasyMock.expectLastCall();
        expectRead(CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), CONFIGS_SERIALIZED.get(0), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0));
        // Connector config write whose commit is fenced; the producer is then expected to be closed.
        expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(1));
        this.fencableProducer.beginTransaction();
        EasyMock.expectLastCall();
        EasyMock.expect(this.fencableProducer.send((ProducerRecord) EasyMock.anyObject())).andReturn((Object) null);
        this.fencableProducer.commitTransaction();
        EasyMock.expectLastCall().andThrow(new ProducerFencedException("Better luck next time"));
        this.fencableProducer.close(Duration.ZERO);
        EasyMock.expectLastCall();
        // Retry of the connector config write without privileges: only a conversion is expected, no producer calls.
        expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(1));
        // Target state write: expected on the regular log rather than on the fencable producer.
        expectConvert(KafkaConfigBackingStore.TARGET_STATE_V1, TARGET_STATE_PAUSED, CONFIGS_SERIALIZED.get(1));
        this.storeLog.sendWithReceipt("target-state-" + CONNECTOR_IDS.get(1), CONFIGS_SERIALIZED.get(1));
        EasyMock.expectLastCall().andReturn(this.producerFuture);
        this.producerFuture.get(EasyMock.anyLong(), (TimeUnit) EasyMock.anyObject());
        EasyMock.expectLastCall().andReturn((Object) null);
        // Second claim of write privileges, followed by a connector config write that commits and is read back.
        expectFencableProducer();
        expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(1));
        this.fencableProducer.beginTransaction();
        EasyMock.expectLastCall();
        EasyMock.expect(this.fencableProducer.send((ProducerRecord) EasyMock.anyObject())).andReturn((Object) null);
        this.fencableProducer.commitTransaction();
        EasyMock.expectLastCall();
        expectConvertRead(CONNECTOR_CONFIG_KEYS.get(1), CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(2));
        this.configUpdateListener.onConnectorConfigUpdate(CONNECTOR_IDS.get(1));
        EasyMock.expectLastCall();
        expectPartitionCount(1);
        expectStop();
        // A second close of the fencable producer is expected (the store is stopped at the end of the test).
        this.fencableProducer.close(Duration.ZERO);
        EasyMock.expectLastCall();
        PowerMock.replayAll(new Object[0]);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        this.configStorage.start();
        // Writing a task count record before claiming privileges must be rejected.
        Assert.assertThrows(IllegalStateException.class, () -> {
            this.configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6);
        });
        // After claiming privileges the same write goes through.
        this.configStorage.claimWritePrivileges();
        this.configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6);
        // The fenced commit surfaces as a PrivilegedWriteException ...
        Assert.assertThrows(PrivilegedWriteException.class, () -> {
            this.configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0));
        });
        // ... and a retry without re-claiming is rejected outright.
        Assert.assertThrows(IllegalStateException.class, () -> {
            this.configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0));
        });
        // Target state writes are still accepted at this point.
        this.configStorage.putTargetState(CONNECTOR_IDS.get(1), TargetState.PAUSED);
        // Re-claiming privileges makes the connector config write succeed.
        this.configStorage.claimWritePrivileges();
        this.configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0));
        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    /**
     * Tracks the pending-fencing flag, the task count record and the task config generation of a
     * connector across a task config write followed by a task count record write.
     */
    @Test
    public void testTaskCountRecordsAndGenerations() throws Exception {
        final String connectorName = CONNECTOR_IDS.get(0);

        expectConfigure();
        expectStart(Collections.emptyList(), Collections.emptyMap());

        // Task configs are written one by one ...
        expectReadToEnd(new LinkedHashMap<>());
        expectConvertWriteRead(TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0), "properties", SAMPLE_CONFIGS.get(0));
        expectConvertWriteRead(TASK_CONFIG_KEYS.get(1), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1), "properties", SAMPLE_CONFIGS.get(1));
        expectReadToEnd(new LinkedHashMap<>());
        // ... followed by the commit record and the listener notification.
        expectConvertWriteRead(COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2), "tasks", 2);
        configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)));
        EasyMock.expectLastCall();

        // Records delivered by the read-to-end after the task config write.
        final LinkedHashMap<String, byte[]> taskConfigRecords = new LinkedHashMap<>();
        taskConfigRecords.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
        taskConfigRecords.put(TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1));
        taskConfigRecords.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
        expectReadToEnd(taskConfigRecords);

        // Task count record write and the record it produces.
        expectConvertWriteRead(CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), KafkaConfigBackingStore.TASK_COUNT_RECORD_V0, CONFIGS_SERIALIZED.get(3), "task-count", 4);
        final LinkedHashMap<String, byte[]> taskCountRecords = new LinkedHashMap<>();
        taskCountRecords.put(CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), CONFIGS_SERIALIZED.get(3));
        expectReadToEnd(taskCountRecords);

        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll();

        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
        configStorage.start();
        whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());

        // Before any task configs exist.
        final ClusterConfigState initial = configStorage.snapshot();
        Assert.assertFalse(initial.pendingFencing(connectorName));
        Assert.assertNull(initial.taskCountRecord(connectorName));
        Assert.assertNull(initial.taskConfigGeneration(connectorName));

        // After the task configs have been written.
        configStorage.putTaskConfigs(CONNECTOR_1_NAME, Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1)));
        final ClusterConfigState afterTaskConfigs = configStorage.snapshot();
        Assert.assertEquals(3L, afterTaskConfigs.offset());
        Assert.assertTrue(afterTaskConfigs.pendingFencing(connectorName));
        Assert.assertNull(afterTaskConfigs.taskCountRecord(connectorName));
        Assert.assertEquals(0L, afterTaskConfigs.taskConfigGeneration(connectorName).intValue());

        // After the task count record has been written.
        configStorage.putTaskCountRecord(connectorName, 4);
        final ClusterConfigState afterTaskCount = configStorage.snapshot();
        Assert.assertEquals(4L, afterTaskCount.offset());
        Assert.assertFalse(afterTaskCount.pendingFencing(connectorName));
        Assert.assertEquals(4L, afterTaskCount.taskCountRecord(connectorName).intValue());
        Assert.assertEquals(0L, afterTaskCount.taskConfigGeneration(connectorName).intValue());

        configStorage.stop();

        PowerMock.verifyAll();
    }

    /**
     * Writes two task configs for one connector and checks that the snapshot afterwards lists the
     * connector, its two tasks with their configs, and no inconsistent connectors.
     */
    @Test
    public void testPutTaskConfigs() throws Exception {
        final String connectorName = CONNECTOR_IDS.get(0);
        final ConnectorTaskId firstTask = TASK_IDS.get(0);
        final ConnectorTaskId secondTask = TASK_IDS.get(1);

        expectConfigure();
        expectStart(Collections.emptyList(), Collections.emptyMap());

        // Task configs, then the commit record, then the listener notification.
        expectReadToEnd(new LinkedHashMap<>());
        expectConvertWriteRead(TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0), "properties", SAMPLE_CONFIGS.get(0));
        expectConvertWriteRead(TASK_CONFIG_KEYS.get(1), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1), "properties", SAMPLE_CONFIGS.get(1));
        expectReadToEnd(new LinkedHashMap<>());
        expectConvertWriteRead(COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2), "tasks", 2);
        configUpdateListener.onTaskConfigUpdate(Arrays.asList(firstTask, secondTask));
        EasyMock.expectLastCall();

        // Records delivered by the final read-to-end.
        final LinkedHashMap<String, byte[]> writtenRecords = new LinkedHashMap<>();
        writtenRecords.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
        writtenRecords.put(TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1));
        writtenRecords.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
        expectReadToEnd(writtenRecords);

        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll();

        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
        configStorage.start();
        whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());

        // Nothing has been written yet.
        final ClusterConfigState before = configStorage.snapshot();
        Assert.assertEquals(-1L, before.offset());
        Assert.assertNull(before.taskConfig(firstTask));
        Assert.assertNull(before.taskConfig(secondTask));

        configStorage.putTaskConfigs(CONNECTOR_1_NAME, Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1)));

        final ClusterConfigState after = configStorage.snapshot();
        Assert.assertEquals(3L, after.offset());
        Assert.assertEquals(Collections.singletonList(connectorName), new ArrayList<>(after.connectors()));
        Assert.assertEquals(Arrays.asList(firstTask, secondTask), after.tasks(connectorName));
        Assert.assertEquals(SAMPLE_CONFIGS.get(0), after.taskConfig(firstTask));
        Assert.assertEquals(SAMPLE_CONFIGS.get(1), after.taskConfig(secondTask));
        Assert.assertEquals(Collections.emptySet(), after.inconsistentConnectors());

        configStorage.stop();

        PowerMock.verifyAll();
    }

    /**
     * Writes task configs for two connectors in turn and checks that each write notifies the
     * listener only about the tasks of the connector that was written, and that the final
     * snapshot contains the tasks and configs of both connectors.
     */
    @Test
    public void testPutTaskConfigsStartsOnlyReconfiguredTasks() throws Exception {
        final String firstConnector = CONNECTOR_IDS.get(0);
        final String secondConnector = CONNECTOR_IDS.get(1);

        expectConfigure();
        expectStart(Collections.emptyList(), Collections.emptyMap());

        // First connector: two task configs, a commit record and one notification for both tasks.
        expectReadToEnd(new LinkedHashMap<>());
        expectConvertWriteRead(TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0), "properties", SAMPLE_CONFIGS.get(0));
        expectConvertWriteRead(TASK_CONFIG_KEYS.get(1), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1), "properties", SAMPLE_CONFIGS.get(1));
        expectReadToEnd(new LinkedHashMap<>());
        expectConvertWriteRead(COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2), "tasks", 2);
        configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)));
        EasyMock.expectLastCall();
        final LinkedHashMap<String, byte[]> firstConnectorRecords = new LinkedHashMap<>();
        firstConnectorRecords.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
        firstConnectorRecords.put(TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1));
        firstConnectorRecords.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
        expectReadToEnd(firstConnectorRecords);

        // Second connector: one task config, a commit record and a notification for that task only.
        expectReadToEnd(new LinkedHashMap<>());
        expectConvertWriteRead(TASK_CONFIG_KEYS.get(2), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(3), "properties", SAMPLE_CONFIGS.get(2));
        expectReadToEnd(new LinkedHashMap<>());
        expectConvertWriteRead(COMMIT_TASKS_CONFIG_KEYS.get(1), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(4), "tasks", 1);
        configUpdateListener.onTaskConfigUpdate(Collections.singletonList(TASK_IDS.get(2)));
        EasyMock.expectLastCall();
        final LinkedHashMap<String, byte[]> secondConnectorRecords = new LinkedHashMap<>();
        secondConnectorRecords.put(TASK_CONFIG_KEYS.get(2), CONFIGS_SERIALIZED.get(3));
        secondConnectorRecords.put(COMMIT_TASKS_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(4));
        expectReadToEnd(secondConnectorRecords);

        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll();

        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
        configStorage.start();
        whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());
        whiteboxAddConnector(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1), Collections.emptyList());

        // Nothing has been written yet.
        final ClusterConfigState before = configStorage.snapshot();
        Assert.assertEquals(-1L, before.offset());
        Assert.assertNull(before.taskConfig(TASK_IDS.get(0)));
        Assert.assertNull(before.taskConfig(TASK_IDS.get(1)));

        configStorage.putTaskConfigs(CONNECTOR_1_NAME, Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1)));
        configStorage.putTaskConfigs(CONNECTOR_2_NAME, Collections.singletonList(SAMPLE_CONFIGS.get(2)));

        final ClusterConfigState after = configStorage.snapshot();
        Assert.assertEquals(5L, after.offset());
        Assert.assertEquals(Arrays.asList(firstConnector, secondConnector), new ArrayList<>(after.connectors()));
        Assert.assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), after.tasks(firstConnector));
        Assert.assertEquals(Collections.singletonList(TASK_IDS.get(2)), after.tasks(secondConnector));
        Assert.assertEquals(SAMPLE_CONFIGS.get(0), after.taskConfig(TASK_IDS.get(0)));
        Assert.assertEquals(SAMPLE_CONFIGS.get(1), after.taskConfig(TASK_IDS.get(1)));
        Assert.assertEquals(SAMPLE_CONFIGS.get(2), after.taskConfig(TASK_IDS.get(2)));
        Assert.assertEquals(Collections.emptySet(), after.inconsistentConnectors());

        configStorage.stop();

        PowerMock.verifyAll();
    }

    /**
     * Writes an empty task config list for a connector and checks that only the commit record is
     * produced, the listener is notified with an empty list, and the snapshot shows the connector
     * with no tasks and no inconsistencies.
     */
    @Test
    public void testPutTaskConfigsZeroTasks() throws Exception {
        final String connectorName = CONNECTOR_IDS.get(0);

        expectConfigure();
        expectStart(Collections.emptyList(), Collections.emptyMap());

        // Only the commit record is written; there are no task configs.
        expectReadToEnd(new LinkedHashMap<>());
        expectConvertWriteRead(COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0), "tasks", 0);
        configUpdateListener.onTaskConfigUpdate(Collections.emptyList());
        EasyMock.expectLastCall();

        final LinkedHashMap<String, byte[]> commitRecord = new LinkedHashMap<>();
        commitRecord.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
        expectReadToEnd(commitRecord);

        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll();

        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
        configStorage.start();
        whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());

        // Nothing has been written yet.
        final ClusterConfigState before = configStorage.snapshot();
        Assert.assertEquals(-1L, before.offset());

        configStorage.putTaskConfigs(CONNECTOR_1_NAME, Collections.emptyList());

        final ClusterConfigState after = configStorage.snapshot();
        Assert.assertEquals(1L, after.offset());
        Assert.assertEquals(Collections.singletonList(connectorName), new ArrayList<>(after.connectors()));
        Assert.assertEquals(Collections.emptyList(), after.tasks(connectorName));
        Assert.assertEquals(Collections.emptySet(), after.inconsistentConnectors());

        configStorage.stop();

        PowerMock.verifyAll();
    }

    /**
     * Verifies that target states found in pre-existing log records are restored on startup.
     *
     * <p>Six records (offsets 0 to 5) are replayed: a connector config, two task configs, a
     * legacy-format paused target state for the first connector, a two-task commit record, and a
     * stopped target state for the second connector. The snapshot after startup is expected to
     * report offset 6, only the first connector, {@code PAUSED} for the first connector and
     * {@code STOPPED} for the second.
     */
    @Test
    public void testRestoreTargetState() throws Exception {
        expectConfigure();

        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
                new ConsumerRecord<>(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0,
                        CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord<>(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0, 0,
                        TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord<>(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0, 0,
                        TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord<>(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0, 0,
                        TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord<>(TOPIC, 0, 4L, 0L, TimestampType.CREATE_TIME, 0, 0,
                        COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
                // Index 5 is a plain list position; it is unrelated to TOPIC_REPLICATION_FACTOR,
                // which merely happens to have the same value.
                new ConsumerRecord<>(TOPIC, 0, 5L, 0L, TimestampType.CREATE_TIME, 0, 0,
                        TARGET_STATE_KEYS.get(1), CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()));

        // Serialized value -> struct the converter should produce for it.
        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(3), TARGET_STATE_PAUSED_LEGACY);
        deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
        deserialized.put(CONFIGS_SERIALIZED.get(5), TARGET_STATE_STOPPED);

        this.logOffset = 6L;
        expectStart(existingRecords, deserialized);
        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll();

        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        this.configStorage.start();

        ClusterConfigState configState = this.configStorage.snapshot();
        Assert.assertEquals(6L, configState.offset());
        Assert.assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
        Assert.assertEquals(TargetState.PAUSED, configState.targetState(CONNECTOR_IDS.get(0)));
        Assert.assertEquals(TargetState.STOPPED, configState.targetState(CONNECTOR_IDS.get(1)));

        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    /**
     * Starts the store over a log with one connector config, two task configs and a task-commit
     * record, then feeds two target-state records through a {@code refresh}. Before the refresh
     * only the first connector has a target state ({@code STARTED}); afterwards both connector
     * IDs are tracked, as {@code PAUSED} and {@code STOPPED} respectively, and the update
     * listener is expected to be told about both changes.
     *
     * @throws Exception if any mocked collaborator or the store itself fails
     */
    @Test
    public void testBackgroundUpdateTargetState() throws Exception {
        expectConfigure();

        // Records present in the log before the store starts, at offsets 0 through 3.
        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
                new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));

        LinkedHashMap deserialized = new LinkedHashMap();
        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);

        this.logOffset = 5L;
        expectStart(existingRecords, deserialized);

        // Target-state records that arrive later, during the refresh.
        LinkedHashMap<String, byte[]> serializedAfterStartup = new LinkedHashMap<>();
        serializedAfterStartup.put(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
        serializedAfterStartup.put(TARGET_STATE_KEYS.get(1), CONFIGS_SERIALIZED.get(1));
        Map<String, Struct> deserializedAfterStartup = new HashMap<>();
        deserializedAfterStartup.put(TARGET_STATE_KEYS.get(0), TARGET_STATE_PAUSED);
        deserializedAfterStartup.put(TARGET_STATE_KEYS.get(1), TARGET_STATE_STOPPED);
        expectRead(serializedAfterStartup, deserializedAfterStartup);

        // The listener must hear about the state change of each connector.
        this.configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(0));
        this.configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(1));
        EasyMock.expectLastCall();

        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll();

        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        this.configStorage.start();

        // State right after startup: a single tracked connector, reported as STARTED.
        ClusterConfigState beforeRefresh = this.configStorage.snapshot();
        Assert.assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), this.configStorage.connectorTargetStates.keySet());
        Assert.assertEquals(TargetState.STARTED, beforeRefresh.targetState(CONNECTOR_IDS.get(0)));

        this.configStorage.refresh(0L, TimeUnit.SECONDS);

        // State after the refresh: both connectors tracked, with the new target states.
        ClusterConfigState afterRefresh = this.configStorage.snapshot();
        Assert.assertEquals(new HashSet<>(CONNECTOR_IDS), this.configStorage.connectorTargetStates.keySet());
        Assert.assertEquals(TargetState.PAUSED, afterRefresh.targetState(CONNECTOR_IDS.get(0)));
        Assert.assertEquals(TargetState.STOPPED, afterRefresh.targetState(CONNECTOR_IDS.get(1)));

        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    /**
     * Checks that reading a {@code STARTED} target-state record for a connector whose snapshot
     * already reports {@code STARTED} does not trigger
     * {@code onConnectorTargetStateChange}: the listener mock is stubbed to throw an
     * {@link AssertionError} on any such call.
     *
     * @throws Exception if any mocked collaborator or the store itself fails
     */
    @Test
    public void testSameTargetState() throws Exception {
        expectConfigure();

        // Records present in the log before the store starts, at offsets 0 through 3.
        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
                new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));

        LinkedHashMap deserialized = new LinkedHashMap();
        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);

        this.logOffset = 5L;
        expectStart(existingRecords, deserialized);

        // Any target-state-change notification, for any connector name, fails the test.
        this.configUpdateListener.onConnectorTargetStateChange(EasyMock.anyString());
        EasyMock.expectLastCall().andStubThrow(new AssertionError("unexpected call to onConnectorTargetStateChange"));

        // The record read during refresh carries the STARTED target state.
        expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0), TARGET_STATE_STARTED);
        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll();

        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        this.configStorage.start();

        ClusterConfigState configState = this.configStorage.snapshot();
        Assert.assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));

        this.configStorage.refresh(0L, TimeUnit.SECONDS);

        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    /**
     * Starts the store over a log describing one connector with two committed tasks, then feeds
     * null-valued connector-config and target-state records through a {@code refresh}. Before the
     * refresh the snapshot exposes the connector, its config, both task configs and a task count
     * of 2; afterwards the snapshot no longer contains the connector, reports a task count of 0,
     * and the store's {@code deferredTaskUpdates} map is empty. The update listener is expected
     * to receive {@code onConnectorConfigRemove} for the connector.
     *
     * @throws Exception if any mocked collaborator or the store itself fails
     */
    @Test
    public void testBackgroundConnectorDeletion() throws Exception {
        expectConfigure();

        // Records present in the log before the store starts, at offsets 0 through 3.
        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
                new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));

        LinkedHashMap deserialized = new LinkedHashMap();
        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(1));
        deserialized.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);

        this.logOffset = 5L;
        expectStart(existingRecords, deserialized);

        // Records that arrive during the refresh; both deserialize to null.
        LinkedHashMap<String, byte[]> serializedAfterStartup = new LinkedHashMap<>();
        serializedAfterStartup.put(CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
        serializedAfterStartup.put(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(1));
        Map<String, Struct> deserializedAfterStartup = new HashMap<>();
        deserializedAfterStartup.put(CONNECTOR_CONFIG_KEYS.get(0), null);
        deserializedAfterStartup.put(TARGET_STATE_KEYS.get(0), null);
        expectRead(serializedAfterStartup, deserializedAfterStartup);

        this.configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(0));
        EasyMock.expectLastCall();

        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll();

        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        this.configStorage.start();

        // State right after startup: connector and both of its tasks are visible.
        ClusterConfigState beforeDeletion = this.configStorage.snapshot();
        Assert.assertEquals(TargetState.STARTED, beforeDeletion.targetState(CONNECTOR_IDS.get(0)));
        Assert.assertEquals(SAMPLE_CONFIGS.get(0), beforeDeletion.connectorConfig(CONNECTOR_IDS.get(0)));
        Assert.assertEquals(SAMPLE_CONFIGS.get(0), beforeDeletion.taskConfig(new ConnectorTaskId(CONNECTOR_IDS.get(0), 0)));
        Assert.assertEquals(SAMPLE_CONFIGS.get(1), beforeDeletion.taskConfig(new ConnectorTaskId(CONNECTOR_IDS.get(0), 1)));
        Assert.assertEquals(2L, beforeDeletion.taskCount(CONNECTOR_IDS.get(0)));

        this.configStorage.refresh(0L, TimeUnit.SECONDS);

        // Bind the post-refresh snapshot to a local so both assertions inspect the same state
        // (the original second assertion referenced an undefined identifier and did not compile).
        ClusterConfigState afterDeletion = this.configStorage.snapshot();
        Assert.assertFalse(afterDeletion.contains(CONNECTOR_IDS.get(0)));
        Assert.assertEquals(0L, afterDeletion.taskCount(CONNECTOR_IDS.get(0)));
        // No deferred task updates may be left behind for the deleted connector.
        Assert.assertEquals(Collections.emptyMap(), this.configStorage.deferredTaskUpdates);

        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    /**
     * Starts the store over a log whose target-state record deserializes to {@code null} while
     * the connector config is still present, and checks that the snapshot reports offset 5, the
     * single connector, and a target state of {@code STARTED} for it.
     *
     * @throws Exception if any mocked collaborator or the store itself fails
     */
    @Test
    public void testRestoreTargetStateUnexpectedDeletion() throws Exception {
        expectConfigure();

        // Records present in the log before the store starts, at offsets 0 through 4.
        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
                new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 4L, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()));

        LinkedHashMap deserialized = new LinkedHashMap();
        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
        // The target-state payload maps to null.
        deserialized.put(CONFIGS_SERIALIZED.get(3), null);
        deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);

        this.logOffset = 5L;
        expectStart(existingRecords, deserialized);
        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll();

        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        this.configStorage.start();

        ClusterConfigState configState = this.configStorage.snapshot();
        Assert.assertEquals(5L, configState.offset());
        Assert.assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
        Assert.assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));

        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    /**
     * Starts the store over a nine-record log that mixes task-count records, repeated connector
     * configs, task configs and a task-commit record, then checks the restored snapshot: its
     * offset equals {@code logOffset}, one connector is known and {@code STARTED}, its config is
     * {@code SAMPLE_CONFIGS.get(2)}, it has the two tasks {@code TASK_IDS.get(0)} and
     * {@code TASK_IDS.get(1)} (both with {@code SAMPLE_CONFIGS.get(0)}), the task-count record of
     * the second connector ID is 9, no connector is inconsistent, and
     * {@code CONNECTOR_1_NAME} is the only connector pending fencing.
     *
     * @throws Exception if any mocked collaborator or the store itself fails
     */
    @Test
    public void testRestore() throws Exception {
        expectConfigure();

        // Records present in the log before the store starts, at offsets 0 through 8.
        // NOTE(review): TOPIC_REPLICATION_FACTOR doubles as a list index below; presumably it equals 5 — confirm.
        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
                new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 4L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 5L, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 6L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_TASK_COUNT_RECORD_KEYS.get(1), CONFIGS_SERIALIZED.get(6), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 7L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(7), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 8L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(8), new RecordHeaders(), Optional.empty()));

        LinkedHashMap deserialized = new LinkedHashMap();
        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(1), CONNECTOR_CONFIG_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(3), TASK_CONFIG_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(4), CONNECTOR_CONFIG_STRUCTS.get(1));
        deserialized.put(CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
        deserialized.put(CONFIGS_SERIALIZED.get(6), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(1));
        deserialized.put(CONFIGS_SERIALIZED.get(7), CONNECTOR_CONFIG_STRUCTS.get(2));
        deserialized.put(CONFIGS_SERIALIZED.get(8), TASK_CONFIG_STRUCTS.get(1));

        this.logOffset = 9L;
        expectStart(existingRecords, deserialized);
        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll();

        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        this.configStorage.start();

        ClusterConfigState configState = this.configStorage.snapshot();
        Assert.assertEquals(this.logOffset, configState.offset());
        Assert.assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
        Assert.assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
        // The connector config written last (offset 7) is the one exposed.
        Assert.assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0)));
        Assert.assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(CONNECTOR_IDS.get(0)));
        Assert.assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
        Assert.assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(1)));
        Assert.assertEquals(9L, configState.taskCountRecord(CONNECTOR_IDS.get(1)).intValue());
        Assert.assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
        Assert.assertEquals(Collections.singleton(CONNECTOR_1_NAME), configState.connectorsPendingFencing);

        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    /**
     * Starts the store over a log in which a connector config and its target state are later
     * followed by records that deserialize to {@code null}, and checks that the restored snapshot
     * reports offset 6 and an empty connector set.
     *
     * @throws Exception if any mocked collaborator or the store itself fails
     */
    @Test
    public void testRestoreConnectorDeletion() throws Exception {
        expectConfigure();

        // Records present in the log before the store starts, at offsets 0 through 5.
        // NOTE(review): TOPIC_REPLICATION_FACTOR doubles as a list index below; presumably it equals 5 — confirm.
        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
                new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 4L, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 5L, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR), new RecordHeaders(), Optional.empty()));

        LinkedHashMap deserialized = new LinkedHashMap();
        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
        // The second connector-config payload and the target-state payload both map to null.
        deserialized.put(CONFIGS_SERIALIZED.get(3), null);
        deserialized.put(CONFIGS_SERIALIZED.get(4), null);
        deserialized.put(CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);

        this.logOffset = 6L;
        expectStart(existingRecords, deserialized);
        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll();

        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        this.configStorage.start();

        ClusterConfigState configState = this.configStorage.snapshot();
        Assert.assertEquals(6L, configState.offset());
        Assert.assertTrue(configState.connectors().isEmpty());

        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    /**
     * Starts the store over an eight-record log whose final record is a task-commit record using
     * {@code TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR}, and checks the restored snapshot: offset 8,
     * one connector with config {@code SAMPLE_CONFIGS.get(2)}, an empty task list for it, and no
     * inconsistent connectors.
     *
     * @throws Exception if any mocked collaborator or the store itself fails
     */
    @Test
    public void testRestoreZeroTasks() throws Exception {
        expectConfigure();

        // Records present in the log before the store starts, at offsets 0 through 7.
        // NOTE(review): TOPIC_REPLICATION_FACTOR doubles as a list index below; presumably it equals 5 — confirm.
        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
                new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 4L, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 5L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 6L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 7L, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(7), new RecordHeaders(), Optional.empty()));

        LinkedHashMap deserialized = new LinkedHashMap();
        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_CONFIG_STRUCTS.get(1));
        deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
        deserialized.put(CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR), CONNECTOR_CONFIG_STRUCTS.get(2));
        deserialized.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1));
        deserialized.put(CONFIGS_SERIALIZED.get(7), TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR);

        this.logOffset = 8L;
        expectStart(existingRecords, deserialized);
        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll();

        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        this.configStorage.start();

        ClusterConfigState configState = this.configStorage.snapshot();
        Assert.assertEquals(8L, configState.offset());
        Assert.assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
        Assert.assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0)));
        // The final commit record leaves the connector with no tasks.
        Assert.assertEquals(Collections.emptyList(), configState.tasks(CONNECTOR_IDS.get(0)));
        Assert.assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());

        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    /**
     * Starts the store over a log with gaps in its offsets (0, 2, 4, 5) and checks that the
     * restored snapshot marks the connector as inconsistent, with no tasks and no task configs.
     * It then calls {@code putTaskConfigs} with a single task config and checks the new
     * snapshot: offset 8, one task ({@code TASK_IDS.get(0)}) carrying
     * {@code SAMPLE_CONFIGS.get(0)}, and no inconsistent connectors.
     *
     * @throws Exception if any mocked collaborator or the store itself fails
     */
    @Test
    public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exception {
        expectConfigure();

        // Records present in the log before the store starts; offsets 1 and 3 are absent.
        // NOTE(review): TOPIC_REPLICATION_FACTOR doubles as a list index below; presumably it equals 5 — confirm.
        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
                new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 4L, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 5L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR), new RecordHeaders(), Optional.empty()));

        LinkedHashMap deserialized = new LinkedHashMap();
        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
        deserialized.put(CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR), TASK_CONFIG_STRUCTS.get(1));

        this.logOffset = 6L;
        expectStart(existingRecords, deserialized);
        expectPartitionCount(1);

        // Expectations for writing the new task config ...
        expectReadToEnd(new LinkedHashMap<>());
        expectConvertWriteRead(TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0), "properties", SAMPLE_CONFIGS.get(0));
        expectReadToEnd(new LinkedHashMap<>());
        // ... and for writing the commit record that declares a single task.
        expectConvertWriteRead(COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2), "tasks", 1);

        // The listener must be told that the first task was reconfigured.
        this.configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0)));
        EasyMock.expectLastCall();

        // Final read-to-end returns both freshly written records.
        LinkedHashMap<String, byte[]> serializedAfterWrite = new LinkedHashMap<>();
        serializedAfterWrite.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
        serializedAfterWrite.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
        expectReadToEnd(serializedAfterWrite);

        expectStop();
        PowerMock.replayAll();

        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        this.configStorage.start();

        // State restored from the log: connector known but flagged inconsistent, no tasks.
        ClusterConfigState restoredState = this.configStorage.snapshot();
        Assert.assertEquals(6L, restoredState.offset());
        Assert.assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(restoredState.connectors()));
        Assert.assertEquals(Collections.emptyList(), restoredState.tasks(CONNECTOR_IDS.get(0)));
        Assert.assertNull(restoredState.taskConfig(TASK_IDS.get(0)));
        Assert.assertNull(restoredState.taskConfig(TASK_IDS.get(1)));
        Assert.assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), restoredState.inconsistentConnectors());

        this.configStorage.putTaskConfigs(CONNECTOR_1_NAME, Collections.singletonList(SAMPLE_CONFIGS.get(0)));

        // State after the new task config was written.
        ClusterConfigState updatedState = this.configStorage.snapshot();
        Assert.assertEquals(8L, updatedState.offset());
        Assert.assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(updatedState.connectors()));
        Assert.assertEquals(Arrays.asList(TASK_IDS.get(0)), updatedState.tasks(CONNECTOR_IDS.get(0)));
        Assert.assertEquals(SAMPLE_CONFIGS.get(0), updatedState.taskConfig(TASK_IDS.get(0)));
        Assert.assertEquals(Collections.EMPTY_SET, updatedState.inconsistentConnectors());

        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    /**
     * Runs the shared restart-request scenario with a request for the first connector built with
     * the flags {@code (true, false)} (presumably only-failed without tasks — confirm against the
     * {@code RestartRequest} constructor).
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    public void testPutRestartRequestOnlyFailed() throws Exception {
        RestartRequest request = new RestartRequest(CONNECTOR_IDS.get(0), true, false);
        testPutRestartRequest(request);
    }

    /**
     * Runs the shared restart-request scenario with a request for the first connector built with
     * the flags {@code (true, true)} (presumably only-failed including tasks — confirm against
     * the {@code RestartRequest} constructor).
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    public void testPutRestartRequestOnlyFailedIncludingTasks() throws Exception {
        RestartRequest request = new RestartRequest(CONNECTOR_IDS.get(0), true, true);
        testPutRestartRequest(request);
    }

    /**
     * Shared scenario for the restart-request tests: starts the store over an empty log, writes
     * {@code restartRequest} through {@code putRestartRequest}, and checks that the request
     * delivered to the update listener has the same connector name, only-failed flag and
     * include-tasks flag.
     *
     * @param restartRequest the request to write and expect back on the listener
     * @throws Exception if any mocked collaborator or the store itself fails
     */
    private void testPutRestartRequest(RestartRequest restartRequest) throws Exception {
        expectConfigure();
        expectStart(Collections.emptyList(), Collections.emptyMap());
        expectConvertWriteAndRead(RESTART_CONNECTOR_KEYS.get(0), KafkaConfigBackingStore.RESTART_REQUEST_V0, CONFIGS_SERIALIZED.get(0), "only-failed", Boolean.valueOf(restartRequest.onlyFailed()));

        // Capture whatever request the store hands to the listener.
        Capture<RestartRequest> deliveredRequest = EasyMock.newCapture();
        this.configUpdateListener.onRestartRequest(EasyMock.capture(deliveredRequest));
        EasyMock.expectLastCall();

        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll();

        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        this.configStorage.start();
        this.configStorage.putRestartRequest(restartRequest);

        RestartRequest delivered = deliveredRequest.getValue();
        Assert.assertEquals(restartRequest.connectorName(), delivered.connectorName());
        Assert.assertEquals(Boolean.valueOf(restartRequest.onlyFailed()), Boolean.valueOf(delivered.onlyFailed()));
        Assert.assertEquals(Boolean.valueOf(restartRequest.includeTasks()), Boolean.valueOf(delivered.includeTasks()));

        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    /**
     * Converts a restart record built from {@code RESTART_REQUEST_STRUCTS.get(0)} and checks that
     * the resulting request names {@code CONNECTOR_1_NAME} and carries the struct's
     * {@code include-tasks} and {@code only-failed} values.
     */
    @Test
    public void testRecordToRestartRequest() {
        ConsumerRecord restartRecord = new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty());
        Struct source = RESTART_REQUEST_STRUCTS.get(0);
        SchemaAndValue payload = new SchemaAndValue(source.schema(), structToMap(source));

        RestartRequest parsed = this.configStorage.recordToRestartRequest(restartRecord, payload);

        Assert.assertEquals(CONNECTOR_1_NAME, parsed.connectorName());
        Assert.assertEquals(source.getBoolean("include-tasks"), Boolean.valueOf(parsed.includeTasks()));
        Assert.assertEquals(source.getBoolean("only-failed"), Boolean.valueOf(parsed.onlyFailed()));
    }

    /**
     * Converts a restart record built from {@code ONLY_FAILED_MISSING_STRUCT} and checks that the
     * resulting request names {@code CONNECTOR_1_NAME}, carries the struct's
     * {@code include-tasks} value, and reports {@code onlyFailed()} as {@code false}.
     */
    @Test
    public void testRecordToRestartRequestOnlyFailedInconsistent() {
        ConsumerRecord restartRecord = new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty());
        Struct source = ONLY_FAILED_MISSING_STRUCT;
        SchemaAndValue payload = new SchemaAndValue(source.schema(), structToMap(source));

        RestartRequest parsed = this.configStorage.recordToRestartRequest(restartRecord, payload);

        Assert.assertEquals(CONNECTOR_1_NAME, parsed.connectorName());
        Assert.assertEquals(source.getBoolean("include-tasks"), Boolean.valueOf(parsed.includeTasks()));
        Assert.assertFalse(parsed.onlyFailed());
    }

    /**
     * Converts a restart record built from {@code INLUDE_TASKS_MISSING_STRUCT} and checks that
     * the resulting request names {@code CONNECTOR_1_NAME}, reports {@code includeTasks()} as
     * {@code false}, and carries the struct's {@code only-failed} value.
     */
    @Test
    public void testRecordToRestartRequestIncludeTasksInconsistent() {
        ConsumerRecord restartRecord = new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty());
        Struct source = INLUDE_TASKS_MISSING_STRUCT;
        SchemaAndValue payload = new SchemaAndValue(source.schema(), structToMap(source));

        RestartRequest parsed = this.configStorage.recordToRestartRequest(restartRecord, payload);

        Assert.assertEquals(CONNECTOR_1_NAME, parsed.connectorName());
        Assert.assertFalse(parsed.includeTasks());
        Assert.assertEquals(source.getBoolean("only-failed"), Boolean.valueOf(parsed.onlyFailed()));
    }

    /**
     * Starts and stops the store over a log made only of restart-request records, the last of
     * which deserializes to {@code null}. The test makes no snapshot assertions; it passes when
     * startup and shutdown complete and the recorded mock expectations verify.
     *
     * @throws Exception if any mocked collaborator or the store itself fails
     */
    @Test
    public void testRestoreRestartRequestInconsistentState() throws Exception {
        expectConfigure();

        // Records present in the log before the store starts, at offsets 0 through 3.
        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
                new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1), CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
                new ConsumerRecord(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1), CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));

        LinkedHashMap deserialized = new LinkedHashMap();
        deserialized.put(CONFIGS_SERIALIZED.get(0), RESTART_REQUEST_STRUCTS.get(0));
        deserialized.put(CONFIGS_SERIALIZED.get(1), RESTART_REQUEST_STRUCTS.get(1));
        deserialized.put(CONFIGS_SERIALIZED.get(2), RESTART_REQUEST_STRUCTS.get(2));
        // The last restart payload maps to null.
        deserialized.put(CONFIGS_SERIALIZED.get(3), null);

        this.logOffset = 4L;
        expectStart(existingRecords, deserialized);
        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll();

        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        this.configStorage.start();
        this.configStorage.stop();

        PowerMock.verifyAll();
    }

    /**
     * Checks that {@code start()} throws a {@link ConfigException} whose message contains
     * "required to have a single partition" when the expected partition count is 2.
     *
     * @throws Exception if any mocked collaborator fails while recording expectations
     */
    @Test
    public void testExceptionOnStartWhenConfigTopicHasMultiplePartitions() throws Exception {
        expectConfigure();
        expectStart(Collections.emptyList(), Collections.emptyMap());
        expectPartitionCount(2);
        PowerMock.replayAll();

        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);

        ConfigException failure = Assert.assertThrows(ConfigException.class, () -> this.configStorage.start());
        String message = failure.getMessage();
        Assert.assertTrue(message.contains("required to have a single partition"));

        PowerMock.verifyAll();
    }

    /**
     * With {@code exactly.once.source.support=preparing}, group ID {@code my-connect-cluster} and
     * no user-supplied {@code transactional.id} or {@code enable.idempotence}, checks that the
     * fencable producer properties contain {@code transactional.id} =
     * {@code connect-cluster-my-connect-cluster} and {@code enable.idempotence} = {@code "true"}.
     *
     * @throws Exception if creating the store fails
     */
    @Test
    public void testFencableProducerPropertiesInsertedByDefault() throws Exception {
        this.props.put("exactly.once.source.support", "preparing");
        this.props.put("group.id", "my-connect-cluster");
        // Make sure neither property is supplied by the worker configuration.
        this.props.remove("transactional.id");
        this.props.remove("enable.idempotence");
        createStore();
        PowerMock.replayAll();

        Map<?, ?> producerProps = this.configStorage.fencableProducerProps(this.config);
        Object transactionalId = producerProps.get("transactional.id");
        Assert.assertEquals("connect-cluster-my-connect-cluster", transactionalId);
        Object idempotence = producerProps.get("enable.idempotence");
        Assert.assertEquals("true", idempotence);

        PowerMock.verifyAll();
    }

    /**
     * With {@code exactly.once.source.support=preparing}, group ID
     * {@code my-other-connect-cluster} and user-supplied values for {@code transactional.id} and
     * {@code enable.idempotence}, checks that the fencable producer properties still contain
     * {@code transactional.id} = {@code connect-cluster-my-other-connect-cluster} and
     * {@code enable.idempotence} = {@code "true"}.
     *
     * @throws Exception if creating the store fails
     */
    @Test
    public void testFencableProducerPropertiesOverrideUserSuppliedValues() throws Exception {
        this.props.put("exactly.once.source.support", "preparing");
        this.props.put("group.id", "my-other-connect-cluster");
        // User-supplied values that are expected to be overridden.
        this.props.put("transactional.id", "my-custom-transactional-id");
        this.props.put("enable.idempotence", "false");
        createStore();
        PowerMock.replayAll();

        Map<?, ?> producerProps = this.configStorage.fencableProducerProps(this.config);
        Object transactionalId = producerProps.get("transactional.id");
        Assert.assertEquals("connect-cluster-my-other-connect-cluster", transactionalId);
        Object idempotence = producerProps.get("enable.idempotence");
        Assert.assertEquals("true", idempotence);

        PowerMock.verifyAll();
    }

    /**
     * When exactly-once source support is {@code enabled} and no isolation level is configured,
     * the consumer properties handed to the log must carry the lower-cased
     * {@code READ_COMMITTED} isolation level.
     */
    @Test
    public void testConsumerPropertiesInsertedByDefaultWithExactlyOnceSourceEnabled() throws Exception {
        props.put("exactly.once.source.support", "enabled");
        props.remove("isolation.level");
        createStore();
        expectConfigure();
        PowerMock.replayAll();

        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);

        String expectedIsolation = IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT);
        Map<?, ?> consumerProps = (Map<?, ?>) capturedConsumerProps.getValue();
        Assert.assertEquals(expectedIsolation, consumerProps.get("isolation.level"));

        PowerMock.verifyAll();
    }

    /**
     * When exactly-once source support is {@code enabled}, a user-supplied
     * {@code READ_UNCOMMITTED} isolation level must be replaced by {@code READ_COMMITTED} in the
     * consumer properties handed to the log.
     */
    @Test
    public void testConsumerPropertiesOverrideUserSuppliedValuesWithExactlyOnceSourceEnabled() throws Exception {
        String userSuppliedIsolation = IsolationLevel.READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT);
        props.put("exactly.once.source.support", "enabled");
        props.put("isolation.level", userSuppliedIsolation);
        createStore();
        expectConfigure();
        PowerMock.replayAll();

        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);

        String expectedIsolation = IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT);
        Map<?, ?> consumerProps = (Map<?, ?>) capturedConsumerProps.getValue();
        Assert.assertEquals(expectedIsolation, consumerProps.get("isolation.level"));

        PowerMock.verifyAll();
    }

    /**
     * When exactly-once source support is only {@code preparing} and no isolation level is
     * configured, the consumer properties handed to the log must not contain
     * {@code isolation.level} at all.
     */
    @Test
    public void testConsumerPropertiesNotInsertedByDefaultWithoutExactlyOnceSourceEnabled() throws Exception {
        props.put("exactly.once.source.support", "preparing");
        props.remove("isolation.level");
        createStore();
        expectConfigure();
        PowerMock.replayAll();

        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);

        Map<?, ?> consumerProps = (Map<?, ?>) capturedConsumerProps.getValue();
        Object isolationLevel = consumerProps.get("isolation.level");
        Assert.assertNull(isolationLevel);

        PowerMock.verifyAll();
    }

    /**
     * When exactly-once source support is only {@code preparing}, a user-supplied
     * {@code READ_UNCOMMITTED} isolation level must reach the log's consumer properties
     * unchanged.
     */
    @Test
    public void testConsumerPropertiesDoNotOverrideUserSuppliedValuesWithoutExactlyOnceSourceEnabled() throws Exception {
        String userSuppliedIsolation = IsolationLevel.READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT);
        props.put("exactly.once.source.support", "preparing");
        props.put("isolation.level", userSuppliedIsolation);
        createStore();
        expectConfigure();
        PowerMock.replayAll();

        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);

        String expectedIsolation = IsolationLevel.READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT);
        Map<?, ?> consumerProps = (Map<?, ?>) capturedConsumerProps.getValue();
        Assert.assertEquals(expectedIsolation, consumerProps.get("isolation.level"));

        PowerMock.verifyAll();
    }

    /**
     * Checks the {@code client.id} values used by the store: the log's producer and consumer
     * must both see {@code test-client-id-configs}, and the fencable producer must see the same
     * value with a {@code -leader} suffix.
     */
    @Test
    public void testClientIds() throws Exception {
        // Start from a fresh copy of the defaults so earlier mutations of props do not leak in.
        props = new HashMap<>(DEFAULT_CONFIG_STORAGE_PROPS);
        props.put("exactly.once.source.support", "enabled");
        createStore();
        expectConfigure();
        PowerMock.replayAll();

        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
        Map<?, ?> fencableProps = configStorage.fencableProducerProps(config);

        Map<?, ?> logProducerProps = (Map<?, ?>) capturedProducerProps.getValue();
        Assert.assertEquals("test-client-id-configs", logProducerProps.get("client.id"));
        Map<?, ?> logConsumerProps = (Map<?, ?>) capturedConsumerProps.getValue();
        Assert.assertEquals("test-client-id-configs", logConsumerProps.get("client.id"));
        Assert.assertEquals("test-client-id-configs-leader", fencableProps.get("client.id"));

        PowerMock.verifyAll();
    }

    /**
     * Records the expectation that the store invokes its private {@code createKafkaBasedLog}
     * method. Every argument of that call is captured into the corresponding {@code captured*}
     * field, and the call is answered with the mocked {@code storeLog}.
     */
    private void expectConfigure() throws Exception {
        // The matchers are registered in argument order before the private call is recorded.
        Object[] capturedArguments = new Object[]{
            EasyMock.capture(capturedTopic),
            EasyMock.capture(capturedProducerProps),
            EasyMock.capture(capturedConsumerProps),
            EasyMock.capture(capturedConsumedCallback),
            EasyMock.capture(capturedNewTopic),
            EasyMock.capture(capturedAdminSupplier)
        };
        PowerMock.expectPrivate(configStorage, "createKafkaBasedLog", capturedArguments).andReturn(storeLog);
    }

    /**
     * Records two expectations: that {@code initTransactions()} is called on the mocked fencable
     * producer, and that the store's private {@code createFencableProducer} method is invoked
     * without arguments and returns that mock.
     */
    private void expectFencableProducer() throws Exception {
        // Void call on the mock; expectLastCall() marks it as an expectation.
        fencableProducer.initTransactions();
        EasyMock.expectLastCall();

        Object[] noArguments = new Object[0];
        PowerMock.expectPrivate(configStorage, "createFencableProducer", noArguments)
                .andReturn(fencableProducer);
    }

    /**
     * Records the expectation that the mocked log is asked for its partition count and answers
     * with {@code i}.
     *
     * @param i the partition count the mocked log should report
     */
    private void expectPartitionCount(int i) {
        // Autoboxing produces the same Integer.valueOf(...) calls the explicit form spelled out.
        EasyMock.expect(storeLog.partitionCount()).andReturn(i);
    }

    /**
     * Records the expectation that the mocked log is started. When that happens, every record in
     * {@code list} is delivered, in order, to the captured consumed-record callback. Also records
     * one converter expectation per entry of {@code map}, turning the serialized bytes into the
     * map form of the associated struct.
     *
     * @param list records to replay to the callback when the log starts
     * @param map  serialized value to the struct the converter should yield for it
     */
    private void expectStart(List<ConsumerRecord<String, byte[]>> list, Map<byte[], Struct> map) {
        storeLog.start();
        PowerMock.expectLastCall().andAnswer(() -> {
            for (ConsumerRecord<String, byte[]> preexisting : list) {
                // Look the callback up per record, at answer time, exactly when it is needed.
                Callback consumed = (Callback) capturedConsumedCallback.getValue();
                consumed.onCompletion((Throwable) null, preexisting);
            }
            return null;
        });

        map.forEach((serialized, struct) -> {
            SchemaAndValue deserialized = new SchemaAndValue((Schema) null, structToMap(struct));
            EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(serialized)))
                    .andReturn(deserialized);
        });
    }

    /**
     * Records the expectation that the mocked log is stopped.
     */
    private void expectStop() {
        // Void call on the mock, then turned into an expectation.
        storeLog.stop();
        PowerMock.expectLastCall();
    }

    /**
     * Records a read-to-end of the log that replays {@code linkedHashMap}, followed by one
     * converter expectation per entry of {@code map}: the bytes stored under the entry's key in
     * {@code linkedHashMap} are converted to the map form of the entry's struct.
     *
     * @param linkedHashMap record key to serialized value, in the order the records are replayed
     * @param map           record key to the struct the converter should yield for that record
     */
    private void expectRead(LinkedHashMap<String, byte[]> linkedHashMap, Map<String, Struct> map) {
        expectReadToEnd(linkedHashMap);
        map.forEach((recordKey, struct) -> {
            byte[] serialized = linkedHashMap.get(recordKey);
            SchemaAndValue deserialized = new SchemaAndValue((Schema) null, structToMap(struct));
            EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(serialized)))
                    .andReturn(deserialized);
        });
    }

    /**
     * Single-record convenience form of {@link #expectRead(LinkedHashMap, Map)}.
     *
     * @param str    key of the record that is read
     * @param bArr   serialized value of the record
     * @param struct struct the converter should yield for the record
     */
    private void expectRead(String str, byte[] bArr, Struct struct) {
        LinkedHashMap<String, byte[]> singleRecord = new LinkedHashMap<>();
        singleRecord.put(str, bArr);
        Map<String, Struct> singleStruct = Collections.singletonMap(str, struct);
        expectRead(singleRecord, singleStruct);
    }

    /**
     * Records the expectation that the converter serializes {@code struct} with {@code schema}
     * for the config topic and answers with {@code bArr}.
     *
     * @param schema schema the converter is expected to receive
     * @param struct value the converter is expected to receive
     * @param bArr   bytes the mocked converter should return
     */
    private void expectConvert(Schema schema, Struct struct, byte[] bArr) {
        EasyMock.expect(
                converter.fromConnectData(EasyMock.eq(TOPIC), EasyMock.eq(schema), EasyMock.eq(struct)))
            .andReturn(bArr);
    }

    /**
     * Records the expectations for writing one record and converting it back when it is read:
     * <ol>
     *   <li>if {@code bArr} is non-null, the converter serializes a struct with {@code schema}
     *       (the struct is captured) and returns {@code bArr};</li>
     *   <li>the log sends {@code str} / {@code bArr} and returns the mocked producer future;</li>
     *   <li>the producer future is waited on and yields {@code null};</li>
     *   <li>the converter deserializes {@code bArr}; at that point, if {@code str2} is non-null,
     *       the captured struct's field {@code str2} is asserted to equal {@code obj}.</li>
     * </ol>
     *
     * @param str    record key
     * @param schema schema the written struct is expected to use
     * @param bArr   serialized form of the record, or {@code null} for a null-valued record
     * @param str2   name of a struct field to verify, or {@code null} to skip the check
     * @param obj    expected value of field {@code str2}
     */
    private void expectConvertWriteRead(String str, Schema schema, byte[] bArr, String str2, Object obj) throws Exception {
        Capture<Struct> writtenStruct = EasyMock.newCapture();
        if (bArr != null) {
            EasyMock.expect(
                    converter.fromConnectData(EasyMock.eq(TOPIC), EasyMock.eq(schema), EasyMock.capture(writtenStruct)))
                .andReturn(bArr);
        }

        storeLog.sendWithReceipt(EasyMock.eq(str), EasyMock.aryEq(bArr));
        EasyMock.expectLastCall().andReturn(producerFuture);

        producerFuture.get(EasyMock.anyLong(), (TimeUnit) EasyMock.anyObject());
        EasyMock.expectLastCall().andReturn((Object) null);

        EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(bArr))).andAnswer(() -> {
            if (str2 != null) {
                Assert.assertEquals(obj, writtenStruct.getValue().get(str2));
            }
            // A null payload is read back as a null value; otherwise echo what was written.
            Object readBack = null;
            if (bArr != null) {
                readBack = structToMap(writtenStruct.getValue());
            }
            return new SchemaAndValue((Schema) null, readBack);
        });
    }

    /**
     * Records the expectation that the converter deserializes {@code bArr} into the map form of
     * {@code struct} (or a null value when {@code bArr} is null), followed by a read-to-end of
     * the log that replays the single record {@code str} / {@code bArr}.
     *
     * @param str    key of the record that is read
     * @param struct struct the converter should yield
     * @param bArr   serialized value of the record, may be {@code null}
     */
    private void expectConvertRead(String str, Struct struct, byte[] bArr) {
        LinkedHashMap<String, byte[]> singleRecord = new LinkedHashMap<>();
        singleRecord.put(str, bArr);

        EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(bArr))).andAnswer(() -> {
            if (bArr == null) {
                return new SchemaAndValue((Schema) null, null);
            }
            return new SchemaAndValue((Schema) null, structToMap(struct));
        });
        expectReadToEnd(singleRecord);
    }

    /**
     * Records the expectation that the mocked log is read to its end. When that happens, each
     * entry of {@code linkedHashMap} is delivered in iteration order to the captured
     * consumed-record callback as a record on partition 0 of the config topic, using and then
     * advancing {@code logOffset}. The call answers with a future set up via
     * {@code resolveOnGet}.
     *
     * @param linkedHashMap record key to serialized value, in delivery order
     */
    private void expectReadToEnd(LinkedHashMap<String, byte[]> linkedHashMap) {
        EasyMock.expect(storeLog.readToEnd()).andAnswer(() -> {
            TestFuture readFuture = new TestFuture();
            for (Map.Entry<String, byte[]> serialized : linkedHashMap.entrySet()) {
                Callback consumed = (Callback) capturedConsumedCallback.getValue();
                // Post-increment: the record takes the current offset, the next one gets offset + 1.
                ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(
                        TOPIC, 0, logOffset++, 0L, TimestampType.CREATE_TIME, 0, 0,
                        serialized.getKey(), serialized.getValue(), new RecordHeaders(), Optional.empty());
                consumed.onCompletion((Throwable) null, record);
            }
            // NOTE(review): the cast on null is kept as-is; presumably it selects the value
            // overload of resolveOnGet rather than an error one - confirm against TestFuture.
            readFuture.resolveOnGet((TestFuture) null);
            return readFuture;
        });
    }

    /**
     * Records the expectations for removing a connector: a null-valued write under the connector
     * config key {@code str}, a null-valued write under the target state key {@code str2}, and a
     * read-to-end that replays both null-valued records in that order.
     *
     * @param str  connector configuration record key
     * @param str2 target state record key
     */
    private void expectConnectorRemoval(String str, String str2) throws Exception {
        LinkedHashMap<String, byte[]> nullValuedRecords = new LinkedHashMap<>();
        nullValuedRecords.put(str, null);
        nullValuedRecords.put(str2, null);

        expectConvertWriteRead(str, KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, null, null, null);
        expectConvertWriteRead(str2, KafkaConfigBackingStore.TARGET_STATE_V0, null, null, null);
        expectReadToEnd(nullValuedRecords);
    }

    /**
     * Same expectations as {@code expectConvertWriteRead}, followed by a read-to-end of the log
     * that replays the single record {@code str} / {@code bArr}.
     *
     * @param str    record key
     * @param schema schema the written struct is expected to use
     * @param bArr   serialized form of the record
     * @param str2   name of a struct field to verify, or {@code null} to skip the check
     * @param obj    expected value of field {@code str2}
     */
    private void expectConvertWriteAndRead(String str, Schema schema, byte[] bArr, String str2, Object obj) throws Exception {
        LinkedHashMap<String, byte[]> writtenRecord = new LinkedHashMap<>();
        writtenRecord.put(str, bArr);

        expectConvertWriteRead(str, schema, bArr, str2, obj);
        expectReadToEnd(writtenRecord);
    }

    /**
     * Injects a connector directly into the store's internal state, bypassing the log: one
     * {@code taskConfigs} entry per element of {@code list} (task IDs numbered from 0), the
     * connector's own config in {@code connectorConfigs}, and the number of tasks in
     * {@code connectorTaskCounts}.
     *
     * @param str  connector name
     * @param map  connector configuration
     * @param list task configurations, indexed by task number
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // internal maps are reached reflectively, so their types are not known here
    private void whiteboxAddConnector(String str, Map<String, String> map, List<Map<String, String>> list) {
        Map taskConfigs = (Map) Whitebox.getInternalState(configStorage, "taskConfigs");
        int taskNumber = 0;
        for (Map<String, String> taskConfig : list) {
            taskConfigs.put(new ConnectorTaskId(str, taskNumber), taskConfig);
            taskNumber++;
        }

        Map connectorConfigs = (Map) Whitebox.getInternalState(configStorage, "connectorConfigs");
        connectorConfigs.put(str, map);

        Map connectorTaskCounts = (Map) Whitebox.getInternalState(configStorage, "connectorTaskCounts");
        connectorTaskCounts.put(str, list.size());
    }

    /**
     * Flattens a struct into a map from field name to field value, covering every field of the
     * struct's schema.
     *
     * @param struct the struct to flatten, may be {@code null}
     * @return a new mutable map of the struct's fields, or {@code null} if {@code struct} is null
     */
    private Map<String, Object> structToMap(Struct struct) {
        if (struct == null) {
            return null;
        }
        Map<String, Object> valuesByFieldName = new HashMap<>();
        struct.schema().fields().forEach(field -> valuesByFieldName.put(field.name(), struct.get(field)));
        return valuesByFieldName;
    }

    /* JADX WARN: Type inference failed for: r0v75, types: [byte[], java.lang.Object[]] */
    static {
        DEFAULT_CONFIG_STORAGE_PROPS.put("config.storage.topic", TOPIC);
        DEFAULT_CONFIG_STORAGE_PROPS.put("offset.storage.topic", "connect-offsets");
        DEFAULT_CONFIG_STORAGE_PROPS.put("config.storage.replication.factor", Short.toString((short) 5));
        DEFAULT_CONFIG_STORAGE_PROPS.put("group.id", "connect");
        DEFAULT_CONFIG_STORAGE_PROPS.put("status.storage.topic", "status-topic");
        DEFAULT_CONFIG_STORAGE_PROPS.put("bootstrap.servers", "broker1:9092,broker2:9093");
        DEFAULT_CONFIG_STORAGE_PROPS.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        DEFAULT_CONFIG_STORAGE_PROPS.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        CONNECTOR_IDS = Arrays.asList(CONNECTOR_1_NAME, CONNECTOR_2_NAME);
        CONNECTOR_CONFIG_KEYS = Arrays.asList("connector-connector1", "connector-connector2");
        COMMIT_TASKS_CONFIG_KEYS = Arrays.asList("commit-connector1", "commit-connector2");
        TARGET_STATE_KEYS = Arrays.asList("target-state-connector1", "target-state-connector2");
        CONNECTOR_TASK_COUNT_RECORD_KEYS = Arrays.asList("tasks-fencing-connector1", "tasks-fencing-connector2");
        RESTART_CONNECTOR_KEYS = Arrays.asList(KafkaConfigBackingStore.RESTART_KEY(CONNECTOR_1_NAME), KafkaConfigBackingStore.RESTART_KEY(CONNECTOR_2_NAME));
        TASK_IDS = Arrays.asList(new ConnectorTaskId(CONNECTOR_1_NAME, 0), new ConnectorTaskId(CONNECTOR_1_NAME, 1), new ConnectorTaskId(CONNECTOR_2_NAME, 0));
        TASK_CONFIG_KEYS = Arrays.asList("task-connector1-0", "task-connector1-1", "task-connector2-0");
        SAMPLE_CONFIGS = Arrays.asList(Collections.singletonMap("config-key-one", "config-value-one"), Collections.singletonMap("config-key-two", "config-value-two"), Collections.singletonMap("config-key-three", "config-value-three"));
        CONNECTOR_CONFIG_STRUCTS = Arrays.asList(new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)), new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)), new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(2)));
        TASK_CONFIG_STRUCTS = Arrays.asList(new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)), new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)));
        CONNECTOR_TASK_COUNT_RECORD_STRUCTS = Arrays.asList(new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 6), new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9));
        TARGET_STATE_STARTED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "STARTED");
        TARGET_STATE_PAUSED_LEGACY = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "PAUSED");
        TARGET_STATE_PAUSED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V1).put("state", "PAUSED").put("state.v2", "PAUSED");
        TARGET_STATE_STOPPED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V1).put("state", "PAUSED").put("state.v2", "STOPPED");
        TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR = new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
        TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR = new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 0);
        ONLY_FAILED_MISSING_STRUCT = new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put("include-tasks", false);
        INLUDE_TASKS_MISSING_STRUCT = new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put("only-failed", true);
        RESTART_REQUEST_STRUCTS = Arrays.asList(new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put("only-failed", true).put("include-tasks", false), ONLY_FAILED_MISSING_STRUCT, INLUDE_TASKS_MISSING_STRUCT);
        CONFIGS_SERIALIZED = Arrays.asList(new byte[]{"config-bytes-1".getBytes(), "config-bytes-2".getBytes(), "config-bytes-3".getBytes(), "config-bytes-4".getBytes(), "config-bytes-5".getBytes(), "config-bytes-6".getBytes(), "config-bytes-7".getBytes(), "config-bytes-8".getBytes(), "config-bytes-9".getBytes()});
    }
}
