package org.apache.kafka.connect.storage;

import java.util.HashMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.easymock.IAnswer;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.class */
public class KafkaStatusBackingStoreTest extends EasyMockSupport {
    private static final String STATUS_TOPIC = "status-topic";
    private static final String WORKER_ID = "localhost:8083";
    private static final String CONNECTOR = "conn";
    private static final ConnectorTaskId TASK = new ConnectorTaskId(CONNECTOR, 0);

    @Test
    public void putConnectorState() {
        KafkaBasedLog kafkaBasedLog = (KafkaBasedLog) mock(KafkaBasedLog.class);
        Converter converter = (Converter) mock(Converter.class);
        KafkaStatusBackingStore kafkaStatusBackingStore = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
        byte[] bArr = new byte[0];
        EasyMock.expect(converter.fromConnectData((String) EasyMock.eq(STATUS_TOPIC), (Schema) EasyMock.anyObject(Schema.class), EasyMock.anyObject(Struct.class))).andStubReturn(bArr);
        final Capture newCapture = EasyMock.newCapture();
        kafkaBasedLog.send(EasyMock.eq("status-connector-conn"), EasyMock.eq(bArr), (Callback) EasyMock.capture(newCapture));
        EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() { // from class: org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest.1
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m75answer() throws Throwable {
                ((Callback) newCapture.getValue()).onCompletion((RecordMetadata) null, (Exception) null);
                return null;
            }
        });
        replayAll();
        kafkaStatusBackingStore.put(new ConnectorStatus(CONNECTOR, AbstractStatus.State.RUNNING, WORKER_ID, 0));
        Assert.assertEquals((Object) null, kafkaStatusBackingStore.get(CONNECTOR));
        verifyAll();
    }

    @Test
    public void putConnectorStateRetriableFailure() {
        KafkaBasedLog kafkaBasedLog = (KafkaBasedLog) mock(KafkaBasedLog.class);
        Converter converter = (Converter) mock(Converter.class);
        KafkaStatusBackingStore kafkaStatusBackingStore = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
        byte[] bArr = new byte[0];
        EasyMock.expect(converter.fromConnectData((String) EasyMock.eq(STATUS_TOPIC), (Schema) EasyMock.anyObject(Schema.class), EasyMock.anyObject(Struct.class))).andStubReturn(bArr);
        final Capture newCapture = EasyMock.newCapture();
        kafkaBasedLog.send(EasyMock.eq("status-connector-conn"), EasyMock.eq(bArr), (Callback) EasyMock.capture(newCapture));
        EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() { // from class: org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest.3
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m77answer() throws Throwable {
                ((Callback) newCapture.getValue()).onCompletion((RecordMetadata) null, new TimeoutException());
                return null;
            }
        }).andAnswer(new IAnswer<Void>() { // from class: org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest.2
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m76answer() throws Throwable {
                ((Callback) newCapture.getValue()).onCompletion((RecordMetadata) null, (Exception) null);
                return null;
            }
        });
        replayAll();
        kafkaStatusBackingStore.put(new ConnectorStatus(CONNECTOR, AbstractStatus.State.RUNNING, WORKER_ID, 0));
        Assert.assertEquals((Object) null, kafkaStatusBackingStore.get(CONNECTOR));
        verifyAll();
    }

    @Test
    public void putConnectorStateNonRetriableFailure() {
        KafkaBasedLog kafkaBasedLog = (KafkaBasedLog) mock(KafkaBasedLog.class);
        Converter converter = (Converter) mock(Converter.class);
        KafkaStatusBackingStore kafkaStatusBackingStore = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
        byte[] bArr = new byte[0];
        EasyMock.expect(converter.fromConnectData((String) EasyMock.eq(STATUS_TOPIC), (Schema) EasyMock.anyObject(Schema.class), EasyMock.anyObject(Struct.class))).andStubReturn(bArr);
        final Capture newCapture = EasyMock.newCapture();
        kafkaBasedLog.send(EasyMock.eq("status-connector-conn"), EasyMock.eq(bArr), (Callback) EasyMock.capture(newCapture));
        EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() { // from class: org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest.4
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m78answer() throws Throwable {
                ((Callback) newCapture.getValue()).onCompletion((RecordMetadata) null, new UnknownServerException());
                return null;
            }
        });
        replayAll();
        kafkaStatusBackingStore.put(new ConnectorStatus(CONNECTOR, AbstractStatus.State.RUNNING, WORKER_ID, 0));
        Assert.assertEquals((Object) null, kafkaStatusBackingStore.get(CONNECTOR));
        verifyAll();
    }

    @Test
    public void putSafeConnectorIgnoresStaleStatus() {
        byte[] bArr = new byte[0];
        KafkaBasedLog kafkaBasedLog = (KafkaBasedLog) mock(KafkaBasedLog.class);
        Converter converter = (Converter) mock(Converter.class);
        KafkaStatusBackingStore kafkaStatusBackingStore = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
        HashMap hashMap = new HashMap();
        hashMap.put("worker_id", "anotherhost:8083");
        hashMap.put("state", "RUNNING");
        hashMap.put("generation", 1L);
        EasyMock.expect(converter.toConnectData(STATUS_TOPIC, bArr)).andReturn(new SchemaAndValue((Schema) null, hashMap));
        replayAll();
        kafkaStatusBackingStore.read(consumerRecord(0L, "status-connector-conn", bArr));
        kafkaStatusBackingStore.putSafe(new ConnectorStatus(CONNECTOR, AbstractStatus.State.UNASSIGNED, WORKER_ID, 0));
        Assert.assertEquals(new ConnectorStatus(CONNECTOR, AbstractStatus.State.RUNNING, "anotherhost:8083", 1), kafkaStatusBackingStore.get(CONNECTOR));
        verifyAll();
    }

    @Test
    public void putSafeWithNoPreviousValueIsPropagated() {
        Converter converter = (Converter) mock(Converter.class);
        KafkaBasedLog kafkaBasedLog = (KafkaBasedLog) mock(KafkaBasedLog.class);
        KafkaStatusBackingStore kafkaStatusBackingStore = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
        byte[] bArr = new byte[0];
        Capture newCapture = EasyMock.newCapture();
        converter.fromConnectData((String) EasyMock.eq(STATUS_TOPIC), (Schema) EasyMock.anyObject(Schema.class), EasyMock.capture(newCapture));
        EasyMock.expectLastCall().andReturn(bArr);
        kafkaBasedLog.send(EasyMock.eq("status-connector-conn"), EasyMock.eq(bArr), (Callback) EasyMock.anyObject(Callback.class));
        EasyMock.expectLastCall();
        replayAll();
        ConnectorStatus connectorStatus = new ConnectorStatus(CONNECTOR, AbstractStatus.State.FAILED, WORKER_ID, 0);
        kafkaStatusBackingStore.putSafe(connectorStatus);
        verifyAll();
        Assert.assertEquals(connectorStatus.state().toString(), ((Struct) newCapture.getValue()).get("state"));
        Assert.assertEquals(connectorStatus.workerId(), ((Struct) newCapture.getValue()).get("worker_id"));
        Assert.assertEquals(Integer.valueOf(connectorStatus.generation()), ((Struct) newCapture.getValue()).get("generation"));
    }

    @Test
    public void putSafeOverridesValueSetBySameWorker() {
        final byte[] bArr = new byte[0];
        KafkaBasedLog kafkaBasedLog = (KafkaBasedLog) mock(KafkaBasedLog.class);
        Converter converter = (Converter) mock(Converter.class);
        final KafkaStatusBackingStore kafkaStatusBackingStore = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
        HashMap hashMap = new HashMap();
        hashMap.put("worker_id", WORKER_ID);
        hashMap.put("state", "RUNNING");
        hashMap.put("generation", 1L);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("worker_id", WORKER_ID);
        hashMap2.put("state", "UNASSIGNED");
        hashMap2.put("generation", 0L);
        EasyMock.expect(converter.toConnectData(STATUS_TOPIC, bArr)).andReturn(new SchemaAndValue((Schema) null, hashMap)).andReturn(new SchemaAndValue((Schema) null, hashMap2));
        EasyMock.expect(converter.fromConnectData((String) EasyMock.eq(STATUS_TOPIC), (Schema) EasyMock.anyObject(Schema.class), EasyMock.anyObject(Struct.class))).andStubReturn(bArr);
        final Capture newCapture = EasyMock.newCapture();
        kafkaBasedLog.send(EasyMock.eq("status-connector-conn"), EasyMock.eq(bArr), (Callback) EasyMock.capture(newCapture));
        EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() { // from class: org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest.5
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m79answer() throws Throwable {
                ((Callback) newCapture.getValue()).onCompletion((RecordMetadata) null, (Exception) null);
                kafkaStatusBackingStore.read(KafkaStatusBackingStoreTest.consumerRecord(1L, "status-connector-conn", bArr));
                return null;
            }
        });
        replayAll();
        kafkaStatusBackingStore.read(consumerRecord(0L, "status-connector-conn", bArr));
        kafkaStatusBackingStore.putSafe(new ConnectorStatus(CONNECTOR, AbstractStatus.State.UNASSIGNED, WORKER_ID, 0));
        Assert.assertEquals(new ConnectorStatus(CONNECTOR, AbstractStatus.State.UNASSIGNED, WORKER_ID, 0), kafkaStatusBackingStore.get(CONNECTOR));
        verifyAll();
    }

    @Test
    public void putConnectorStateShouldOverride() {
        final byte[] bArr = new byte[0];
        KafkaBasedLog kafkaBasedLog = (KafkaBasedLog) mock(KafkaBasedLog.class);
        Converter converter = (Converter) mock(Converter.class);
        final KafkaStatusBackingStore kafkaStatusBackingStore = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
        HashMap hashMap = new HashMap();
        hashMap.put("worker_id", "anotherhost:8083");
        hashMap.put("state", "RUNNING");
        hashMap.put("generation", 1L);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("worker_id", WORKER_ID);
        hashMap2.put("state", "UNASSIGNED");
        hashMap2.put("generation", 0L);
        EasyMock.expect(converter.toConnectData(STATUS_TOPIC, bArr)).andReturn(new SchemaAndValue((Schema) null, hashMap)).andReturn(new SchemaAndValue((Schema) null, hashMap2));
        EasyMock.expect(converter.fromConnectData((String) EasyMock.eq(STATUS_TOPIC), (Schema) EasyMock.anyObject(Schema.class), EasyMock.anyObject(Struct.class))).andStubReturn(bArr);
        final Capture newCapture = EasyMock.newCapture();
        kafkaBasedLog.send(EasyMock.eq("status-connector-conn"), EasyMock.eq(bArr), (Callback) EasyMock.capture(newCapture));
        EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() { // from class: org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest.6
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m80answer() throws Throwable {
                ((Callback) newCapture.getValue()).onCompletion((RecordMetadata) null, (Exception) null);
                kafkaStatusBackingStore.read(KafkaStatusBackingStoreTest.consumerRecord(1L, "status-connector-conn", bArr));
                return null;
            }
        });
        replayAll();
        kafkaStatusBackingStore.read(consumerRecord(0L, "status-connector-conn", bArr));
        ConnectorStatus connectorStatus = new ConnectorStatus(CONNECTOR, AbstractStatus.State.UNASSIGNED, WORKER_ID, 0);
        kafkaStatusBackingStore.put(connectorStatus);
        Assert.assertEquals(connectorStatus, kafkaStatusBackingStore.get(CONNECTOR));
        verifyAll();
    }

    @Test
    public void readConnectorState() {
        byte[] bArr = new byte[0];
        KafkaBasedLog kafkaBasedLog = (KafkaBasedLog) mock(KafkaBasedLog.class);
        Converter converter = (Converter) mock(Converter.class);
        KafkaStatusBackingStore kafkaStatusBackingStore = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
        HashMap hashMap = new HashMap();
        hashMap.put("worker_id", WORKER_ID);
        hashMap.put("state", "RUNNING");
        hashMap.put("generation", 0L);
        EasyMock.expect(converter.toConnectData(STATUS_TOPIC, bArr)).andReturn(new SchemaAndValue((Schema) null, hashMap));
        replayAll();
        kafkaStatusBackingStore.read(consumerRecord(0L, "status-connector-conn", bArr));
        Assert.assertEquals(new ConnectorStatus(CONNECTOR, AbstractStatus.State.RUNNING, WORKER_ID, 0), kafkaStatusBackingStore.get(CONNECTOR));
        verifyAll();
    }

    @Test
    public void putTaskState() {
        KafkaBasedLog kafkaBasedLog = (KafkaBasedLog) mock(KafkaBasedLog.class);
        Converter converter = (Converter) mock(Converter.class);
        KafkaStatusBackingStore kafkaStatusBackingStore = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
        byte[] bArr = new byte[0];
        EasyMock.expect(converter.fromConnectData((String) EasyMock.eq(STATUS_TOPIC), (Schema) EasyMock.anyObject(Schema.class), EasyMock.anyObject(Struct.class))).andStubReturn(bArr);
        final Capture newCapture = EasyMock.newCapture();
        kafkaBasedLog.send(EasyMock.eq("status-task-conn-0"), EasyMock.eq(bArr), (Callback) EasyMock.capture(newCapture));
        EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() { // from class: org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest.7
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m81answer() throws Throwable {
                ((Callback) newCapture.getValue()).onCompletion((RecordMetadata) null, (Exception) null);
                return null;
            }
        });
        replayAll();
        kafkaStatusBackingStore.put(new TaskStatus(TASK, AbstractStatus.State.RUNNING, WORKER_ID, 0));
        Assert.assertEquals((Object) null, kafkaStatusBackingStore.get(TASK));
        verifyAll();
    }

    @Test
    public void readTaskState() {
        byte[] bArr = new byte[0];
        KafkaBasedLog kafkaBasedLog = (KafkaBasedLog) mock(KafkaBasedLog.class);
        Converter converter = (Converter) mock(Converter.class);
        KafkaStatusBackingStore kafkaStatusBackingStore = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
        HashMap hashMap = new HashMap();
        hashMap.put("worker_id", WORKER_ID);
        hashMap.put("state", "RUNNING");
        hashMap.put("generation", 0L);
        EasyMock.expect(converter.toConnectData(STATUS_TOPIC, bArr)).andReturn(new SchemaAndValue((Schema) null, hashMap));
        replayAll();
        kafkaStatusBackingStore.read(consumerRecord(0L, "status-task-conn-0", bArr));
        Assert.assertEquals(new TaskStatus(TASK, AbstractStatus.State.RUNNING, WORKER_ID, 0), kafkaStatusBackingStore.get(TASK));
        verifyAll();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static ConsumerRecord<String, byte[]> consumerRecord(long j, String str, byte[] bArr) {
        return new ConsumerRecord<>(STATUS_TOPIC, 0, j, System.currentTimeMillis(), TimestampType.CREATE_TIME, 0L, 0, 0, str, bArr);
    }
}
