package org.apache.flink.streaming.api.operators.collect;

import java.lang.Thread;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.operators.collect.utils.CollectSinkFunctionTestWrapper;
import org.apache.flink.streaming.api.operators.collect.utils.TestJobClient;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.RunnableWithException;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionRandomITCase.class */
public class CollectSinkFunctionRandomITCase extends TestLogger {
    private static final int MAX_RESULTS_PER_BATCH = 3;
    private static final JobID TEST_JOB_ID = new JobID();
    private static final OperatorID TEST_OPERATOR_ID = new OperatorID();
    private static final TypeSerializer<Integer> serializer = IntSerializer.INSTANCE;
    private CollectSinkFunctionTestWrapper<Integer> functionWrapper;
    private boolean jobFinished;

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionRandomITCase$CheckpointCountdown.class */
    private static class CheckpointCountdown {
        private final long id;
        private final List<Integer> data;
        private int countdown;

        private CheckpointCountdown(long j, List<Integer> list, int i) {
            this.id = j;
            this.data = new ArrayList(list);
            this.countdown = i;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean tick() {
            if (this.countdown <= 0) {
                return false;
            }
            this.countdown--;
            return this.countdown == 0;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionRandomITCase$CheckpointedDataFeeder.class */
    private class CheckpointedDataFeeder implements RunnableWithException {
        private LinkedList<Integer> data;
        private List<Integer> checkpointedData;
        private long checkpointId;
        private long lastSuccessCheckpointId;
        private final List<CheckpointCountdown> checkpointCountdowns;

        private CheckpointedDataFeeder(List<Integer> list) {
            this.data = new LinkedList<>(list);
            this.checkpointedData = new ArrayList(list);
            this.checkpointId = 0L;
            this.lastSuccessCheckpointId = 0L;
            this.checkpointCountdowns = new ArrayList();
        }

        public void run() throws Exception {
            Random random = new Random();
            CollectSinkFunctionRandomITCase.this.functionWrapper.openFunctionWithState();
            while (this.data.size() > 0) {
                ListIterator<CheckpointCountdown> listIterator = this.checkpointCountdowns.listIterator();
                while (listIterator.hasNext()) {
                    CheckpointCountdown next = listIterator.next();
                    if (next.id < this.lastSuccessCheckpointId) {
                        listIterator.remove();
                    } else if (next.tick()) {
                        this.checkpointedData = next.data;
                        CollectSinkFunctionRandomITCase.this.functionWrapper.checkpointComplete(next.id);
                        this.lastSuccessCheckpointId = next.id;
                        listIterator.remove();
                    }
                }
                int nextInt = random.nextInt(10);
                if (nextInt < 6) {
                    int min = Math.min(this.data.size(), random.nextInt(9) + 1);
                    for (int i = 0; i < min; i++) {
                        CollectSinkFunctionRandomITCase.this.functionWrapper.invoke(this.data.removeFirst());
                    }
                } else if (nextInt < 9) {
                    this.checkpointId++;
                    if (random.nextBoolean()) {
                        this.checkpointCountdowns.add(new CheckpointCountdown(this.checkpointId, this.data, random.nextInt(3) + 1));
                    }
                    CollectSinkFunctionRandomITCase.this.functionWrapper.checkpointFunction(this.checkpointId);
                } else {
                    this.checkpointCountdowns.clear();
                    Collections.shuffle(this.checkpointedData);
                    this.data = new LinkedList<>(this.checkpointedData);
                    CollectSinkFunctionRandomITCase.this.functionWrapper.closeFunctionAbnormally();
                    CollectSinkFunctionRandomITCase.this.functionWrapper.openFunctionWithState();
                }
                if (random.nextBoolean()) {
                    Thread.sleep(random.nextInt(10));
                }
            }
            CollectSinkFunctionRandomITCase.this.functionWrapper.closeFunctionNormally();
            CollectSinkFunctionRandomITCase.this.jobFinished = true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionRandomITCase$CollectClient.class */
    public class CollectClient implements RunnableWithException {
        private final List<Integer> results;
        private final CollectResultIterator<Integer> iterator;

        private CollectClient() {
            this.results = new ArrayList();
            this.iterator = new CollectResultIterator<>(new CheckpointedCollectResultBuffer(CollectSinkFunctionRandomITCase.serializer), CompletableFuture.completedFuture(CollectSinkFunctionRandomITCase.TEST_OPERATOR_ID), CollectSinkFunctionTestWrapper.ACCUMULATOR_NAME, 0);
            this.iterator.setJobClient(new TestJobClient(CollectSinkFunctionRandomITCase.TEST_JOB_ID, CollectSinkFunctionRandomITCase.TEST_OPERATOR_ID, CollectSinkFunctionRandomITCase.this.functionWrapper.getCoordinator(), new TestJobClient.JobInfoProvider() { // from class: org.apache.flink.streaming.api.operators.collect.CollectSinkFunctionRandomITCase.CollectClient.1
                @Override // org.apache.flink.streaming.api.operators.collect.utils.TestJobClient.JobInfoProvider
                public boolean isJobFinished() {
                    return CollectSinkFunctionRandomITCase.this.jobFinished;
                }

                @Override // org.apache.flink.streaming.api.operators.collect.utils.TestJobClient.JobInfoProvider
                public Map<String, OptionalFailure<Object>> getAccumulatorResults() {
                    HashMap hashMap = new HashMap();
                    hashMap.put(CollectSinkFunctionTestWrapper.ACCUMULATOR_NAME, OptionalFailure.of(CollectSinkFunctionRandomITCase.this.functionWrapper.getAccumulatorLocalValue()));
                    return hashMap;
                }
            }));
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void run() throws Exception {
            Random random = new Random();
            while (this.iterator.hasNext()) {
                this.results.add(this.iterator.next());
                if (random.nextBoolean()) {
                    try {
                        Thread.sleep(5L);
                    } catch (InterruptedException e) {
                    }
                }
            }
            this.iterator.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionRandomITCase$ThreadWithException.class */
    public static class ThreadWithException extends Thread {
        private final RunnableWithException runnable;

        private ThreadWithException(RunnableWithException runnableWithException) {
            this.runnable = runnableWithException;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                this.runnable.run();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionRandomITCase$UncheckpointedDataFeeder.class */
    private class UncheckpointedDataFeeder implements RunnableWithException {
        private LinkedList<Integer> data;
        private final List<Integer> originalData;
        private boolean failedBefore;

        private UncheckpointedDataFeeder(List<Integer> list) {
            this.data = new LinkedList<>(list);
            this.originalData = new ArrayList(list);
            this.failedBefore = false;
        }

        public void run() throws Exception {
            Random random = new Random();
            CollectSinkFunctionRandomITCase.this.functionWrapper.openFunction();
            while (this.data.size() > 0) {
                int min = Math.min(this.data.size(), random.nextInt(9) + 1);
                for (int i = 0; i < min; i++) {
                    CollectSinkFunctionRandomITCase.this.functionWrapper.invoke(this.data.removeFirst());
                }
                if (!this.failedBefore && this.data.size() < this.originalData.size() / 2) {
                    if (random.nextBoolean()) {
                        this.data = new LinkedList<>(this.originalData);
                        Collections.shuffle(this.data);
                        CollectSinkFunctionRandomITCase.this.functionWrapper.closeFunctionAbnormally();
                        CollectSinkFunctionRandomITCase.this.functionWrapper.openFunction();
                    }
                    this.failedBefore = true;
                }
                if (random.nextBoolean()) {
                    Thread.sleep(random.nextInt(10));
                }
            }
            CollectSinkFunctionRandomITCase.this.functionWrapper.closeFunctionNormally();
            CollectSinkFunctionRandomITCase.this.jobFinished = true;
        }
    }

    @Test
    public void testUncheckpointedFunction() throws Exception {
        for (int i = 30; i > 0; i--) {
            this.functionWrapper = new CollectSinkFunctionTestWrapper<>(serializer, 12);
            this.jobFinished = false;
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < 50; i2++) {
                arrayList.add(Integer.valueOf(i2));
            }
            assertResultsEqualAfterSort(arrayList, runFunctionRandomTest(new ThreadWithException(new UncheckpointedDataFeeder(arrayList))));
            this.functionWrapper.closeWrapper();
        }
    }

    @Test
    public void testCheckpointedFunction() throws Exception {
        for (int i = 30; i > 0; i--) {
            this.functionWrapper = new CollectSinkFunctionTestWrapper<>(serializer, 12);
            this.jobFinished = false;
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < 50; i2++) {
                arrayList.add(Integer.valueOf(i2));
            }
            assertResultsEqualAfterSort(arrayList, runFunctionRandomTest(new ThreadWithException(new CheckpointedDataFeeder(arrayList))));
            this.functionWrapper.closeWrapper();
        }
    }

    private List<Integer> runFunctionRandomTest(Thread thread) throws Exception {
        CollectClient collectClient = new CollectClient();
        ThreadWithException threadWithException = new ThreadWithException(collectClient);
        Thread.UncaughtExceptionHandler uncaughtExceptionHandler = (thread2, th) -> {
            thread.interrupt();
            threadWithException.interrupt();
            th.printStackTrace();
        };
        thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
        threadWithException.setUncaughtExceptionHandler(uncaughtExceptionHandler);
        thread.start();
        threadWithException.start();
        thread.join();
        threadWithException.join();
        return collectClient.results;
    }

    private void assertResultsEqualAfterSort(List<Integer> list, List<Integer> list2) {
        Collections.sort(list);
        Collections.sort(list2);
        Assert.assertThat(list2, CoreMatchers.is(list));
    }
}
