package org.apache.flink.runtime.operators.coordination;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcServer;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGatewayDecoratorBase;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.function.TriFunction;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.class */
public class OperatorEventSendingCheckpointITCase extends TestLogger {
    private static final int PARALLELISM = 1;
    private static MiniCluster flinkCluster;

    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase$AssignAfterCheckpointEnumerator.class */
    private static final class AssignAfterCheckpointEnumerator<SplitT extends IteratorSourceSplit<?, ?>> extends IteratorSourceEnumerator<SplitT> {
        private final Queue<Integer> pendingRequests;
        private final SplitEnumeratorContext<?> context;

        /* JADX WARN: Multi-variable type inference failed */
        public AssignAfterCheckpointEnumerator(SplitEnumeratorContext<SplitT> splitEnumeratorContext, Collection<SplitT> collection) {
            super(splitEnumeratorContext, collection);
            this.pendingRequests = new ArrayDeque();
            this.context = splitEnumeratorContext;
        }

        public void handleSplitRequest(int i, @Nullable String str) {
            this.pendingRequests.add(Integer.valueOf(i));
        }

        /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
        public Collection<SplitT> m707snapshotState(long j) throws Exception {
            this.context.runInCoordinatorThread(this::fullFillPendingRequests);
            return super.snapshotState(j);
        }

