package org.apache.flink.connector.base.source.hybrid;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/connector/base/source/hybrid/HybridSourceITCase.class */
public class HybridSourceITCase extends TestLogger {
    // Shared by the job parallelism, the task manager slot count and the mock source constructor calls below.
    private static final int PARALLELISM = 2;

    // JUnit rule providing a mini cluster with a single task manager offering PARALLELISM slots,
    // dedicated RPC services, and HA leadership control enabled (used by triggerJobManagerFailover).
    @Rule
    public final MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(PARALLELISM).setRpcServiceSharing(RpcServiceSharing.DEDICATED).withHaLeadershipControl().build());
    /**
     * The integers 0 through 39 in ascending order; {@code verifyResult} compares the sorted
     * job output against this list.
     */
    private static final List<Integer> EXPECTED_RESULT =
            IntStream.rangeClosed(0, 39).boxed().collect(Collectors.toList());

    /**
     * Decompiled form of the compiler-generated {@code HybridSourceITCase$1} class: a lookup
     * table that maps {@link FailoverType#ordinal()} to the case index used by
     * {@code triggerFailover} (NONE = 1, TM = 2, JM = 3).
     *
     * <p>Slots that are never assigned keep the array default of 0, which matches no case
     * label and therefore ends up in the {@code default} branch of the switch.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$connector$base$source$hybrid$HybridSourceITCase$FailoverType = new int[FailoverType.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$connector$base$source$hybrid$HybridSourceITCase$FailoverType[FailoverType.NONE.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant absent at runtime: leave its slot at 0 so no case matches.
            }
            try {
                // Plain case index; it is unrelated to the job parallelism even though both happen to be 2.
                $SwitchMap$org$apache$flink$connector$base$source$hybrid$HybridSourceITCase$FailoverType[FailoverType.TM.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant absent at runtime: leave its slot at 0 so no case matches.
            }
            try {
                $SwitchMap$org$apache$flink$connector$base$source$hybrid$HybridSourceITCase$FailoverType[FailoverType.JM.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // Constant absent at runtime: leave its slot at 0 so no case matches.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/base/source/hybrid/HybridSourceITCase$FailoverType.class */
    /** Failure scenario that {@code triggerFailover} applies while the job is paused mid-read. */
    public enum FailoverType {
        /** No failure is injected; the paused pipeline is simply released. */
        NONE,
        /** Task manager 0 is terminated and a new task manager is started (see {@code restartTaskManager}). */
        TM,
        /** Job master leadership is revoked and then granted again (see {@code triggerJobManagerFailover}). */
        JM
    }

    /**
     * Test helper that pauses a stream once a record threshold has been crossed, so the test
     * can inject a failure at a known point and then let processing resume.
     *
     * <p>All state is static and is replaced on every {@link #wrapWithFailureAfter} call, so
     * only one wrapped stream can be in use at a time.
     *
     * <p>The decompiler had also emitted this class's {@code $deserializeLambda$} method. That
     * method is generated by the compiler for the serializable map lambda below and was not
     * valid Java as written, so it is intentionally not declared here.
     */
    public static class RecordCounterToFail {
        /** Number of records that have passed through the wrapping map function. */
        private static AtomicInteger records;
        /** Completed by the map function the first time the record count exceeds the threshold. */
        private static CompletableFuture<Void> fail;
        /** Completed by the test to release map invocations blocked at the threshold. */
        private static CompletableFuture<Void> continueProcessing;

        private RecordCounterToFail() {
        }

        /**
         * Appends a pass-through map step that counts records and, once the count exceeds
         * {@code i} while {@code fail} is still pending, completes {@code fail} and blocks
         * until {@link #continueProcessing()} is called. Records are forwarded unchanged.
         *
         * @param dataStream stream to wrap
         * @param i number of records let through before the pause is triggered
         * @return the stream produced by the added map step
         */
        public static <T> DataStream<T> wrapWithFailureAfter(DataStream<T> dataStream, int i) {
            // Reset the shared state for this run.
            records = new AtomicInteger();
            fail = new CompletableFuture<>();
            continueProcessing = new CompletableFuture<>();
            return dataStream.map(record -> {
                boolean thresholdExceeded = records.incrementAndGet() > i;
                if (thresholdExceeded && !fail.isDone()) {
                    // Signal the waiting test, then hold this record until the test releases it.
                    fail.complete(null);
                    continueProcessing.get();
                }
                return record;
            });
        }

        /**
         * Blocks until the wrapped stream has crossed its record threshold.
         *
         * @throws ExecutionException if the underlying future completed exceptionally
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public static void waitToFail() throws ExecutionException, InterruptedException {
            fail.get();
        }

        /** Releases every map invocation currently blocked at the threshold. */
        public static void continueProcessing() {
            continueProcessing.complete(null);
        }
    }

    /** Runs the source from {@code sourceWithFixedSwitchPosition()} with {@link FailoverType#NONE}. */
    @Test
    public void testHybridSource() throws Exception {
        final Source sourceUnderTest = sourceWithFixedSwitchPosition();
        testHybridSource(FailoverType.NONE, sourceUnderTest);
    }

    /** Runs the source from {@code sourceWithDynamicSwitchPosition()} with {@link FailoverType#NONE}. */
    @Test
    public void testHybridSourceWithDynamicSwitchPosition() throws Exception {
        final Source sourceUnderTest = sourceWithDynamicSwitchPosition();
        testHybridSource(FailoverType.NONE, sourceUnderTest);
    }

    /** Runs the source from {@code sourceWithFixedSwitchPosition()} with {@link FailoverType#TM}. */
    @Test
    public void testHybridSourceWithTaskManagerFailover() throws Exception {
        final Source sourceUnderTest = sourceWithFixedSwitchPosition();
        testHybridSource(FailoverType.TM, sourceUnderTest);
    }

    /** Runs the source from {@code sourceWithFixedSwitchPosition()} with {@link FailoverType#JM}. */
    @Test
    public void testHybridSourceWithJobManagerFailover() throws Exception {
        final Source sourceUnderTest = sourceWithFixedSwitchPosition();
        testHybridSource(FailoverType.JM, sourceUnderTest);
    }

    /**
     * Builds a hybrid source from two bounded {@link MockBaseSource} instances that are both
     * created up front.
     *
     * <p>NOTE(review): the MockBaseSource arguments are presumably (number of splits, records
     * per split, [starting value,] boundedness) - confirm against MockBaseSource.
     */
    private Source sourceWithFixedSwitchPosition() {
        final int quarterOfExpected = EXPECTED_RESULT.size() / 4;
        final MockBaseSource firstSource =
                new MockBaseSource(PARALLELISM, quarterOfExpected, Boundedness.BOUNDED);
        final MockBaseSource secondSource =
                new MockBaseSource(PARALLELISM, quarterOfExpected, 20, Boundedness.BOUNDED);
        return HybridSource.builder(firstSource).addSource(secondSource).build();
    }

    /**
     * Builds a hybrid source whose second {@link MockBaseSource} is not created up front but
     * supplied through a factory lambda that receives the switch context (unused here).
     */
    private Source sourceWithDynamicSwitchPosition() {
        final MockBaseSource firstSource = new MockBaseSource(PARALLELISM, 10, Boundedness.BOUNDED);
        return HybridSource.builder(firstSource)
                .addSource(
                        sourceSwitchContext -> new MockBaseSource(PARALLELISM, 10, 20, Boundedness.BOUNDED),
                        Boundedness.BOUNDED)
                .build();
    }

    /**
     * Runs {@code source} on the mini cluster, pauses the job once more than
     * {@code EXPECTED_RESULT.size() / PARALLELISM} records have passed the counting map step,
     * applies the requested failover, and checks that the collected output equals
     * {@code EXPECTED_RESULT}.
     *
     * @param failoverType failure scenario to inject while the job is paused
     * @param source source under test; it is read as a stream of {@code Integer}
     * @throws Exception if job submission, failover handling or result collection fails
     */
    private void testHybridSource(FailoverType failoverType, Source source) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);
        if (FailoverType.NONE == failoverType) {
            env.setRestartStrategy(RestartStrategies.noRestart());
        } else {
            // A single immediate restart is enough to recover from the one injected failure.
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
        }

        // The source parameter is a raw type, so this is the single place where an unchecked
        // conversion to Integer elements happens; everything downstream is fully typed.
        @SuppressWarnings("unchecked")
        DataStream<Integer> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "hybrid-source").returns(Integer.class);
        DataStream<Integer> pausingStream =
                RecordCounterToFail.wrapWithFailureAfter(stream, EXPECTED_RESULT.size() / PARALLELISM);

        String jobName = HybridSourceITCase.class.getSimpleName() + '-' + failoverType.name();
        ClientAndIterator<Integer> collected = DataStreamUtils.collectWithClient(pausingStream, jobName);
        JobID jobID = collected.client.getJobID();

        // Wait for the pipeline to reach the pause point before injecting the failure.
        RecordCounterToFail.waitToFail();
        triggerFailover(failoverType, jobID, RecordCounterToFail::continueProcessing, this.miniClusterResource.getMiniCluster());

        // Stop reading as soon as the expected number of records has arrived.
        List<Integer> results = new ArrayList<>();
        while (results.size() < EXPECTED_RESULT.size() && collected.iterator.hasNext()) {
            results.add(collected.iterator.next());
        }
        verifyResult(results);
    }

    /**
     * Applies the requested failure scenario and runs {@code runnable} as part of it.
     *
     * @param failoverType scenario to apply
     * @param jobID job whose job master leadership is cycled for {@link FailoverType#JM}
     * @param runnable action run directly for {@link FailoverType#NONE}, otherwise handed to
     *     the scenario helper, which runs it in the middle of the failover
     * @param miniCluster cluster on which the failure is injected
     * @throws Exception if the failover helper fails
     */
    private static void triggerFailover(FailoverType failoverType, JobID jobID, Runnable runnable, MiniCluster miniCluster) throws Exception {
        // Switch on the enum itself instead of going through a hand-maintained ordinal lookup table.
        switch (failoverType) {
            case NONE:
                runnable.run();
                return;
            case TM:
                restartTaskManager(runnable, miniCluster);
                return;
            case JM:
                triggerJobManagerFailover(jobID, runnable, miniCluster);
                return;
            default:
                // Unrecognized scenario: do nothing.
                return;
        }
    }

    /**
     * Cycles job master leadership for {@code jobID}: revokes it, runs {@code runnable}, then
     * grants it again. Each leadership call is followed by {@code get()} before moving on.
     *
     * @param jobID job whose job master leadership is revoked and re-granted
     * @param runnable action executed between revoke and grant
     * @param miniCluster cluster providing the HA leadership control
     * @throws Exception if obtaining the control or changing leadership fails
     */
    private static void triggerJobManagerFailover(JobID jobID, Runnable runnable, MiniCluster miniCluster) throws Exception {
        final HaLeadershipControl leadership =
                (HaLeadershipControl) miniCluster.getHaLeadershipControl().get();

        // 1. Take leadership away from the job master.
        leadership.revokeJobMasterLeadership(jobID).get();

        // 2. Run the caller's action while leadership is revoked.
        runnable.run();

        // 3. Hand leadership back.
        leadership.grantJobMasterLeadership(jobID).get();
    }

    /**
     * Terminates the task manager at index 0, runs {@code runnable}, then starts a task
     * manager again.
     *
     * @param runnable action executed after termination and before the new task manager starts
     * @param miniCluster cluster whose task manager is replaced
     * @throws Exception if terminating or starting the task manager fails
     */
    private static void restartTaskManager(Runnable runnable, MiniCluster miniCluster) throws Exception {
        final int taskManagerIndex = 0;
        // get() is called on the termination result before the action runs.
        miniCluster.terminateTaskManager(taskManagerIndex).get();
        runnable.run();
        miniCluster.startTaskManager();
    }

    /**
     * Sorts {@code list} in place and asserts that it equals {@code EXPECTED_RESULT}.
     *
     * @param list collected records; reordered by this call
     */
    private static void verifyResult(List<Integer> list) {
        // A null comparator sorts by natural ordering.
        list.sort(null);
        Assert.assertThat(list, Matchers.equalTo(EXPECTED_RESULT));
    }

    // Decompiled copy of the compiler-synthesized lambda deserialization hook for this class.
    // It recognizes exactly one lambda, the HybridSource$SourceFactory created in
    // sourceWithDynamicSwitchPosition(): the implementation method name is matched first and
    // then the kind, functional interface, method name and signatures are checked before an
    // equivalent lambda is returned. Anything else is rejected with IllegalArgumentException.
    //
    // NOTE(review): this is decompiler output, not hand-written source. As written it is not
    // valid Java ("boolean z = -1", a switch over a boolean, a lambda returned as Object), and
    // the compiler normally generates this method itself for serializable lambdas - it likely
    // should be dropped when restoring compilable source. Confirm before relying on it.
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // First stage: map the implementation method name to a case selector (hash first, then equals).
        switch (implMethodName.hashCode()) {
            case -1276200524:
                if (implMethodName.equals("lambda$sourceWithDynamicSwitchPosition$fa7af76a$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second stage: validate the remaining lambda metadata and rebuild the lambda.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/connector/base/source/hybrid/HybridSource$SourceFactory") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/flink/connector/base/source/hybrid/HybridSource$SourceSwitchContext;)Lorg/apache/flink/api/connector/source/Source;") && serializedLambda.getImplClass().equals("org/apache/flink/connector/base/source/hybrid/HybridSourceITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/connector/base/source/hybrid/HybridSource$SourceSwitchContext;)Lorg/apache/flink/connector/base/source/reader/mocks/MockBaseSource;")) {
                    return sourceSwitchContext -> {
                        return new MockBaseSource(PARALLELISM, 10, 20, Boundedness.BOUNDED);
                    };
                }
                break;
        }
        // No known lambda matched the serialized form.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
