package org.apache.iceberg.flink.source;

import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.HadoopTableResource;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/iceberg/flink/source/TestIcebergSourceFailover.class */
public class TestIcebergSourceFailover {
    private static final int PARALLELISM = 4;

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    @Rule
    public final MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(PARALLELISM).setRpcServiceSharing(RpcServiceSharing.DEDICATED).withHaLeadershipControl().build());

    @Rule
    public final HadoopTableResource sourceTableResource = new HadoopTableResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, schema());

    @Rule
    public final HadoopTableResource sinkTableResource = new HadoopTableResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.SINK_TABLE, schema());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iceberg/flink/source/TestIcebergSourceFailover$FailoverType.class */
    public enum FailoverType {
        NONE,
        TM,
        JM
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iceberg/flink/source/TestIcebergSourceFailover$RecordCounterToFail.class */
    public static class RecordCounterToFail {
        private static AtomicInteger records;
        private static CompletableFuture<Void> fail;
        private static CompletableFuture<Void> continueProcessing;

        private RecordCounterToFail() {
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static <T> DataStream<T> wrapWithFailureAfter(DataStream<T> dataStream, int i) {
            records = new AtomicInteger();
            fail = new CompletableFuture<>();
            continueProcessing = new CompletableFuture<>();
            return dataStream.map(obj -> {
                boolean z = records.incrementAndGet() > i;
                if ((!fail.isDone()) && z) {
                    fail.complete(null);
                    continueProcessing.get();
                }
                return obj;
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static void waitToFail() throws ExecutionException, InterruptedException {
            fail.get();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static void continueProcessing() {
            continueProcessing.complete(null);
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case 1071972400:
                    if (implMethodName.equals("lambda$wrapWithFailureAfter$64125da1$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/iceberg/flink/source/TestIcebergSourceFailover$RecordCounterToFail") && serializedLambda.getImplMethodSignature().equals("(ILjava/lang/Object;)Ljava/lang/Object;")) {
                        int intValue = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                        return obj -> {
                            boolean z2 = records.incrementAndGet() > intValue;
                            if ((!fail.isDone()) && z2) {
                                fail.complete(null);
                                continueProcessing.get();
                            }
                            return obj;
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    protected IcebergSource.Builder<RowData> sourceBuilder() {
        Configuration configuration = new Configuration();
        configuration.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 128);
        return IcebergSource.forRowData().tableLoader(this.sourceTableResource.tableLoader()).assignerFactory(new SimpleSplitAssignerFactory()).flinkConfig(configuration);
    }

    protected Schema schema() {
        return TestFixtures.SCHEMA;
    }

    protected List<Record> generateRecords(int i, long j) {
        return RandomGenericData.generate(schema(), i, j);
    }

    protected void assertRecords(Table table, List<Record> list, Duration duration, int i) throws Exception {
        SimpleDataUtil.assertTableRecords(table, list, duration, i);
    }

    @Test
    public void testBoundedWithTaskManagerFailover() throws Exception {
        testBoundedIcebergSource(FailoverType.TM);
    }

    @Test
    public void testBoundedWithJobManagerFailover() throws Exception {
        testBoundedIcebergSource(FailoverType.JM);
    }

    private void testBoundedIcebergSource(FailoverType failoverType) throws Exception {
        ArrayList newArrayList = Lists.newArrayList();
        GenericAppenderHelper genericAppenderHelper = new GenericAppenderHelper(this.sourceTableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
        for (int i = 0; i < PARALLELISM; i++) {
            List<Record> generateRecords = generateRecords(2, i);
            newArrayList.addAll(generateRecords);
            genericAppenderHelper.appendToTable(generateRecords);
        }
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(PARALLELISM);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
        FlinkSink.forRowData(RecordCounterToFail.wrapWithFailureAfter(executionEnvironment.fromSource(sourceBuilder().build(), WatermarkStrategy.noWatermarks(), "IcebergSource", TypeInformation.of(RowData.class)), newArrayList.size() / 2)).table(this.sinkTableResource.table()).tableLoader(this.sinkTableResource.tableLoader()).append();
        JobID jobID = executionEnvironment.executeAsync("Bounded Iceberg Source Failover Test").getJobID();
        RecordCounterToFail.waitToFail();
        triggerFailover(failoverType, jobID, () -> {
            RecordCounterToFail.continueProcessing();
        }, this.miniClusterResource.getMiniCluster());
        assertRecords(this.sinkTableResource.table(), newArrayList, Duration.ofMillis(10L), 12000);
    }

    @Test
    public void testContinuousWithTaskManagerFailover() throws Exception {
        testContinuousIcebergSource(FailoverType.TM);
    }

    @Test
    public void testContinuousWithJobManagerFailover() throws Exception {
        testContinuousIcebergSource(FailoverType.JM);
    }

    private void testContinuousIcebergSource(FailoverType failoverType) throws Exception {
        GenericAppenderHelper genericAppenderHelper = new GenericAppenderHelper(this.sourceTableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
        ArrayList newArrayList = Lists.newArrayList();
        List<Record> generateRecords = generateRecords(2, 0L);
        newArrayList.addAll(generateRecords);
        genericAppenderHelper.appendToTable(generateRecords);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(PARALLELISM);
        executionEnvironment.enableCheckpointing(10L);
        new Configuration().setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 128);
        FlinkSink.forRowData(executionEnvironment.fromSource(sourceBuilder().streaming(true).monitorInterval(Duration.ofMillis(10L)).streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL).build(), WatermarkStrategy.noWatermarks(), "IcebergSource", TypeInformation.of(RowData.class))).table(this.sinkTableResource.table()).tableLoader(this.sinkTableResource.tableLoader()).append();
        JobID jobID = executionEnvironment.executeAsync("Continuous Iceberg Source Failover Test").getJobID();
        for (int i = 1; i < 5; i++) {
            Thread.sleep(10L);
            List<Record> generateRecords2 = generateRecords(2, i);
            newArrayList.addAll(generateRecords2);
            genericAppenderHelper.appendToTable(generateRecords2);
            if (i == 2) {
                triggerFailover(failoverType, jobID, () -> {
                }, this.miniClusterResource.getMiniCluster());
            }
        }
        assertRecords(this.sinkTableResource.table(), newArrayList, Duration.ofMillis(10L), 12000);
    }

    private static void triggerFailover(FailoverType failoverType, JobID jobID, Runnable runnable, MiniCluster miniCluster) throws Exception {
        switch (failoverType) {
            case NONE:
                runnable.run();
                return;
            case TM:
                restartTaskManager(runnable, miniCluster);
                return;
            case JM:
                triggerJobManagerFailover(jobID, runnable, miniCluster);
                return;
            default:
                return;
        }
    }

    private static void triggerJobManagerFailover(JobID jobID, Runnable runnable, MiniCluster miniCluster) throws Exception {
        HaLeadershipControl haLeadershipControl = (HaLeadershipControl) miniCluster.getHaLeadershipControl().get();
        haLeadershipControl.revokeJobMasterLeadership(jobID).get();
        runnable.run();
        haLeadershipControl.grantJobMasterLeadership(jobID).get();
    }

    private static void restartTaskManager(Runnable runnable, MiniCluster miniCluster) throws Exception {
        miniCluster.terminateTaskManager(0).get();
        runnable.run();
        miniCluster.startTaskManager();
    }
}
