package org.apache.flink.streaming.connectors.kafka;

import java.util.Arrays;
import java.util.Optional;
import java.util.Properties;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.testutils.migration.MigrationVersion;
import org.apache.flink.util.Preconditions;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.class */
public abstract class KafkaMigrationTestBase extends KafkaTestBase {
    protected static final Logger LOG = LoggerFactory.getLogger(KafkaMigrationTestBase.class);
    protected static final String TOPIC = "flink-kafka-producer-migration-test";
    protected final MigrationVersion testMigrateVersion;
    protected final TypeInformationSerializationSchema<Integer> integerSerializationSchema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
    protected final KeyedSerializationSchema<Integer> integerKeyedSerializationSchema = new KeyedSerializationSchemaWrapper(this.integerSerializationSchema);
    protected final Optional<MigrationVersion> flinkGenerateSavepointVersion = Optional.empty();

    public KafkaMigrationTestBase(MigrationVersion migrationVersion) {
        this.testMigrateVersion = (MigrationVersion) Preconditions.checkNotNull(migrationVersion);
    }

    public String getOperatorSnapshotPath() {
        return getOperatorSnapshotPath(this.testMigrateVersion);
    }

    public String getOperatorSnapshotPath(MigrationVersion migrationVersion) {
        return "src/test/resources/kafka-migration-kafka-producer-flink-" + migrationVersion + "-snapshot";
    }

    @BeforeClass
    public static void prepare() throws Exception {
    }

    @AfterClass
    public static void shutDownServices() throws Exception {
    }

    @Test
    @Ignore
    public void writeSnapshot() throws Exception {
        try {
            Preconditions.checkState(this.flinkGenerateSavepointVersion.isPresent());
            startClusters();
            OperatorSnapshotUtil.writeStateHandle(initializeTestState(), getOperatorSnapshotPath(this.flinkGenerateSavepointVersion.get()));
        } finally {
            shutdownClusters();
        }
    }

    private OperatorSubtaskState initializeTestState() throws Exception {
        OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness = createTestHarness();
        Throwable th = null;
        try {
            createTestHarness.setup();
            createTestHarness.open();
            createTestHarness.processElement(42, 0L);
            OperatorSubtaskState snapshot = createTestHarness.snapshot(0L, 1L);
            createTestHarness.notifyOfCompletedCheckpoint(0L);
            createTestHarness.processElement(43, 2L);
            if (createTestHarness != null) {
                if (0 != 0) {
                    try {
                        createTestHarness.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    createTestHarness.close();
                }
            }
            return snapshot;
        } catch (Throwable th3) {
            if (createTestHarness != null) {
                if (0 != 0) {
                    try {
                        createTestHarness.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createTestHarness.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testRestoreProducer() throws Exception {
        try {
            startClusters();
            initializeTestState();
            OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness = createTestHarness();
            Throwable th = null;
            try {
                initializeState(createTestHarness);
                createTestHarness.processElement(44, 4L);
                createTestHarness.snapshot(2L, 5L);
                createTestHarness.notifyOfCompletedCheckpoint(2L);
                createTestHarness.processElement(45, 6L);
                assertExactlyOnceForTopic(createProperties(), TOPIC, 0, Arrays.asList(42, 44));
                if (createTestHarness != null) {
                    if (0 != 0) {
                        try {
                            createTestHarness.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createTestHarness.close();
                    }
                }
            } finally {
            }
        } finally {
            shutdownClusters();
        }
    }

    protected abstract OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness() throws Exception;

    protected abstract Properties createProperties();

    protected void initializeState(OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness) throws Exception {
        oneInputStreamOperatorTestHarness.setup();
        oneInputStreamOperatorTestHarness.initializeState(getOperatorSnapshotPath());
        oneInputStreamOperatorTestHarness.open();
    }
}
