/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.collect;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.operators.collect.CollectCoordinationRequest;
import org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkFunction;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator;
import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionInitializationContext;
import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
import org.apache.flink.streaming.api.operators.collect.utils.TestJobClient;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class CollectSinkFunctionTest
extends TestLogger {
    /** Batch size handed to every {@code CollectSinkFunction} created by this test. */
    private static final int MAX_RESULTS_PER_BATCH = 3;
    /** Accumulator name shared by the sink function, the result iterator and the assertions. */
    private static final String ACCUMULATOR_NAME = "tableCollectAccumulator";
    /** Upper bound, in milliseconds, for waiting on a coordination response future. */
    private static final int FUTURE_TIMEOUT_MILLIS = 10000;
    /** Timeout, in milliseconds, passed to the {@code CollectSinkOperatorCoordinator}. */
    private static final int SOCKET_TIMEOUT_MILLIS = 1000;
    /** Maximum number of attempts when waiting for a valid coordination response. */
    private static final int MAX_RETIRES = 100;
    private static final JobID TEST_JOB_ID = new JobID();
    private static final OperatorID TEST_OPERATOR_ID = new OperatorID();
    private static final TypeSerializer<Integer> serializer = IntSerializer.INSTANCE;
    private CollectSinkFunction<Integer> function;
    private CollectSinkOperatorCoordinator coordinator;
    private MockFunctionInitializationContext functionInitializationContext;
    /**
     * Set by the feeder thread in {@code finishJob()} and polled by the client thread through
     * {@code JobInfoProvider#isJobFinished()}; volatile so the write is visible across threads.
     */
    private volatile boolean jobFinished;
    private IOManager ioManager;
    private StreamingRuntimeContext runtimeContext;
    private MockOperatorEventGateway gateway;

    /**
     * Creates a fresh I/O manager, runtime context, event gateway, started coordinator and
     * state-initialization context, and resets the job-finished flag.
     */
    @Before
    public void before() throws Exception {
        ioManager = new IOManagerAsync();
        runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, ioManager);
        gateway = new MockOperatorEventGateway();

        CollectSinkOperatorCoordinator freshCoordinator =
            new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
        coordinator = freshCoordinator;
        freshCoordinator.start();

        functionInitializationContext = new MockFunctionInitializationContext();
        jobFinished = false;
    }

    /**
     * Releases the coordinator and the I/O manager created by {@link #before()}.
     *
     * <p>The I/O manager is closed in a {@code finally} block so that it is released even when
     * closing the coordinator throws.
     */
    @After
    public void after() throws Exception {
        try {
            coordinator.close();
        } finally {
            ioManager.close();
        }
    }

    /**
     * Walks through the request/response protocol of a sink function opened without state:
     * feeds values, requests different offsets and checks the returned batches, then finishes
     * the job and checks the final accumulator content. The last checkpointed offset is
     * expected to stay at 0 throughout.
     */
    @Test
    public void testUncheckpointedProtocol() throws Exception {
        openFunction();
        for (int value = 0; value < 6; value++) {
            function.invoke(value, null);
        }

        // The first request carries an empty version; its response supplies the version to use.
        CollectCoordinationResponse<Integer> response = sendRequestAndGetValidResponse("", 0L);
        Assert.assertEquals(0L, response.getLastCheckpointedOffset());
        String version = response.getVersion();

        response = sendRequestAndGetValidResponse(version, 0L);
        assertResponseEquals(response, version, 0L, Arrays.asList(0, 1, 2));

        response = sendRequestAndGetValidResponse(version, 4L);
        assertResponseEquals(response, version, 0L, Arrays.asList(4, 5));

        // Offset 6 is past everything fed so far.
        response = sendRequestAndGetValidResponse(version, 6L);
        assertResponseEquals(response, version, 0L, Collections.emptyList());

        for (int value = 6; value < 10; value++) {
            function.invoke(value, null);
        }

        // Asking for offset 5 after offset 6 was already requested is expected to yield nothing.
        response = sendRequestAndGetValidResponse(version, 5L);
        assertResponseEquals(response, version, 0L, Collections.emptyList());

        response = sendRequestAndGetValidResponse(version, 6L);
        assertResponseEquals(response, version, 0L, Arrays.asList(6, 7, 8));

        // Repeating the same request is expected to return the same batch.
        response = sendRequestAndGetValidResponse(version, 6L);
        assertResponseEquals(response, version, 0L, Arrays.asList(6, 7, 8));

        response = sendRequestAndGetValidResponse(version, 12L);
        assertResponseEquals(response, version, 0L, Collections.emptyList());

        for (int value = 10; value < 16; value++) {
            function.invoke(value, null);
        }

        response = sendRequestAndGetValidResponse(version, 12L);
        assertResponseEquals(response, version, 0L, Arrays.asList(12, 13, 14));

        finishJob();
        assertAccumulatorResult(12L, version, 0L, Arrays.asList(12, 13, 14, 15));
    }

    /**
     * Walks through the request/response protocol of a sink function opened with state,
     * interleaving checkpoints, checkpoint completions and abnormal restarts, and checks the
     * version, last checkpointed offset and batch content of every response. Finally finishes
     * the job and checks the accumulator content.
     *
     * <p>After every restart the first request is sent with the previous version; only its
     * last checkpointed offset is checked and the version is refreshed from the response.
     */
    @Test
    public void testCheckpointProtocol() throws Exception {
        openFunctionWithState();
        for (int value = 0; value < 2; value++) {
            function.invoke(value, null);
        }

        CollectCoordinationResponse<Integer> response = sendRequestAndGetValidResponse("", 0L);
        Assert.assertEquals(0L, response.getLastCheckpointedOffset());
        String version = response.getVersion();

        response = sendRequestAndGetValidResponse(version, 0L);
        assertResponseEquals(response, version, 0L, Arrays.asList(0, 1));

        for (int value = 2; value < 6; value++) {
            function.invoke(value, null);
        }

        response = sendRequestAndGetValidResponse(version, 3L);
        assertResponseEquals(response, version, 0L, Arrays.asList(3, 4, 5));

        // Checkpoint 1 is taken but not yet completed: reported checkpointed offset stays 0.
        checkpointFunction(1L);
        response = sendRequestAndGetValidResponse(version, 4L);
        assertResponseEquals(response, version, 0L, Arrays.asList(4, 5));

        // Once checkpoint 1 completes, the reported checkpointed offset becomes 3.
        checkpointComplete(1L);
        response = sendRequestAndGetValidResponse(version, 4L);
        assertResponseEquals(response, version, 3L, Arrays.asList(4, 5));

        for (int value = 6; value < 9; value++) {
            function.invoke(value, null);
        }

        response = sendRequestAndGetValidResponse(version, 6L);
        assertResponseEquals(response, version, 3L, Arrays.asList(6, 7, 8));

        // First restart: values 6..8 were fed after checkpoint 1 and are not expected back.
        closeFuntionAbnormally();
        openFunctionWithState();

        for (int value = 9; value < 12; value++) {
            function.invoke(value, null);
        }

        response = sendRequestAndGetValidResponse(version, 4L);
        Assert.assertEquals(3L, response.getLastCheckpointedOffset());
        version = response.getVersion();

        response = sendRequestAndGetValidResponse(version, 4L);
        assertResponseEquals(response, version, 3L, Arrays.asList(4, 5, 9));

        response = sendRequestAndGetValidResponse(version, 6L);
        assertResponseEquals(response, version, 3L, Arrays.asList(9, 10, 11));

        checkpointFunction(2L);
        checkpointComplete(2L);

        function.invoke(12, null);

        response = sendRequestAndGetValidResponse(version, 7L);
        assertResponseEquals(response, version, 6L, Arrays.asList(10, 11, 12));

        // Second restart: value 12 was fed after checkpoint 2 and is not expected back.
        closeFuntionAbnormally();
        openFunctionWithState();

        response = sendRequestAndGetValidResponse(version, 7L);
        Assert.assertEquals(6L, response.getLastCheckpointedOffset());
        version = response.getVersion();

        response = sendRequestAndGetValidResponse(version, 7L);
        assertResponseEquals(response, version, 6L, Arrays.asList(10, 11));

        // Uses the retrying variant like every other request in this test, so a transient
        // invalid response does not fail the assertion below.
        response = sendRequestAndGetValidResponse(version, 9L);
        assertResponseEquals(response, version, 6L, Collections.emptyList());

        for (int value = 13; value < 17; value++) {
            function.invoke(value, null);
        }

        response = sendRequestAndGetValidResponse(version, 9L);
        assertResponseEquals(response, version, 6L, Arrays.asList(13, 14, 15));

        checkpointFunction(3L);
        checkpointComplete(3L);

        // Third restart, directly after a completed checkpoint.
        closeFuntionAbnormally();
        openFunctionWithState();

        response = sendRequestAndGetValidResponse(version, 12L);
        Assert.assertEquals(9L, response.getLastCheckpointedOffset());
        version = response.getVersion();

        response = sendRequestAndGetValidResponse(version, 12L);
        assertResponseEquals(response, version, 9L, Collections.singletonList(16));

        for (int value = 17; value < 20; value++) {
            function.invoke(value, null);
        }

        response = sendRequestAndGetValidResponse(version, 12L);
        assertResponseEquals(response, version, 9L, Arrays.asList(16, 17, 18));

        // Checkpoint 4 is never completed, so the restart below falls back to checkpoint 3.
        checkpointFunction(4L);

        closeFuntionAbnormally();
        openFunctionWithState();

        response = sendRequestAndGetValidResponse(version, 12L);
        Assert.assertEquals(9L, response.getLastCheckpointedOffset());
        version = response.getVersion();

        response = sendRequestAndGetValidResponse(version, 12L);
        assertResponseEquals(response, version, 9L, Collections.singletonList(16));

        for (int value = 20; value < 23; value++) {
            function.invoke(value, null);
        }

        response = sendRequestAndGetValidResponse(version, 12L);
        assertResponseEquals(response, version, 9L, Arrays.asList(16, 20, 21));

        finishJob();
        assertAccumulatorResult(12L, version, 9L, Arrays.asList(16, 20, 21, 22));
    }

    /**
     * Runs 30 randomized rounds in which an {@code UncheckpointedDataFeeder} pushes the values
     * 0..49 while a client collects them, and checks that the collected values equal the fed
     * values once both are sorted. The fixtures are torn down and rebuilt after every round.
     */
    @Test
    public void testUncheckpointedFunction() throws Exception {
        final int rounds = 30;
        final int valueCount = 50;
        for (int round = 0; round < rounds; round++) {
            List<Integer> expected = new ArrayList<>();
            for (int value = 0; value < valueCount; value++) {
                expected.add(value);
            }

            List<Integer> actual = runFunctionRandomTest(new UncheckpointedDataFeeder(expected));
            assertResultsEqualAfterSort(expected, actual);

            // Start the next round from a clean set of fixtures.
            after();
            before();
        }
    }

    /**
     * Runs 30 randomized rounds in which a {@code CheckpointedDataFeeder} pushes the values
     * 0..49 while a client collects them, and checks that the collected values equal the fed
     * values once both are sorted. The fixtures are torn down and rebuilt after every round.
     */
    @Test
    public void testCheckpointedFunction() throws Exception {
        final int rounds = 30;
        final int valueCount = 50;
        for (int round = 0; round < rounds; round++) {
            List<Integer> expected = new ArrayList<>();
            for (int value = 0; value < valueCount; value++) {
                expected.add(value);
            }

            List<Integer> actual = runFunctionRandomTest(new CheckpointedDataFeeder(expected));
            assertResultsEqualAfterSort(expected, actual);

            // Start the next round from a clean set of fixtures.
            after();
            before();
        }
    }

    /**
     * Runs the given feeder thread concurrently with a new {@code CollectClient} and waits for
     * both to terminate.
     *
     * <p>If either thread dies with an uncaught exception, both threads are interrupted and the
     * stack trace is printed.
     *
     * @param feeder thread that pushes data into the sink function
     * @return the values gathered by the client
     */
    private List<Integer> runFunctionRandomTest(Thread feeder) throws Exception {
        final CollectClient collector = new CollectClient();

        Thread.UncaughtExceptionHandler abortBoth = (failedThread, failure) -> {
            feeder.interrupt();
            collector.interrupt();
            failure.printStackTrace();
        };
        feeder.setUncaughtExceptionHandler(abortBoth);
        collector.setUncaughtExceptionHandler(abortBoth);

        feeder.start();
        collector.start();

        feeder.join();
        collector.join();

        return collector.results;
    }

    /**
     * Creates and opens a new sink function without initializing any state, then forwards the
     * next event from the mock gateway to the coordinator as coming from subtask 0.
     */
    private void openFunction() throws Exception {
        CollectSinkFunction<Integer> sink =
            new CollectSinkFunction<>(serializer, MAX_RESULTS_PER_BATCH, ACCUMULATOR_NAME);
        function = sink;
        sink.setRuntimeContext(runtimeContext);
        sink.setOperatorEventGateway(gateway);
        sink.open(new Configuration());
        coordinator.handleEventFromOperator(0, gateway.getNextEvent());
    }

    /**
     * Reverts the mock state store to its last successful checkpoint, creates a new sink
     * function, initializes its state from that store and opens it, then forwards the next
     * event from the mock gateway to the coordinator as coming from subtask 0.
     */
    private void openFunctionWithState() throws Exception {
        functionInitializationContext.getOperatorStateStore().revertToLastSuccessCheckpoint();

        CollectSinkFunction<Integer> sink =
            new CollectSinkFunction<>(serializer, MAX_RESULTS_PER_BATCH, ACCUMULATOR_NAME);
        function = sink;
        sink.setRuntimeContext(runtimeContext);
        sink.setOperatorEventGateway(gateway);
        sink.initializeState(functionInitializationContext);
        sink.open(new Configuration());
        coordinator.handleEventFromOperator(0, gateway.getNextEvent());
    }

    /**
     * Snapshots the sink function for the given checkpoint and then tells the mock state store
     * that this checkpoint has begun.
     *
     * @param checkpointId id of the checkpoint being taken
     */
    private void checkpointFunction(long checkpointId) throws Exception {
        MockFunctionSnapshotContext snapshotContext = new MockFunctionSnapshotContext(checkpointId);
        function.snapshotState(snapshotContext);
        functionInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
    }

    /**
     * Notifies the sink function that the given checkpoint completed and then marks it as
     * successful in the mock state store.
     *
     * @param checkpointId id of the completed checkpoint
     */
    private void checkpointComplete(long checkpointId) throws Exception {
        function.notifyCheckpointComplete(checkpointId);
        functionInitializationContext
            .getOperatorStateStore()
            .checkpointSuccess(checkpointId);
    }

    /**
     * Simulates a task failure: closes the sink function without accumulating final results
     * and reports subtask 0 as failed (with a {@code null} cause) to the coordinator.
     */
    private void closeFuntionAbnormally() throws Exception {
        final int failedSubtask = 0;
        function.close();
        coordinator.subtaskFailed(failedSubtask, null);
    }

    /**
     * Ends the job normally: lets the sink function accumulate its final results, closes it,
     * and only then raises the job-finished flag.
     */
    private void finishJob() throws Exception {
        CollectSinkFunction<Integer> sink = function;
        sink.accumulateFinalResults();
        sink.close();
        jobFinished = true;
    }

    /**
     * Sends a single coordination request to the coordinator and waits up to
     * {@code FUTURE_TIMEOUT_MILLIS} for the response.
     *
     * @param version version string to put into the request
     * @param offset offset to put into the request
     * @return the coordinator's response; no validity check is performed here
     */
    @SuppressWarnings("unchecked")
    private CollectCoordinationResponse<Integer> sendRequest(String version, long offset) throws Exception {
        CollectCoordinationRequest request = new CollectCoordinationRequest(version, offset);
        Object rawResponse = coordinator
            .handleCoordinationRequest(request)
            .get(FUTURE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        return (CollectCoordinationResponse<Integer>) rawResponse;
    }

    /**
     * Repeats {@link #sendRequest(String, long)} until a response with a non-negative last
     * checkpointed offset arrives, giving up after {@code MAX_RETIRES} attempts.
     *
     * @param version version string to put into each request
     * @param offset offset to put into each request
     * @return the first response whose last checkpointed offset is not negative
     * @throws RuntimeException if every attempt returned a negative last checkpointed offset
     */
    private CollectCoordinationResponse<Integer> sendRequestAndGetValidResponse(String version, long offset) throws Exception {
        int attempt = 0;
        while (attempt < MAX_RETIRES) {
            CollectCoordinationResponse<Integer> candidate = sendRequest(version, offset);
            if (candidate.getLastCheckpointedOffset() >= 0L) {
                return candidate;
            }
            attempt++;
        }
        throw new RuntimeException("Too many retries in sendRequestAndGetValidResponse");
    }

    /**
     * Reads the local value of the collect accumulator from the runtime context, asserts that
     * it holds exactly one serialized entry and deserializes that entry.
     *
     * @return the offset and response decoded by
     *     {@code CollectSinkFunction.deserializeAccumulatorResult}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Tuple2<Long, CollectCoordinationResponse<Integer>> getAccumualtorResults() throws Exception {
        Accumulator rawAccumulator = runtimeContext.getAccumulator(ACCUMULATOR_NAME);
        SerializedListAccumulator<byte[]> listAccumulator =
            (SerializedListAccumulator<byte[]>) rawAccumulator;

        List<byte[]> payloads = SerializedListAccumulator.deserializeList(
            listAccumulator.getLocalValue(), BytePrimitiveArraySerializer.INSTANCE);
        Assert.assertEquals(1L, payloads.size());

        byte[] onlyPayload = payloads.get(0);
        return CollectSinkFunction.deserializeAccumulatorResult(onlyPayload);
    }

    /**
     * Asserts that a response carries the expected version, last checkpointed offset and,
     * after deserialization with the test serializer, the expected results in order.
     *
     * @param response response to inspect
     * @param version expected version
     * @param lastCheckpointedOffset expected last checkpointed offset
     * @param expected expected deserialized results
     */
    private void assertResponseEquals(CollectCoordinationResponse<Integer> response, String version, long lastCheckpointedOffset, List<Integer> expected) throws IOException {
        String actualVersion = response.getVersion();
        Assert.assertEquals((Object) version, (Object) actualVersion);

        long actualOffset = response.getLastCheckpointedOffset();
        Assert.assertEquals(lastCheckpointedOffset, actualOffset);

        List<Integer> actualResults = response.getResults(serializer);
        assertResultsEqual(expected, actualResults);
    }

    /**
     * Asserts that both lists hold equal elements in the same order by comparing them as
     * {@code Integer} arrays.
     *
     * @param expected expected values
     * @param actual actual values
     */
    private void assertResultsEqual(List<Integer> expected, List<Integer> actual) {
        Integer[] expectedValues = expected.toArray(new Integer[0]);
        Integer[] actualValues = actual.toArray(new Integer[0]);
        Assert.assertArrayEquals(expectedValues, actualValues);
    }

    /**
     * Asserts that both lists hold the same values regardless of order.
     *
     * <p>Sorted copies are compared, so the lists passed in are left untouched and may be
     * unmodifiable.
     *
     * @param expected expected values, in any order
     * @param actual actual values, in any order
     */
    private void assertResultsEqualAfterSort(List<Integer> expected, List<Integer> actual) {
        List<Integer> sortedExpected = new ArrayList<>(expected);
        List<Integer> sortedActual = new ArrayList<>(actual);
        Collections.sort(sortedExpected);
        Collections.sort(sortedActual);
        assertResultsEqual(sortedExpected, sortedActual);
    }

    /**
     * Decodes the accumulator content and asserts its offset, version, last checkpointed
     * offset and results.
     *
     * @param expectedOffset expected offset stored alongside the response
     * @param expectedVersion expected response version
     * @param expectedLastCheckpointedOffset expected last checkpointed offset of the response
     * @param expectedResults expected deserialized results, in order
     */
    private void assertAccumulatorResult(long expectedOffset, String expectedVersion, long expectedLastCheckpointedOffset, List<Integer> expectedResults) throws Exception {
        Tuple2<Long, CollectCoordinationResponse<Integer>> decoded = getAccumualtorResults();
        long actualOffset = decoded.f0;
        CollectCoordinationResponse<Integer> finalResponse = decoded.f1;
        // Results are deserialized before any assertion runs, as in the original ordering.
        List<Integer> actualResults = finalResponse.getResults(serializer);

        Assert.assertEquals(expectedOffset, actualOffset);
        Assert.assertEquals((Object) expectedVersion, (Object) finalResponse.getVersion());
        Assert.assertEquals(expectedLastCheckpointedOffset, finalResponse.getLastCheckpointedOffset());
        assertResultsEqual(expectedResults, actualResults);
    }

    // Compiler-generated accessor recovered by the decompiler: returns the private static
    // `serializer` field. Within this file it is called from the nested CollectClient class.
    static /* synthetic */ TypeSerializer access$1600() {
        return serializer;
    }

    /**
     * Client thread that drains a {@code CollectResultIterator} backed by a {@code TestJobClient}
     * and stores every value it receives in {@link #results}.
     */
    private class CollectClient extends Thread {
        /** Values received so far; read by the test after this thread has been joined. */
        private final List<Integer> results = new ArrayList<>();
        private final CollectResultIterator<Integer> iterator;

        private CollectClient() {
            // Reference the outer constants directly instead of going through decompiler-emitted
            // synthetic accessors, one of which is not defined in this file.
            this.iterator = new CollectResultIterator<>(
                CompletableFuture.completedFuture(TEST_OPERATOR_ID),
                serializer,
                ACCUMULATOR_NAME,
                0);

            TestJobClient.JobInfoProvider infoProvider = new TestJobClient.JobInfoProvider() {

                @Override
                public boolean isJobFinished() {
                    return CollectSinkFunctionTest.this.jobFinished;
                }

                @Override
                public Map<String, OptionalFailure<Object>> getAccumulatorResults() {
                    Object localValue = CollectSinkFunctionTest.this.runtimeContext
                        .getAccumulator(ACCUMULATOR_NAME)
                        .getLocalValue();
                    Map<String, OptionalFailure<Object>> accumulatorResults = new HashMap<>();
                    accumulatorResults.put(ACCUMULATOR_NAME, OptionalFailure.of(localValue));
                    return accumulatorResults;
                }
            };
            TestJobClient jobClient = new TestJobClient(
                TEST_JOB_ID,
                TEST_OPERATOR_ID,
                CollectSinkFunctionTest.this.coordinator,
                infoProvider);
            this.iterator.setJobClient(jobClient);
        }

        /**
         * Collects values until the iterator is exhausted, pausing 5 ms after roughly half of
         * them, then closes the iterator.
         *
         * <p>If the thread is interrupted while pausing, the interrupt flag is restored and
         * collection stops early; the iterator is still closed.
         */
        @Override
        public void run() {
            Random random = new Random();
            while (iterator.hasNext()) {
                results.add(iterator.next());
                if (random.nextBoolean()) {
                    try {
                        Thread.sleep(5L);
                    } catch (InterruptedException e) {
                        // The uncaught-exception handler interrupts this thread when the
                        // feeder dies; stop instead of silently carrying on.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
            try {
                iterator.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * A pending checkpoint: its id, a copy of the data list taken when it was created, and the
     * number of ticks left before it fires.
     */
    private static class CheckpointCountdown {
        private final long id;
        private final List<Integer> data;
        private int countdown;

        /**
         * @param id checkpoint id
         * @param data data list to copy
         * @param countdown number of ticks before the countdown fires
         */
        private CheckpointCountdown(long id, List<Integer> data, int countdown) {
            this.id = id;
            this.data = new ArrayList<>(data);
            this.countdown = countdown;
        }

        /**
         * Decrements the countdown if it is still positive.
         *
         * @return {@code true} only on the tick that brings the countdown to zero
         */
        private boolean tick() {
            if (countdown <= 0) {
                return false;
            }
            countdown--;
            return countdown == 0;
        }
    }

    /**
     * Feeder thread for the randomized checkpointed test. Until its data is exhausted it
     * randomly emits batches into the sink function, starts checkpoints (some of which complete
     * a few iterations later) or fails and restores the function, and finally finishes the job.
     */
    private class CheckpointedDataFeeder extends Thread {
        /** Values still to be emitted. */
        private LinkedList<Integer> data;
        /** Values that have to be re-emitted after a failure. */
        private List<Integer> checkpointedData;
        private long checkpointId;
        private long lastSuccessCheckpointId;
        /** Checkpoints that were started and are scheduled to complete later. */
        private final List<CheckpointCountdown> checkpointCountdowns;

        private CheckpointedDataFeeder(List<Integer> data) {
            this.data = new LinkedList<>(data);
            // Until a checkpoint completes, a failure restarts from the full input.
            this.checkpointedData = new ArrayList<>(data);
            this.checkpointId = 0L;
            this.lastSuccessCheckpointId = 0L;
            this.checkpointCountdowns = new ArrayList<>();
        }

        /**
         * Runs the random feed loop. Any exception, including an interrupt during a pause, is
         * rethrown wrapped in a {@link RuntimeException}.
         */
        @Override
        public void run() {
            Random random = new Random();
            try {
                openFunctionWithState();
                while (!data.isEmpty()) {
                    completeDueCheckpoints();

                    // 0-5: emit a batch, 6-8: start a checkpoint, 9: fail and restore.
                    int action = random.nextInt(10);
                    if (action < 6) {
                        emitBatch(random);
                    } else if (action < 9) {
                        startCheckpoint(random);
                    } else {
                        failAndRestore();
                    }

                    if (random.nextBoolean()) {
                        Thread.sleep(random.nextInt(10));
                    }
                }
                finishJob();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /** Drops countdowns older than the last successful checkpoint and completes those that fire. */
        private void completeDueCheckpoints() throws Exception {
            ListIterator<CheckpointCountdown> pending = checkpointCountdowns.listIterator();
            while (pending.hasNext()) {
                CheckpointCountdown countdown = pending.next();
                if (countdown.id < lastSuccessCheckpointId) {
                    pending.remove();
                } else if (countdown.tick()) {
                    checkpointedData = countdown.data;
                    checkpointComplete(countdown.id);
                    lastSuccessCheckpointId = countdown.id;
                    pending.remove();
                }
            }
        }

        /** Emits between one and nine of the remaining values, fewer if less are left. */
        private void emitBatch(Random random) throws Exception {
            int batchSize = Math.min(data.size(), random.nextInt(9) + 1);
            for (int sent = 0; sent < batchSize; sent++) {
                function.invoke(data.removeFirst(), null);
            }
        }

        /** Starts the next checkpoint; with probability one half it is scheduled to complete in 1-3 ticks. */
        private void startCheckpoint(Random random) throws Exception {
            checkpointId++;
            if (random.nextBoolean()) {
                checkpointCountdowns.add(
                    new CheckpointCountdown(checkpointId, data, random.nextInt(3) + 1));
            }
            checkpointFunction(checkpointId);
        }

        /** Discards pending checkpoints, rewinds the data to the checkpointed values (shuffled) and restarts the function. */
        private void failAndRestore() throws Exception {
            checkpointCountdowns.clear();
            Collections.shuffle(checkpointedData);
            data = new LinkedList<>(checkpointedData);
            closeFuntionAbnormally();
            openFunctionWithState();
        }
    }

    /**
     * Feeder thread for the randomized uncheckpointed test. It emits its data into the sink
     * function in random batches, may fail and restart from the full input once, and finally
     * finishes the job.
     */
    private class UncheckpointedDataFeeder extends Thread {
        /** Values still to be emitted. */
        private LinkedList<Integer> data;
        /** Copy of the full input, re-emitted from scratch after a failure. */
        private final List<Integer> checkpointedData;
        /** Whether the single failure opportunity has already been used up. */
        private boolean failedBefore;

        private UncheckpointedDataFeeder(List<Integer> data) {
            this.data = new LinkedList<>(data);
            this.checkpointedData = new ArrayList<>(data);
            this.failedBefore = false;
        }

        /**
         * Runs the random feed loop. Any exception, including an interrupt during a pause, is
         * rethrown wrapped in a {@link RuntimeException}.
         */
        @Override
        public void run() {
            Random random = new Random();
            try {
                openFunction();
                while (!data.isEmpty()) {
                    int batchSize = Math.min(data.size(), random.nextInt(9) + 1);
                    for (int sent = 0; sent < batchSize; sent++) {
                        function.invoke(data.removeFirst(), null);
                    }

                    // The first time less than half of the input remains, flip a coin: either
                    // fail and start over with the shuffled full input, or never fail at all.
                    if (!failedBefore && data.size() < checkpointedData.size() / 2) {
                        if (random.nextBoolean()) {
                            Collections.shuffle(checkpointedData);
                            data = new LinkedList<>(checkpointedData);
                            closeFuntionAbnormally();
                            openFunction();
                        }
                        failedBefore = true;
                    }

                    if (random.nextBoolean()) {
                        Thread.sleep(random.nextInt(10));
                    }
                }
                finishJob();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}