        private void fullFillPendingRequests() {
            Iterator<Integer> it = this.pendingRequests.iterator();
            while (it.hasNext()) {
                int intValue = it.next().intValue();
                if (this.context.registeredReaders().containsKey(Integer.valueOf(intValue))) {
                    super.handleSplitRequest(intValue, (String) null);
                }
            }
            this.pendingRequests.clear();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase$CheckpointListeningIteratorSourceReader.class */
    /**
     * Iterator source reader that stops forwarding polls to its parent once a fixed number of
     * polls has been forwarded, and resumes after {@link #notifyCheckpointComplete(long)} has
     * been called at least once.
     */
    private static class CheckpointListeningIteratorSourceReader<E, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>> extends IteratorSourceReader<E, IterT, SplitT> {
        /** Number of polls that may be forwarded before a checkpoint completion is required. */
        private final long numAllowedMessageBeforeCheckpoint;

        /** Number of polls forwarded to the parent so far. */
        private long messagesProduced = 0L;

        /** Set once a checkpoint completion notification has arrived. */
        private boolean checkpointed = false;

        /**
         * @param sourceReaderContext context forwarded to the parent reader
         * @param j number of polls allowed before the first completed checkpoint
         */
        public CheckpointListeningIteratorSourceReader(SourceReaderContext sourceReaderContext, long j) {
            super(sourceReaderContext);
            this.numAllowedMessageBeforeCheckpoint = j;
        }

        /**
         * Forwards the poll to the parent while the budget is not exhausted or a checkpoint has
         * completed; otherwise reports that nothing is available.
         *
         * @param readerOutput output forwarded to the parent reader
         * @return the parent's status, or {@link InputStatus#NOTHING_AVAILABLE} while blocked
         */
        public InputStatus pollNext(ReaderOutput<E> readerOutput) {
            final boolean budgetLeft = messagesProduced < numAllowedMessageBeforeCheckpoint;
            if (checkpointed || budgetLeft) {
                // The counter tracks forwarded polls, whatever status the parent reports.
                messagesProduced++;
                return super.pollNext(readerOutput);
            }
            return InputStatus.NOTHING_AVAILABLE;
        }

        /**
         * Lifts the poll limit for the rest of this reader's lifetime.
         *
         * @param j id of the completed checkpoint; not inspected
         */
        public void notifyCheckpointComplete(long j) throws Exception {
            checkpointed = true;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase$InterceptingRpcService.class */
    /**
     * {@link RpcService} wrapper that forwards every call to a wrapped service.
     *
     * <p>The only deviation from plain delegation is the unfenced {@code connect}: gateways
     * requested as {@link TaskExecutorGateway} are wrapped in an {@link OpEventRpcInterceptor}.
     */
    private static class InterceptingRpcService implements RpcService {
        /** The real service that performs all the work. */
        private final RpcService delegate;

        /** @param rpcService service that all calls are forwarded to */
        public InterceptingRpcService(RpcService rpcService) {
            this.delegate = rpcService;
        }

        public String getAddress() {
            return delegate.getAddress();
        }

        public int getPort() {
            return delegate.getPort();
        }

        /**
         * Connects through the wrapped service and decorates the result when the requested
         * gateway class is exactly {@link TaskExecutorGateway}.
         */
        public <C extends RpcGateway> CompletableFuture<C> connect(String str, Class<C> cls) {
            final CompletableFuture<C> gatewayFuture = delegate.connect(str, cls);
            if (cls == TaskExecutorGateway.class) {
                return decorateTmGateway(gatewayFuture);
            }
            return gatewayFuture;
        }

        /** Fenced connections are passed through without decoration. */
        public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(String str, F f, Class<C> cls) {
            return delegate.connect(str, f, cls);
        }

        public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C c) {
            return delegate.startServer(c);
        }

        public <F extends Serializable> RpcServer fenceRpcServer(RpcServer rpcServer, F f) {
            return delegate.fenceRpcServer(rpcServer, f);
        }

        public void stopServer(RpcServer rpcServer) {
            delegate.stopServer(rpcServer);
        }

        public CompletableFuture<Void> stopService() {
            return delegate.stopService();
        }

        public CompletableFuture<Void> getTerminationFuture() {
            return delegate.getTerminationFuture();
        }

        public ScheduledExecutor getScheduledExecutor() {
            return delegate.getScheduledExecutor();
        }

        public ScheduledFuture<?> scheduleRunnable(Runnable runnable, long j, TimeUnit timeUnit) {
            return delegate.scheduleRunnable(runnable, j, timeUnit);
        }

        public void execute(Runnable runnable) {
            delegate.execute(runnable);
        }

        public <T> CompletableFuture<T> execute(Callable<T> callable) {
            return delegate.execute(callable);
        }

        /**
         * Replaces the gateway produced by the given future with an intercepting wrapper.
         *
         * <p>The unchecked cast relies on the caller only invoking this method when {@code C} is
         * {@link TaskExecutorGateway}.
         */
        @SuppressWarnings("unchecked")
        private <C extends RpcGateway> CompletableFuture<C> decorateTmGateway(CompletableFuture<C> completableFuture) {
            final CompletableFuture<OpEventRpcInterceptor> intercepted =
                    completableFuture.thenApply(gateway -> new OpEventRpcInterceptor((TaskExecutorGateway) gateway));
            return (CompletableFuture<C>) intercepted;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase$MiniClusterWithRpcIntercepting.class */
    private static class MiniClusterWithRpcIntercepting extends MiniCluster {
        private boolean localRpcCreated;

        public MiniClusterWithRpcIntercepting(int i, Configuration configuration) {
            super(new MiniClusterConfiguration.Builder().withRandomPorts().setRpcServiceSharing(RpcServiceSharing.SHARED).setNumTaskManagers(OperatorEventSendingCheckpointITCase.PARALLELISM).setConfiguration(configuration).setNumSlotsPerTaskManager(i).build());
        }

        public void start() throws Exception {
            super.start();
            if (!this.localRpcCreated) {
                throw new Exception("MiniClusterWithRpcIntercepting is broken, the intercepting local RPC service was not created.");
            }
        }

        protected RpcService createLocalRpcService(Configuration configuration, RpcSystem rpcSystem) throws Exception {
            this.localRpcCreated = true;
            return new InterceptingRpcService(rpcSystem.localServiceBuilder(configuration).withExecutorConfiguration(RpcUtils.getTestForkJoinExecutorConfiguration()).createAndStart());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase$OpEventRpcInterceptor.class */
    public static final class OpEventRpcInterceptor extends TaskExecutorGatewayDecoratorBase {
        static OperatorEventRpcHandler currentHandler = new OperatorEventRpcHandler((executionAttemptID, operatorID, serializedValue, triFunction) -> {
            return null;
        }, (Set<Integer>) Collections.emptySet());

        OpEventRpcInterceptor(TaskExecutorGateway taskExecutorGateway) {
            super(taskExecutorGateway);
        }

        public CompletableFuture<Acknowledge> sendOperatorEventToTask(ExecutionAttemptID executionAttemptID, OperatorID operatorID, SerializedValue<OperatorEvent> serializedValue) {
            return currentHandler.filterCall(executionAttemptID, operatorID, serializedValue, (executionAttemptID2, operatorID2, serializedValue2) -> {
                return super.sendOperatorEventToTask(executionAttemptID2, operatorID2, serializedValue2);
            });
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase$OperatorEventRpcHandler.class */
    /**
     * Decides per operator event whether it is forwarded normally or handed to a custom action.
     *
     * <p>Only {@link AddSplitEvent} and {@link NoMoreSplitsEvent} are counted. The n-th such event
     * (starting at 1) is given to the custom action when n is in the configured set; every other
     * event is forwarded unchanged.
     */
    private static class OperatorEventRpcHandler {
        /** Custom treatment for a filtered event; receives the real RPC call as last argument. */
        @FunctionalInterface
        public interface FilteredRpcAction {
            CompletableFuture<Acknowledge> handleEvent(ExecutionAttemptID executionAttemptID, OperatorID operatorID, SerializedValue<OperatorEvent> serializedValue, TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> triFunction);
        }

        private final FilteredRpcAction actionForFilteredEvent;

        /** 1-based sequence numbers of the split events that go to the custom action. */
        private final Set<Integer> eventsToFilter;

        /** Number of split events seen so far. */
        private int eventNum;

        /**
         * @param filteredRpcAction action applied to filtered events
         * @param iArr 1-based sequence numbers of the split events to filter
         */
        OperatorEventRpcHandler(FilteredRpcAction filteredRpcAction, int... iArr) {
            this(filteredRpcAction, IntStream.of(iArr).boxed().collect(Collectors.toSet()));
        }

        /**
         * @param filteredRpcAction action applied to filtered events
         * @param set 1-based sequence numbers of the split events to filter
         */
        OperatorEventRpcHandler(FilteredRpcAction filteredRpcAction, Set<Integer> set) {
            this.actionForFilteredEvent = filteredRpcAction;
            this.eventsToFilter = set;
        }

        /**
         * Forwards the event through {@code triFunction}, or hands it to the custom action when
         * it is a split event whose sequence number is filtered.
         *
         * @param triFunction the real RPC call
         * @return the future of the real call or of the custom action
         * @throws Error if the event cannot be deserialized
         */
        CompletableFuture<Acknowledge> filterCall(ExecutionAttemptID executionAttemptID, OperatorID operatorID, SerializedValue<OperatorEvent> serializedValue, TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> triFunction) {
            final Object event;
            try {
                event = serializedValue.deserializeValue(getClass().getClassLoader());
            } catch (Exception e) {
                // Only deserialization is guarded; failures of the RPC call itself propagate as-is.
                throw new Error(e);
            }

            final boolean isSplitEvent = event instanceof AddSplitEvent || event instanceof NoMoreSplitsEvent;
            // Short-circuit keeps the counter untouched for events that are not split events.
            if (isSplitEvent && eventsToFilter.contains(++eventNum)) {
                return actionForFilteredEvent.handleEvent(executionAttemptID, operatorID, serializedValue, triFunction);
            }
            return triFunction.apply(executionAttemptID, operatorID, serializedValue);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase$TestingNumberSequenceSource.class */
    public static class TestingNumberSequenceSource extends NumberSequenceSource {
        private static final long serialVersionUID = 1;
        private final int numSplits;
        private final long numAllowedMessageBeforeCheckpoint;

        public TestingNumberSequenceSource(long j, long j2, int i) {
            super(j, j2);
            this.numSplits = i;
            this.numAllowedMessageBeforeCheckpoint = (j2 - j) / i;
        }

        public SplitEnumerator<NumberSequenceSource.NumberSequenceSplit, Collection<NumberSequenceSource.NumberSequenceSplit>> createEnumerator(SplitEnumeratorContext<NumberSequenceSource.NumberSequenceSplit> splitEnumeratorContext) {
            return new AssignAfterCheckpointEnumerator(splitEnumeratorContext, splitNumberRange(getFrom(), getTo(), this.numSplits));
        }

        public SourceReader<Long, NumberSequenceSource.NumberSequenceSplit> createReader(SourceReaderContext sourceReaderContext) {
            return new CheckpointListeningIteratorSourceReader(sourceReaderContext, this.numAllowedMessageBeforeCheckpoint);
        }
    }

    /**
     * Starts the RPC-intercepting mini cluster once for all tests and registers it as the
     * context for stream environments.
     */
    @BeforeClass
    public static void setupMiniClusterAndEnv() throws Exception {
        final Configuration clusterConfig = new Configuration();
        flinkCluster = new MiniClusterWithRpcIntercepting(PARALLELISM, clusterConfig);
        flinkCluster.start();
        TestStreamEnvironment.setAsContext(flinkCluster, PARALLELISM);
    }

    /** Unregisters the test stream environment and closes the mini cluster if one was created. */
    @AfterClass
    public static void clearEnvAndStopMiniCluster() throws Exception {
        TestStreamEnvironment.unsetAsContext();
        if (flinkCluster == null) {
            return;
        }
        flinkCluster.close();
        flinkCluster = null;
    }

    /**
     * The 2nd, 4th and 6th split events are not delivered; the sender only gets a future that
     * is subject to a timeout. The job runs without the artificial mapper failure.
     */
    @Test
    public void testOperatorEventLostNoReaderFailure() throws Exception {
        final OperatorEventRpcHandler.FilteredRpcAction dropEvent =
                (task, operator, event, rpc) -> askTimeoutFuture();
        OpEventRpcInterceptor.currentHandler = new OperatorEventRpcHandler(dropEvent, 2, 4, 6);
        runTest(false);
    }

    /**
     * The 1st and 3rd split events are not delivered; the sender only gets a future that is
     * subject to a timeout. The job additionally runs with the artificial mapper failure.
     */
    @Test
    public void testOperatorEventLostWithReaderFailure() throws Exception {
        // The arguments are event sequence numbers, so the first one is the literal 1 and not
        // the parallelism constant that merely has the same value.
        OpEventRpcInterceptor.currentHandler = new OperatorEventRpcHandler(
                (task, operator, event, rpc) -> askTimeoutFuture(), 1, 3);
        runTest(true);
    }

    /**
     * The 2nd and 4th split events are delivered, but the sender never sees the real
     * acknowledgement: it receives a separate future that is subject to a timeout.
     */
    @Test
    public void testOperatorEventAckLost() throws Exception {
        final OperatorEventRpcHandler.FilteredRpcAction deliverButLoseAck = (task, operator, event, rpc) -> {
            // Deliver for real, then discard the resulting acknowledgement future.
            rpc.apply(task, operator, event);
            return askTimeoutFuture();
        };
        OpEventRpcInterceptor.currentHandler = new OperatorEventRpcHandler(deliverButLoseAck, 2, 4);
        runTest(false);
    }

    /**
     * The 2nd and 4th split events are delivered, but their acknowledgement reaches the sender
     * only once an additional delayed future has completed.
     */
    @Test
    public void testOperatorEventAckDelay() throws Exception {
        OpEventRpcInterceptor.currentHandler = new OperatorEventRpcHandler((task, operator, event, rpc) -> {
            final CompletableFuture<Acknowledge> realAck = rpc.apply(task, operator, event);
            // Create the delayed future right after sending, so its delay starts now and not
            // when the real acknowledgement arrives.
            final CompletableFuture<Acknowledge> delayedAck = lateFuture();
            return realAck.thenCompose(ignored -> delayedAck);
        }, 2, 4);
        runTest(false);
    }

    /**
     * Runs a job that reads the numbers 1 to 100 from the test source in 3 splits, passes them
     * through a mapper and checks that exactly these numbers are collected.
     *
     * @param z selects the record count after which a mapper instance throws: 50 when
     *     {@code true}, 200 when {@code false}
     * @throws Exception if the job fails or the collected result differs from 1..100
     */
    private void runTest(boolean z) throws Exception {
        // NOTE(review): 200 exceeds the 100 source records, so presumably this value disables
        // the artificial failure - confirm how the counter behaves across restarts.
        final int failAfterRecords = z ? 50 : 200;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);
        env.enableCheckpointing(50L);

        final List<Long> results = env
                .fromSource(new TestingNumberSequenceSource(1L, 100L, 3), WatermarkStrategy.noWatermarks(), "numbers")
                .map(new MapFunction<Long, Long>() {
                    /** Records seen by this mapper instance. */
                    private int num;

                    @Override
                    public Long map(Long l) throws Exception {
                        if (++num > failAfterRecords) {
                            throw new Exception("Artificial intermittent failure.");
                        }
                        return l;
                    }
                })
                .executeAndCollect(100);

        // Sort before comparing: the order in which the records arrive is not asserted.
        Collections.sort(results);
        final List<Long> expected = LongStream.rangeClosed(1L, 100L).boxed().collect(Collectors.toList());
        Assert.assertEquals(expected, results);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates a future that this class never completes itself and registers a 500 ms timeout
     * on it.
     *
     * @return the future with the timeout registered
     */
    public static CompletableFuture<Acknowledge> askTimeoutFuture() {
        final CompletableFuture<Acknowledge> neverAcknowledged = new CompletableFuture<>();
        FutureUtils.orTimeout(neverAcknowledged, 500L, TimeUnit.MILLISECONDS);
        return neverAcknowledged;
    }

    /**
     * Creates a future that is scheduled for completion with an acknowledgement after a delay
     * of 500 ms.
     *
     * @return the future with the delayed completion scheduled
     */
    private static CompletableFuture<Acknowledge> lateFuture() {
        final CompletableFuture<Acknowledge> delayedAck = new CompletableFuture<>();
        final Duration delay = Duration.ofMillis(500L);
        FutureUtils.completeDelayed(delayedAck, Acknowledge.get(), delay);
        return delayedAck;
    }
}
