package org.apache.beam.runners.flink.batch;

import java.util.Arrays;
import java.util.Objects;
import org.apache.beam.runners.flink.FlinkCapabilities;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkTestPipeline;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/runners/flink/batch/NonMergingGroupByKeyTest.class */
public class NonMergingGroupByKeyTest extends AbstractTestBase {

    /* loaded from: input_file:org/apache/beam/runners/flink/batch/NonMergingGroupByKeyTest$ReiterateDoFn.class */
    private static class ReiterateDoFn<K, V> extends DoFn<KV<K, Iterable<V>>, Void> {
        private ReiterateDoFn() {
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<K, Iterable<V>> kv) {
            ((Iterable) kv.getValue()).iterator();
            ((Iterable) kv.getValue()).iterator();
        }
    }

    @Test
    public void testDisabledReIterationThrowsAnException() {
        Assume.assumeTrue(FlinkCapabilities.supportsOutputDuringClosing());
        FlinkTestPipeline createForBatch = FlinkTestPipeline.createForBatch();
        createForBatch.apply(Create.of(Arrays.asList(KV.of("a", 1), KV.of("b", 2), KV.of("c", 3)))).apply(GroupByKey.create()).apply(ParDo.of(new ReiterateDoFn()));
        Pipeline.PipelineExecutionException pipelineExecutionException = null;
        try {
            createForBatch.run().waitUntilFinish();
        } catch (Pipeline.PipelineExecutionException e) {
            pipelineExecutionException = e;
        }
        Assert.assertEquals(IllegalStateException.class, ((Pipeline.PipelineExecutionException) Objects.requireNonNull(pipelineExecutionException)).getCause().getClass());
        Assert.assertTrue(pipelineExecutionException.getCause().getMessage().contains("GBK result is not re-iterable."));
    }

    @Test
    public void testEnabledReIterationDoesNotThrowAnException() {
        FlinkTestPipeline createForBatch = FlinkTestPipeline.createForBatch();
        createForBatch.getOptions().as(FlinkPipelineOptions.class).setReIterableGroupByKeyResult(true);
        createForBatch.apply(Create.of(Arrays.asList(KV.of("a", 1), KV.of("b", 2), KV.of("c", 3)))).apply(GroupByKey.create()).apply(ParDo.of(new ReiterateDoFn()));
        Assert.assertEquals(PipelineResult.State.DONE, createForBatch.run().waitUntilFinish());
    }
}
