/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.coordination;

import java.io.Serializable;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcServer;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGatewayDecoratorBase;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.NumberSequenceSourceWithWaitForCheckpoint;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.function.TriFunction;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class OperatorEventSendingCheckpointITCase
extends TestLogger {
    private static final int PARALLELISM = 1;
    private static MiniCluster flinkCluster;

    /**
     * Creates and starts a {@code MiniClusterWithRpcIntercepting} with a single slot and installs
     * it as the context cluster of {@link TestStreamEnvironment} with parallelism 1.
     *
     * @throws Exception if starting the mini cluster fails
     */
    @BeforeClass
    public static void setupMiniClusterAndEnv() throws Exception {
        // Assign the static field first; it is what clearEnvAndStopMiniCluster() later closes.
        flinkCluster = new MiniClusterWithRpcIntercepting(1, new Configuration());
        flinkCluster.start();

        TestStreamEnvironment.setAsContext(flinkCluster, 1);
    }

    /**
     * Removes the {@link TestStreamEnvironment} context and closes the shared mini cluster if one
     * is present, clearing the static reference afterwards.
     *
     * @throws Exception if closing the mini cluster fails
     */
    @AfterClass
    public static void clearEnvAndStopMiniCluster() throws Exception {
        TestStreamEnvironment.unsetAsContext();

        final MiniCluster cluster = flinkCluster;
        if (cluster == null) {
            // Nothing was created, so there is nothing to shut down.
            return;
        }

        cluster.close();
        flinkCluster = null;
    }

    /**
     * Runs the job without an artificial task failure while the 2nd, 4th and 6th events counted
     * by {@code OperatorEventRpcHandler} are not forwarded to the original RPC handler; the
     * sender only receives the future created by {@code askTimeoutFuture()}.
     */
    @Test
    public void testOperatorEventLostNoReaderFailure() throws Exception {
        // The action ignores the original handler entirely, i.e. the event is dropped.
        final OperatorEventRpcHandler.FilteredRpcAction dropEvent =
                (task, operator, event, originalRpcHandler) ->
                        OperatorEventSendingCheckpointITCase.askTimeoutFuture();

        OpEventRpcInterceptor.currentHandler = new OperatorEventRpcHandler(dropEvent, 2, 4, 6);

        this.runTest(false);
    }

    /**
     * Runs the job with the artificial map-function failure enabled while the 1st and 3rd events
     * counted by {@code OperatorEventRpcHandler} are not forwarded to the original RPC handler;
     * the sender only receives the future created by {@code askTimeoutFuture()}.
     */
    @Test
    public void testOperatorEventLostWithReaderFailure() throws Exception {
        // The action ignores the original handler entirely, i.e. the event is dropped.
        final OperatorEventRpcHandler.FilteredRpcAction dropEvent =
                (task, operator, event, originalRpcHandler) ->
                        OperatorEventSendingCheckpointITCase.askTimeoutFuture();

        OpEventRpcInterceptor.currentHandler = new OperatorEventRpcHandler(dropEvent, 1, 3);

        this.runTest(true);
    }

    /**
     * Runs the job without an artificial task failure while the acknowledgements of the 2nd and
     * 4th events counted by {@code OperatorEventRpcHandler} are discarded: the event is still
     * passed to the original RPC handler, but the sender receives the future created by
     * {@code askTimeoutFuture()} instead of the real acknowledgement.
     */
    @Test
    public void testOperatorEventAckLost() throws Exception {
        final int[] eventsWithLostAck = {2, 4};

        OpEventRpcInterceptor.currentHandler =
                new OperatorEventRpcHandler(
                        (task, operator, event, originalRpcHandler) -> {
                            // Deliver the event, but deliberately ignore the returned future.
                            originalRpcHandler.apply(task, operator, event);
                            return OperatorEventSendingCheckpointITCase.askTimeoutFuture();
                        },
                        eventsWithLostAck);

        this.runTest(false);
    }

    /**
     * Runs the job without an artificial task failure while the acknowledgements of the 2nd and
     * 4th events counted by {@code OperatorEventRpcHandler} are delayed: the event is passed to
     * the original RPC handler and the sender's future is chained onto the future created by
     * {@code lateFuture()}.
     */
    @Test
    public void testOperatorEventAckDelay() throws Exception {
        final int[] eventsWithLateAck = {2, 4};

        OpEventRpcInterceptor.currentHandler =
                new OperatorEventRpcHandler(
                        (task, operator, event, originalRpcHandler) -> {
                            final CompletableFuture<Acknowledge> result =
                                    originalRpcHandler.apply(task, operator, event);
                            // The late future is created right away, not when 'result' completes.
                            final CompletableFuture<Acknowledge> late =
                                    OperatorEventSendingCheckpointITCase.lateFuture();
                            // Completes only after both the real ack and the late future are done.
                            return result.thenCompose(ignored -> late);
                        },
                        eventsWithLateAck);

        this.runTest(false);
    }

    /**
     * Executes a checkpointed job (interval 50 ms, parallelism 1) that reads from a
     * {@link NumberSequenceSourceWithWaitForCheckpoint} constructed with {@code (1, 100, 3)},
     * passes every record through a counting map function, collects up to 100 results and asserts
     * that, once sorted, they are exactly the numbers 1 to 100.
     *
     * @param intermittentFailure if {@code true}, a map function instance throws as soon as it
     *     has seen more than 50 records; if {@code false}, the threshold is 200 records
     * @throws Exception if building or executing the job fails
     */
    private void runTest(boolean intermittentFailure) throws Exception {
        final int numElements = 100;
        final int failAt = intermittentFailure ? numElements / 2 : numElements * 2;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(50L);

        final SingleOutputStreamOperator<Long> numbers =
                env.fromSource(
                                new NumberSequenceSourceWithWaitForCheckpoint(1L, numElements, 3),
                                WatermarkStrategy.noWatermarks(),
                                "numbers")
                        .map(
                                new MapFunction<Long, Long>() {
                                    // Number of records this function instance has seen so far.
                                    private int num;

                                    public Long map(Long value) throws Exception {
                                        if (++this.num > failAt) {
                                            throw new Exception("Artificial intermittent failure.");
                                        }
                                        return value;
                                    }
                                });

        final List<Long> sequence = numbers.executeAndCollect(numElements);
        // Compare independently of arrival order.
        sequence.sort(Long::compareTo);

        final List<Long> expectedSequence =
                LongStream.rangeClosed(1L, numElements).boxed().collect(Collectors.toList());

        Assert.assertEquals(expectedSequence, sequence);
    }

    /**
     * Creates a future that this method never completes itself and registers a 500 ms timeout on
     * it through {@code FutureUtils.orTimeout}, using the message
     * {@code "Future timed out after 500 ms."}.
     *
     * @return the still-pending future
     */
    private static CompletableFuture<Acknowledge> askTimeoutFuture() {
        final long timeoutMillis = 500L;
        final CompletableFuture<Acknowledge> pending = new CompletableFuture<>();

        final String timeoutMessage = String.format("Future timed out after %s ms.", timeoutMillis);
        FutureUtils.orTimeout(pending, timeoutMillis, TimeUnit.MILLISECONDS, timeoutMessage);

        return pending;
    }

    /**
     * Creates a future and hands it to {@code FutureUtils.completeDelayed} together with
     * {@code Acknowledge.get()} and a delay of 500 ms.
     *
     * @return the future passed to {@code completeDelayed}
     */
    private static CompletableFuture<Acknowledge> lateFuture() {
        final CompletableFuture<Acknowledge> future = new CompletableFuture<>();
        // The success value must be typed as Acknowledge so it matches the future's type argument.
        FutureUtils.completeDelayed(future, Acknowledge.get(), Duration.ofMillis(500L));
        return future;
    }

    /**
     * A {@link MiniCluster} with one task manager, random ports and a shared RPC service whose
     * local RPC service is wrapped in an {@code InterceptingRpcService}. {@link #start()} fails
     * if {@code createLocalRpcService} was not invoked during startup.
     */
    private static class MiniClusterWithRpcIntercepting extends MiniCluster {

        /** Becomes {@code true} once {@code createLocalRpcService} has been called. */
        private boolean localRpcCreated;

        /**
         * @param numSlots number of slots of the single task manager
         * @param configuration configuration handed to the mini cluster configuration builder
         */
        public MiniClusterWithRpcIntercepting(int numSlots, Configuration configuration) {
            super(
                    new MiniClusterConfiguration.Builder()
                            .withRandomPorts()
                            .setRpcServiceSharing(RpcServiceSharing.SHARED)
                            .setNumTaskManagers(1)
                            .setConfiguration(configuration)
                            .setNumSlotsPerTaskManager(numSlots)
                            .build());
        }

        /**
         * Starts the cluster and verifies that the intercepting RPC service was installed.
         *
         * @throws Exception if the superclass start fails or the local RPC hook was never called
         */
        public void start() throws Exception {
            super.start();

            if (this.localRpcCreated) {
                return;
            }
            throw new Exception("MiniClusterWithRpcIntercepting is broken, the intercepting local RPC service was not created.");
        }

        /**
         * Builds the local RPC service from {@code rpcSystem} with the test fork-join executor
         * configuration and wraps it in an {@code InterceptingRpcService}.
         */
        protected RpcService createLocalRpcService(Configuration configuration, RpcSystem rpcSystem) throws Exception {
            // Record the call before doing any work, so start() can detect a missing invocation.
            this.localRpcCreated = true;

            final RpcService delegate =
                    rpcSystem
                            .localServiceBuilder(configuration)
                            .withExecutorConfiguration(RpcUtils.getTestForkJoinExecutorConfiguration())
                            .createAndStart();
            return new InterceptingRpcService(delegate);
        }
    }

    /**
     * An {@link RpcService} that forwards every call to a wrapped service. Gateways obtained via
     * the unfenced {@code connect} for {@code TaskExecutorGateway.class} are additionally wrapped
     * in an {@code OpEventRpcInterceptor}; the fenced {@code connect} is forwarded unchanged.
     */
    private static class InterceptingRpcService implements RpcService {

        /** The service that performs the actual work. */
        private final RpcService rpcService;

        public InterceptingRpcService(RpcService rpcService) {
            this.rpcService = rpcService;
        }

        public String getAddress() {
            return this.rpcService.getAddress();
        }

        public int getPort() {
            return this.rpcService.getPort();
        }

        public <C extends RpcGateway> C getSelfGateway(Class<C> selfGatewayType, RpcServer rpcServer) {
            return this.rpcService.getSelfGateway(selfGatewayType, rpcServer);
        }

        /**
         * Connects through the wrapped service and decorates the result when a
         * {@link TaskExecutorGateway} was requested.
         */
        public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) {
            final CompletableFuture<C> future = this.rpcService.connect(address, clazz);
            return clazz == TaskExecutorGateway.class ? this.decorateTmGateway(future) : future;
        }

        public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(String address, F fencingToken, Class<C> clazz) {
            return this.rpcService.connect(address, fencingToken, clazz);
        }

        public <C extends RpcEndpoint> RpcServer startServer(C rpcEndpoint) {
            return this.rpcService.startServer(rpcEndpoint);
        }

        public void stopServer(RpcServer selfGateway) {
            this.rpcService.stopServer(selfGateway);
        }

        public CompletableFuture<Void> closeAsync() {
            return this.rpcService.closeAsync();
        }

        public ScheduledExecutor getScheduledExecutor() {
            return this.rpcService.getScheduledExecutor();
        }

        /**
         * Wraps the gateway produced by {@code future} in an {@code OpEventRpcInterceptor}.
         *
         * <p>Only called when the requested gateway class is {@code TaskExecutorGateway.class}
         * (see {@code connect}), which is what makes the unchecked cast back to
         * {@code CompletableFuture<C>} acceptable.
         */
        @SuppressWarnings("unchecked")
        private <C extends RpcGateway> CompletableFuture<C> decorateTmGateway(CompletableFuture<C> future) {
            final CompletableFuture<TaskExecutorGateway> wrapped =
                    future.thenApply(gateway -> new OpEventRpcInterceptor((TaskExecutorGateway) gateway));
            return (CompletableFuture<C>) wrapped;
        }
    }

    /**
     * A task executor gateway decorator that routes {@code sendOperatorEventToTask} through the
     * statically configured {@link #currentHandler}, passing the superclass implementation as the
     * handler to fall back on.
     */
    private static final class OpEventRpcInterceptor extends TaskExecutorGatewayDecoratorBase {

        /**
         * Handler consulted for every operator event; tests replace it before running a job. The
         * default has an empty filter set, so its action is never invoked.
         *
         * <p>NOTE(review): the field is plain static (not volatile) and is reassigned by the test
         * methods; confirm that visibility to the threads invoking the gateway is guaranteed.
         */
        static OperatorEventRpcHandler currentHandler =
                new OperatorEventRpcHandler((task, id, evt, rpc) -> null, Collections.emptySet());

        OpEventRpcInterceptor(TaskExecutorGateway originalGateway) {
            super(originalGateway);
        }

        /**
         * Lets {@link #currentHandler} decide how the event is handled.
         *
         * @return the future produced by the handler
         */
        public CompletableFuture<Acknowledge> sendOperatorEventToTask(ExecutionAttemptID task, OperatorID operator, SerializedValue<OperatorEvent> evt) {
            // A method reference keeps the parameter types of the target TriFunction intact.
            return currentHandler.filterCall(task, operator, evt, super::sendOperatorEventToTask);
        }
    }

    /**
     * Counts the {@link AddSplitEvent}s and {@link NoMoreSplitsEvent}s passing through
     * {@link #filterCall} and applies a custom action to those whose running number is contained
     * in the configured set. All other calls go straight to the supplied RPC handler.
     *
     * <p>NOTE(review): the counter is a plain {@code int}; this assumes {@code filterCall} is not
     * invoked concurrently for the same handler instance. Confirm against the calling threads.
     */
    private static class OperatorEventRpcHandler {

        /** Action applied to events whose running number is in {@link #eventsToFilter}. */
        private final FilteredRpcAction actionForFilteredEvent;

        /** Running numbers (first counted event is 1) of the events to intercept. */
        private final Set<Integer> eventsToFilter;

        /** Number of split-related events seen so far. */
        private int eventNum;

        OperatorEventRpcHandler(FilteredRpcAction actionForFilteredEvent, int... eventsToFilter) {
            this(actionForFilteredEvent, IntStream.of(eventsToFilter).boxed().collect(Collectors.toSet()));
        }

        OperatorEventRpcHandler(FilteredRpcAction actionForFilteredEvent, Set<Integer> eventsToFilter) {
            this.actionForFilteredEvent = actionForFilteredEvent;
            this.eventsToFilter = eventsToFilter;
        }

        /**
         * Deserializes the event and either applies the filter action or forwards the call.
         *
         * @param task execution attempt the event is addressed to
         * @param operator operator the event is addressed to
         * @param evt the serialized operator event
         * @param rpcHandler the handler that performs the real RPC
         * @return the future returned by the filter action or by {@code rpcHandler}
         * @throws Error if the event cannot be deserialized
         */
        CompletableFuture<Acknowledge> filterCall(ExecutionAttemptID task, OperatorID operator, SerializedValue<OperatorEvent> evt, TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> rpcHandler) {
            final Object o;
            try {
                o = evt.deserializeValue(this.getClass().getClassLoader());
            } catch (Exception e) {
                // Kept as Error so the failure surfaces exactly as it did before.
                throw new Error(e);
            }

            if (o instanceof AddSplitEvent || o instanceof NoMoreSplitsEvent) {
                // The counter advances only for split-related events.
                if (this.eventsToFilter.contains(++this.eventNum)) {
                    return this.actionForFilteredEvent.handleEvent(task, operator, evt, rpcHandler);
                }
            }
            return rpcHandler.apply(task, operator, evt);
        }

        /** Action to run in place of the plain RPC for a filtered event. */
        @FunctionalInterface
        static interface FilteredRpcAction {
            /**
             * @param task execution attempt the event is addressed to
             * @param operator operator the event is addressed to
             * @param event the serialized operator event
             * @param originalRpcHandler handler that performs the real RPC when applied
             * @return the future to hand back to the sender of the event
             */
            public CompletableFuture<Acknowledge> handleEvent(ExecutionAttemptID task, OperatorID operator, SerializedValue<OperatorEvent> event, TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> originalRpcHandler);
        }
    }
}

