package org.apache.flink.streaming.runtime.tasks;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTaskTest;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.class */
public class MultipleInputStreamTaskChainedSourcesCheckpointingTest {
    private static final int MAX_STEPS = 100;
    private final CheckpointMetaData metaData = new CheckpointMetaData(1, System.currentTimeMillis());

    @Test
    public void testSourceCheckpointFirst() throws Exception {
        StreamTaskMailboxTestHarness<String> buildTestHarness = MultipleInputStreamTaskTest.buildTestHarness();
        Throwable th = null;
        try {
            buildTestHarness.setAutoProcess(false);
            ArrayDeque arrayDeque = new ArrayDeque();
            CheckpointBarrier createBarrier = createBarrier(buildTestHarness);
            addRecordsAndBarriers(buildTestHarness, createBarrier);
            Future triggerCheckpointAsync = buildTestHarness.getStreamTask().triggerCheckpointAsync(this.metaData, createBarrier.getCheckpointOptions());
            triggerCheckpointAsync.getClass();
            processSingleStepUntil(buildTestHarness, triggerCheckpointAsync::isDone);
            arrayDeque.add(new StreamRecord("44", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("44", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("47.0", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("47.0", Long.MIN_VALUE));
            ArrayList arrayList = new ArrayList(buildTestHarness.getOutput());
            MatcherAssert.assertThat(arrayList.subList(0, arrayDeque.size()), Matchers.containsInAnyOrder(arrayDeque.toArray()));
            MatcherAssert.assertThat(arrayList.get(arrayDeque.size()), Matchers.equalTo(createBarrier));
            if (buildTestHarness != null) {
                if (0 == 0) {
                    buildTestHarness.close();
                    return;
                }
                try {
                    buildTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (buildTestHarness != null) {
                if (0 != 0) {
                    try {
                        buildTestHarness.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    buildTestHarness.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testSourceCheckpointFirstUnaligned() throws Exception {
        StreamTaskMailboxTestHarness<String> buildTestHarness = MultipleInputStreamTaskTest.buildTestHarness(true);
        Throwable th = null;
        try {
            buildTestHarness.setAutoProcess(false);
            ArrayDeque arrayDeque = new ArrayDeque();
            addRecords(buildTestHarness);
            CheckpointBarrier createBarrier = createBarrier(buildTestHarness);
            Future triggerCheckpointAsync = buildTestHarness.getStreamTask().triggerCheckpointAsync(this.metaData, createBarrier.getCheckpointOptions());
            triggerCheckpointAsync.getClass();
            processSingleStepUntil(buildTestHarness, triggerCheckpointAsync::isDone);
            MatcherAssert.assertThat(buildTestHarness.getOutput(), Matchers.contains(new Object[]{createBarrier}));
            buildTestHarness.processAll();
            arrayDeque.add(new StreamRecord("44", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("44", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("47.0", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("47.0", Long.MIN_VALUE));
            MatcherAssert.assertThat(new ArrayList(buildTestHarness.getOutput()).subList(1, arrayDeque.size() + 1), Matchers.containsInAnyOrder(arrayDeque.toArray()));
            if (buildTestHarness != null) {
                if (0 == 0) {
                    buildTestHarness.close();
                    return;
                }
                try {
                    buildTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (buildTestHarness != null) {
                if (0 != 0) {
                    try {
                        buildTestHarness.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    buildTestHarness.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testSourceCheckpointLast() throws Exception {
        StreamTaskMailboxTestHarness<String> buildTestHarness = MultipleInputStreamTaskTest.buildTestHarness();
        Throwable th = null;
        try {
            buildTestHarness.setAutoProcess(false);
            ArrayDeque arrayDeque = new ArrayDeque();
            CheckpointBarrier createBarrier = createBarrier(buildTestHarness);
            addRecordsAndBarriers(buildTestHarness, createBarrier);
            buildTestHarness.processAll();
            Future triggerCheckpointAsync = buildTestHarness.getStreamTask().triggerCheckpointAsync(this.metaData, createBarrier.getCheckpointOptions());
            triggerCheckpointAsync.getClass();
            processSingleStepUntil(buildTestHarness, triggerCheckpointAsync::isDone);
            arrayDeque.add(new StreamRecord("42", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("42", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("42", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("44", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("44", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("47.0", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("47.0", Long.MIN_VALUE));
            ArrayList arrayList = new ArrayList(buildTestHarness.getOutput());
            MatcherAssert.assertThat(arrayList.subList(0, arrayDeque.size()), Matchers.containsInAnyOrder(arrayDeque.toArray()));
            MatcherAssert.assertThat(arrayList.get(arrayDeque.size()), Matchers.equalTo(createBarrier));
            if (buildTestHarness != null) {
                if (0 == 0) {
                    buildTestHarness.close();
                    return;
                }
                try {
                    buildTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (buildTestHarness != null) {
                if (0 != 0) {
                    try {
                        buildTestHarness.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    buildTestHarness.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testSourceCheckpointLastUnaligned() throws Exception {
        StreamTaskMailboxTestHarness<String> buildTestHarness = MultipleInputStreamTaskTest.buildTestHarness(true);
        Throwable th = null;
        try {
            try {
                buildTestHarness.setAutoProcess(false);
                ArrayDeque arrayDeque = new ArrayDeque();
                addNetworkRecords(buildTestHarness);
                CheckpointBarrier createBarrier = createBarrier(buildTestHarness);
                addBarriers(buildTestHarness, createBarrier);
                buildTestHarness.processAll();
                MultipleInputStreamTaskTest.addSourceRecords(buildTestHarness, 1, 1337, 1337, 1337);
                buildTestHarness.processAll();
                arrayDeque.add(new StreamRecord("44", Long.MIN_VALUE));
                arrayDeque.add(new StreamRecord("44", Long.MIN_VALUE));
                arrayDeque.add(new StreamRecord("47.0", Long.MIN_VALUE));
                arrayDeque.add(new StreamRecord("47.0", Long.MIN_VALUE));
                arrayDeque.add(createBarrier);
                MatcherAssert.assertThat(buildTestHarness.getOutput(), Matchers.containsInAnyOrder(arrayDeque.toArray()));
                if (buildTestHarness != null) {
                    if (0 == 0) {
                        buildTestHarness.close();
                        return;
                    }
                    try {
                        buildTestHarness.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (buildTestHarness != null) {
                if (th != null) {
                    try {
                        buildTestHarness.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    buildTestHarness.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testOnlyOneSource() throws Exception {
        StreamTaskMailboxTestHarness<String> build = new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyExecutionConfig(executionConfig -> {
            executionConfig.enableObjectReuse();
        }).addSourceInput(new SourceOperatorFactory<>(new MockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks())).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MultipleInputStreamTaskTest.MapToStringMultipleInputOperatorFactory(1)).build();
        Throwable th = null;
        try {
            build.setAutoProcess(false);
            ArrayDeque arrayDeque = new ArrayDeque();
            MultipleInputStreamTaskTest.addSourceRecords(build, 0, 42, 43, 44);
            processSingleStepUntil(build, () -> {
                return Boolean.valueOf(!build.getOutput().isEmpty());
            });
            arrayDeque.add(new StreamRecord("42", Long.MIN_VALUE));
            CheckpointBarrier createBarrier = createBarrier(build);
            Future triggerCheckpointAsync = build.getStreamTask().triggerCheckpointAsync(this.metaData, createBarrier.getCheckpointOptions());
            triggerCheckpointAsync.getClass();
            processSingleStepUntil(build, triggerCheckpointAsync::isDone);
            ArrayList arrayList = new ArrayList(build.getOutput());
            MatcherAssert.assertThat(arrayList.subList(0, arrayDeque.size()), Matchers.containsInAnyOrder(arrayDeque.toArray()));
            MatcherAssert.assertThat(arrayList.get(arrayDeque.size()), Matchers.equalTo(createBarrier));
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    private void addRecordsAndBarriers(StreamTaskMailboxTestHarness<String> streamTaskMailboxTestHarness, CheckpointBarrier checkpointBarrier) throws Exception {
        addRecords(streamTaskMailboxTestHarness);
        addBarriers(streamTaskMailboxTestHarness, checkpointBarrier);
    }

    private CheckpointBarrier createBarrier(StreamTaskMailboxTestHarness<String> streamTaskMailboxTestHarness) {
        StreamConfig configuration = streamTaskMailboxTestHarness.getStreamTask().getConfiguration();
        return new CheckpointBarrier(this.metaData.getCheckpointId(), this.metaData.getTimestamp(), CheckpointOptions.create(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault(), configuration.isExactlyOnceCheckpointMode(), configuration.isUnalignedCheckpointsEnabled(), configuration.getAlignmentTimeout()));
    }

    private void addBarriers(StreamTaskMailboxTestHarness<String> streamTaskMailboxTestHarness, CheckpointBarrier checkpointBarrier) throws Exception {
        streamTaskMailboxTestHarness.processEvent(checkpointBarrier, 0);
        streamTaskMailboxTestHarness.processEvent(checkpointBarrier, 1);
    }

    private void addRecords(StreamTaskMailboxTestHarness<String> streamTaskMailboxTestHarness) throws Exception {
        MultipleInputStreamTaskTest.addSourceRecords(streamTaskMailboxTestHarness, 1, 42, 42, 42);
        addNetworkRecords(streamTaskMailboxTestHarness);
    }

    private void addNetworkRecords(StreamTaskMailboxTestHarness<String> streamTaskMailboxTestHarness) throws Exception {
        streamTaskMailboxTestHarness.processElement(new StreamRecord("44", Long.MIN_VALUE), 0);
        streamTaskMailboxTestHarness.processElement(new StreamRecord("44", Long.MIN_VALUE), 0);
        streamTaskMailboxTestHarness.processElement(new StreamRecord(Double.valueOf(47.0d), Long.MIN_VALUE), 1);
        streamTaskMailboxTestHarness.processElement(new StreamRecord(Double.valueOf(47.0d), Long.MIN_VALUE), 1);
    }

    private void processSingleStepUntil(StreamTaskMailboxTestHarness<String> streamTaskMailboxTestHarness, Supplier<Boolean> supplier) throws Exception {
        Assert.assertFalse(supplier.get().booleanValue());
        for (int i = 0; i < MAX_STEPS && !supplier.get().booleanValue(); i++) {
            streamTaskMailboxTestHarness.processSingleStep();
        }
        Assert.assertTrue(supplier.get().booleanValue());
    }
}
