package org.apache.flink.cdc.connectors.vitess;

import java.sql.Connection;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cdc.connectors.utils.AssertUtils;
import org.apache.flink.cdc.connectors.utils.TestSourceContext;
import org.apache.flink.cdc.connectors.vitess.config.TabletType;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/cdc/connectors/vitess/VitessSourceTest.class */
public class VitessSourceTest extends VitessTestBase {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/cdc/connectors/vitess/VitessSourceTest$ForwardDeserializeSchema.class */
    public static class ForwardDeserializeSchema implements DebeziumDeserializationSchema<SourceRecord> {
        private static final long serialVersionUID = 2975058057832211228L;

        private ForwardDeserializeSchema() {
        }

        public void deserialize(SourceRecord sourceRecord, Collector<SourceRecord> collector) throws Exception {
            collector.collect(sourceRecord);
        }

        public TypeInformation<SourceRecord> getProducedType() {
            return TypeInformation.of(SourceRecord.class);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/cdc/connectors/vitess/VitessSourceTest$MockFunctionInitializationContext.class */
    public static class MockFunctionInitializationContext implements FunctionInitializationContext {
        private final boolean isRestored;
        private final OperatorStateStore operatorStateStore;

        private MockFunctionInitializationContext(boolean z, OperatorStateStore operatorStateStore) {
            this.isRestored = z;
            this.operatorStateStore = operatorStateStore;
        }

        public boolean isRestored() {
            return this.isRestored;
        }

        public OptionalLong getRestoredCheckpointId() {
            throw new UnsupportedOperationException();
        }

        public OperatorStateStore getOperatorStateStore() {
            return this.operatorStateStore;
        }

        public KeyedStateStore getKeyedStateStore() {
            throw new UnsupportedOperationException();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/cdc/connectors/vitess/VitessSourceTest$MockOperatorStateStore.class */
    public static class MockOperatorStateStore implements OperatorStateStore {
        private final ListState<?> restoredOffsetListState;
        private final ListState<?> restoredHistoryListState;

        private MockOperatorStateStore(ListState<?> listState, ListState<?> listState2) {
            this.restoredOffsetListState = listState;
            this.restoredHistoryListState = listState2;
        }

        public <S> ListState<S> getUnionListState(ListStateDescriptor<S> listStateDescriptor) throws Exception {
            if (listStateDescriptor.getName().equals("offset-states")) {
                return (ListState<S>) this.restoredOffsetListState;
            }
            if (listStateDescriptor.getName().equals("history-records-states")) {
                return (ListState<S>) this.restoredHistoryListState;
            }
            throw new IllegalStateException("Unknown state.");
        }

        public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> mapStateDescriptor) throws Exception {
            throw new UnsupportedOperationException();
        }

        public <S> ListState<S> getListState(ListStateDescriptor<S> listStateDescriptor) throws Exception {
            throw new UnsupportedOperationException();
        }

        public Set<String> getRegisteredStateNames() {
            throw new UnsupportedOperationException();
        }

        public Set<String> getRegisteredBroadcastStateNames() {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Runs before each test and asks the inherited {@code initializeTable} helper to initialize
     * {@code "inventory"}.
     */
    @Before
    public void before() {
        final String fixtureName = "inventory";
        initializeTable(fixtureName);
    }

    /**
     * Runs the Vitess source on a background thread, applies a sequence of DML/DDL statements to
     * {@code test.products} over JDBC, and checks that each statement surfaces as the expected
     * change record(s) in emission order.
     *
     * <p>The JDBC connection and statement are managed with try-with-resources so that they are
     * closed exactly once and a failure while closing is attached as a suppressed exception
     * instead of replacing the original test failure.
     *
     * @throws Exception if the source, the JDBC calls, or any assertion fails
     */
    @Test
    public void testConsumingAllEvents() throws Exception {
        final DebeziumSourceFunction<SourceRecord> source = createVitessSqlSource(0);
        final TestSourceContext<SourceRecord> sourceContext = new TestSourceContext<>();
        setupSource(source);

        try (Connection connection = getJdbcConnection();
                Statement statement = connection.createStatement()) {
            // run() blocks, so the source is driven from its own thread; sync() below rethrows
            // anything that thread failed with.
            final CheckedThread runThread =
                    new CheckedThread() {
                        @Override
                        public void go() throws Exception {
                            source.run(sourceContext);
                        }
                    };
            runThread.start();

            waitForSourceToStart(Duration.ofSeconds(60L), source);

            // Insert relying on the default id; the test expects it to come out as 101.
            statement.execute("INSERT INTO test.products VALUES (default,'robot','Toy robot',1.304)");
            AssertUtils.assertInsert(drain(sourceContext, 1).get(0), "id", 101);

            // Insert with an explicit id.
            statement.execute("INSERT INTO test.products VALUES (1001,'roy','old robot',1234.56)");
            AssertUtils.assertInsert(drain(sourceContext, 1).get(0), "id", 1001);

            // An UPDATE that changes the key is expected as a delete of the old key followed by
            // an insert of the new key.
            statement.execute("UPDATE test.products SET id=2001, description='really old robot' WHERE id=1001");
            final List<SourceRecord> keyChangeRecords = drain(sourceContext, 2);
            AssertUtils.assertDelete(keyChangeRecords.get(0), "id", 1001);
            AssertUtils.assertInsert(keyChangeRecords.get(1), "id", 2001);

            // An UPDATE that leaves the key alone is expected as a single update record.
            statement.execute("UPDATE test.products SET weight=1345.67 WHERE id=2001");
            AssertUtils.assertUpdate(drain(sourceContext, 1).get(0), "id", 2001);

            // Schema change followed by an update touching one of the new columns.
            statement.execute("ALTER TABLE test.products ADD COLUMN volume FLOAT, ADD COLUMN alias VARCHAR(30) NULL");
            // NOTE(review): presumably gives the schema change time to become visible to the
            // source before the next statement — confirm the required delay.
            Thread.sleep(5000L);
            statement.execute("UPDATE test.products SET volume=13.5 WHERE id=2001");
            AssertUtils.assertUpdate(drain(sourceContext, 1).get(0), "id", 2001);

            // Stop the source and wait for its thread to finish.
            source.close();
            runThread.sync();
        }
    }

    /**
     * Builds a Vitess source that reads {@code test.products} from the test container and emits
     * raw {@link SourceRecord}s.
     *
     * @param i value written (as a string) to the {@code heartbeat.interval.ms} Debezium property
     * @return the configured source function
     */
    private DebeziumSourceFunction<SourceRecord> createVitessSqlSource(int i) {
        Properties properties = new Properties();
        properties.setProperty("heartbeat.interval.ms", String.valueOf(i));
        // The element type is stated explicitly because the chained calls give the compiler no
        // target type for builder(). (If builder() is not generic the argument is ignored.)
        return VitessSource.<SourceRecord>builder()
                .hostname(VITESS_CONTAINER.getHost())
                .port(VITESS_CONTAINER.getGrpcPort().intValue())
                .keyspace(VITESS_CONTAINER.getKeyspace())
                .tabletType(TabletType.MASTER)
                .tableIncludeList(new String[] {"test.products"})
                .deserializer(new ForwardDeserializeSchema())
                .debeziumProperties(properties)
                .build();
    }

    /**
     * Takes records from the source context's output queue until {@code i} have been collected.
     *
     * @param testSourceContext context whose collected outputs are consumed
     * @param i number of records to wait for; a value of zero or less returns an empty list
     * @param <T> element type emitted by the source
     * @return the record values in the order they were taken from the queue
     * @throws RuntimeException if a single poll yields nothing within 1000 seconds
     * @throws Exception if the thread is interrupted while waiting
     */
    private <T> List<T> drain(TestSourceContext<T> testSourceContext, int i) throws Exception {
        final List<T> records = new ArrayList<>();
        final LinkedBlockingQueue<StreamRecord<T>> outputs = testSourceContext.getCollectedOutputs();
        while (records.size() < i) {
            // The timeout applies to each element separately, not to the whole drain.
            final StreamRecord<T> next = outputs.poll(1000L, TimeUnit.SECONDS);
            if (next == null) {
                throw new RuntimeException("Can't receive " + i + " elements before timeout.");
            }
            records.add(next.getValue());
        }
        return records;
    }

    /**
     * Polls the source every 10 ms until it reports that Debezium has started or the given
     * duration has elapsed, then always sleeps a further 10 seconds before returning.
     *
     * @param duration maximum time to spend polling (the trailing 10-second sleep is extra)
     * @param debeziumSourceFunction source whose started flag is observed
     * @return the source's started flag as read after the trailing sleep
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private boolean waitForSourceToStart(Duration duration, DebeziumSourceFunction<SourceRecord> debeziumSourceFunction) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + duration.toMillis();
        while (System.currentTimeMillis() < deadline) {
            if (debeziumSourceFunction.getDebeziumStarted()) {
                break;
            }
            Thread.sleep(10L);
        }
        // Unconditional settle time, applied whether or not the flag was seen above.
        Thread.sleep(10000L);
        return debeziumSourceFunction.getDebeziumStarted();
    }

    /**
     * Convenience overload: prepares the source as not restored, with no pre-built list states,
     * and with the runtime-context arguments {@code true}, {@code 0} and {@code 1}.
     *
     * @param debeziumSourceFunction source to prepare
     * @throws Exception if any step of the setup fails
     */
    private static <T> void setupSource(DebeziumSourceFunction<T> debeziumSourceFunction) throws Exception {
        final boolean restored = false;
        // Presumably checkpointing flag, subtask index and subtask count — see how the
        // seven-argument overload forwards them to MockStreamingRuntimeContext.
        final boolean runtimeFlag = true;
        final int index = 0;
        final int total = 1;
        setupSource(debeziumSourceFunction, restored, null, null, runtimeFlag, index, total);
    }

    /**
     * Prepares a source for running: sets a mock runtime context, initializes state from a mock
     * initialization context, then opens it — in that order.
     *
     * @param debeziumSourceFunction source to prepare
     * @param z restored flag reported by the mock initialization context
     * @param listState state served for the {@code "offset-states"} descriptor
     * @param listState2 state served for the {@code "history-records-states"} descriptor
     * @param z2 first argument of {@code MockStreamingRuntimeContext}
     * @param i third argument of {@code MockStreamingRuntimeContext}
     * @param i2 second argument of {@code MockStreamingRuntimeContext}
     * @throws Exception if any step of the setup fails
     */
    private static <T, S1, S2> void setupSource(DebeziumSourceFunction<T> debeziumSourceFunction, boolean z, ListState<S1> listState, ListState<S2> listState2, boolean z2, int i, int i2) throws Exception {
        // Note the argument order: i2 is passed before i.
        final MockStreamingRuntimeContext runtimeContext = new MockStreamingRuntimeContext(z2, i2, i);
        debeziumSourceFunction.setRuntimeContext(runtimeContext);

        final MockOperatorStateStore stateStore = new MockOperatorStateStore(listState, listState2);
        final MockFunctionInitializationContext initContext = new MockFunctionInitializationContext(z, stateStore);
        debeziumSourceFunction.initializeState(initContext);

        final Configuration emptyConfiguration = new Configuration();
        debeziumSourceFunction.open(emptyConfiguration);
    }
}
