package org.apache.beam.fn.harness;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.beam.fn.harness.CombineRunners;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.ModelCoders;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsEqual;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/CombineRunnersTest.class */
public class CombineRunnersTest {
    private static final String TEST_COMBINE_ID = "combineId";
    private RunnerApi.PTransform pTransform;
    private String inputPCollectionId;
    private String outputPCollectionId;
    private RunnerApi.Pipeline pProto;

    /* loaded from: input_file:org/apache/beam/fn/harness/CombineRunnersTest$TestCombineFn.class */
    private static class TestCombineFn extends Combine.CombineFn<String, Integer, Integer> {
        private TestCombineFn() {
        }

        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        public Integer m539createAccumulator() {
            return 0;
        }

        public Integer addInput(Integer num, String str) {
            return Integer.valueOf(num.intValue() + Integer.parseInt(str));
        }

        public Integer mergeAccumulators(Iterable<Integer> iterable) {
            Integer num = 0;
            Iterator<Integer> it = iterable.iterator();
            while (it.hasNext()) {
                num = Integer.valueOf(num.intValue() + it.next().intValue());
            }
            return num;
        }

        public Integer extractOutput(Integer num) {
            return Integer.valueOf(-num.intValue());
        }

        /* renamed from: mergeAccumulators, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m538mergeAccumulators(Iterable iterable) {
            return mergeAccumulators((Iterable<Integer>) iterable);
        }
    }

    /**
     * Builds a small pipeline containing a single per-key combine named {@link #TEST_COMBINE_ID}
     * and caches its proto form for the tests.
     *
     * <p>After this runs, {@code pProto} holds the translated pipeline, {@code pTransform} holds
     * the combine transform looked up by id, and {@code inputPCollectionId} /
     * {@code outputPCollectionId} hold the ids under which the combine's input and output
     * collections are registered in the SDK components.
     *
     * @throws Exception if translating the pipeline or registering a PCollection fails
     */
    @Before
    public void createPipeline() throws Exception {
        Combine.PerKey<String, String, Integer> combine = Combine.perKey(new TestCombineFn());

        Pipeline pipeline = Pipeline.create();
        // The element itself is never read by the tests; it only gives the collection a type.
        PCollection<KV<String, String>> inputPCollection =
                pipeline.apply(Create.of(KV.of("unused", "0")));
        inputPCollection.setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

        PCollection<KV<String, Integer>> outputPCollection =
                inputPCollection.apply(TEST_COMBINE_ID, combine);
        outputPCollection.setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));

        SdkComponents sdkComponents = SdkComponents.create(pipeline.getOptions());
        this.pProto = PipelineTranslation.toProto(pipeline, sdkComponents);
        this.inputPCollectionId = sdkComponents.registerPCollection(inputPCollection);
        this.outputPCollectionId = sdkComponents.registerPCollection(outputPCollection);
        this.pTransform = this.pProto.getComponents().getTransformsOrThrow(TEST_COMBINE_ID);
    }

    /**
     * Runs the precombine runner over one bundle and checks the per-key results.
     *
     * <p>Inputs are {@code A -> "1", "2", "6"}, {@code B -> "2"} and {@code C -> "3"}. The outputs
     * for key {@code A} are summed rather than matched exactly, so the check holds however many
     * partial results are emitted for that key; keys {@code B} and {@code C} must each appear
     * exactly once with their single parsed value.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void testPrecombine() throws Exception {
        PTransformRunnerFactoryTestContext context =
                PTransformRunnerFactoryTestContext.builder(TEST_COMBINE_ID, pTransform)
                        .pCollections(pProto.getComponents().getPcollectionsMap())
                        .coders(pProto.getComponents().getCodersMap())
                        .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap())
                        .build();

        // Capture everything the runner emits on the combine's single output collection.
        // The raw cast lets the typed receiver be registered whatever element type the
        // context's consumer parameter declares.
        ArrayDeque<WindowedValue<KV<String, Integer>>> mainOutputValues = new ArrayDeque<>();
        context.addPCollectionConsumer(
                Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
                (FnDataReceiver)
                        (FnDataReceiver<WindowedValue<KV<String, Integer>>>) mainOutputValues::add);

        new CombineRunners.PrecombineFactory().createRunnerForPTransform(context);

        Iterables.getOnlyElement(context.getStartBundleFunctions()).run();

        // Discard anything captured before the inputs below are fed in.
        mainOutputValues.clear();
        MatcherAssert.assertThat(
                context.getPCollectionConsumers().keySet(),
                Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));

        FnDataReceiver inputConsumer = context.getPCollectionConsumer(inputPCollectionId);
        inputConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of("A", "1")));
        inputConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of("A", "2")));
        inputConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of("A", "6")));
        inputConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of("B", "2")));
        inputConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of("C", "3")));

        Iterables.getOnlyElement(context.getFinishBundleFunctions()).run();

        int sumForKeyA = 0;
        for (WindowedValue<KV<String, Integer>> output : mainOutputValues) {
            if ("A".equals(output.getValue().getKey())) {
                sumForKeyA += output.getValue().getValue();
            }
        }
        MatcherAssert.assertThat(sumForKeyA, IsEqual.equalTo(9));

        // With key A verified above, the remaining outputs must be exactly B and C.
        mainOutputValues.removeIf(output -> "A".equals(output.getValue().getKey()));
        MatcherAssert.assertThat(
                mainOutputValues,
                Matchers.containsInAnyOrder(
                        WindowedValue.valueInGlobalWindow(KV.of("B", 2)),
                        WindowedValue.valueInGlobalWindow(KV.of("C", 3))));
    }

    /**
     * Checks the merge-accumulators map function: each {@code KV<String, Iterable<Integer>>}
     * input must produce one {@code KV<String, Integer>} whose value is the sum of the
     * accumulators, in input order.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void testMergeAccumulators() throws Exception {
        // Re-declare the input collection with a KV<String, Iterable<Integer>> coder, since this
        // stage consumes grouped accumulators rather than the pipeline's original string values.
        RunnerApi.PCollection accumulatorInput =
                RunnerApi.PCollection.newBuilder()
                        .setUniqueName(inputPCollectionId)
                        .setCoderId("coder-id")
                        .build();
        HashMap<String, RunnerApi.PCollection> pCollections =
                new HashMap<>(pProto.getComponents().getPcollectionsMap());
        pCollections.put(inputPCollectionId, accumulatorInput);

        HashMap<String, RunnerApi.Coder> coders =
                new HashMap<>(pProto.getComponents().getCodersMap());
        coders.put(
                "coder-id",
                RunnerApi.Coder.newBuilder()
                        .setSpec(
                                RunnerApi.FunctionSpec.newBuilder()
                                        .setUrn(ModelCoders.KV_CODER_URN)
                                        .build())
                        .addComponentCoderIds("StringUtf8Coder")
                        .addComponentCoderIds("coder-id-iterable")
                        .build());
        coders.put(
                "coder-id-iterable",
                RunnerApi.Coder.newBuilder()
                        .setSpec(
                                RunnerApi.FunctionSpec.newBuilder()
                                        .setUrn(ModelCoders.ITERABLE_CODER_URN)
                                        .build())
                        .addComponentCoderIds("BigEndianIntegerCoder")
                        .build());

        PTransformRunnerFactoryTestContext context =
                PTransformRunnerFactoryTestContext.builder(TEST_COMBINE_ID, pTransform)
                        .pCollections(pCollections)
                        .coders(coders)
                        .build();

        // Capture everything emitted on the combine's single output collection. The raw cast
        // lets the typed receiver be registered whatever element type the context declares.
        ArrayDeque<WindowedValue<KV<String, Integer>>> mainOutputValues = new ArrayDeque<>();
        context.addPCollectionConsumer(
                Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
                (FnDataReceiver)
                        (FnDataReceiver<WindowedValue<KV<String, Integer>>>) mainOutputValues::add);

        MapFnRunners.forValueMapFnFactory(CombineRunners::createMergeAccumulatorsMapFunction)
                .createRunnerForPTransform(context);

        // This runner is expected to register no bundle lifecycle callbacks.
        MatcherAssert.assertThat(context.getStartBundleFunctions(), Matchers.empty());
        MatcherAssert.assertThat(context.getFinishBundleFunctions(), Matchers.empty());

        mainOutputValues.clear();
        MatcherAssert.assertThat(
                context.getPCollectionConsumers().keySet(),
                Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));

        FnDataReceiver inputConsumer = context.getPCollectionConsumer(inputPCollectionId);
        inputConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of("A", Arrays.asList(1, 2, 6))));
        inputConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of("B", Arrays.asList(2, 3))));
        inputConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of("C", Arrays.asList(5, 2))));

        MatcherAssert.assertThat(
                mainOutputValues,
                Matchers.contains(
                        WindowedValue.valueInGlobalWindow(KV.of("A", 9)),
                        WindowedValue.valueInGlobalWindow(KV.of("B", 5)),
                        WindowedValue.valueInGlobalWindow(KV.of("C", 7))));
    }

    /**
     * Checks the extract-outputs map function: each {@code KV<String, Integer>} accumulator
     * input must produce one output with the same key and the negated value (the behaviour of
     * {@code TestCombineFn.extractOutput}), in input order.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void testExtractOutputs() throws Exception {
        // Re-declare the input collection with a KV<String, Integer> coder, since this stage
        // consumes accumulators rather than the pipeline's original string values.
        RunnerApi.PCollection accumulatorInput =
                RunnerApi.PCollection.newBuilder()
                        .setUniqueName(inputPCollectionId)
                        .setCoderId("coder-id")
                        .build();
        HashMap<String, RunnerApi.PCollection> pCollections =
                new HashMap<>(pProto.getComponents().getPcollectionsMap());
        pCollections.put(inputPCollectionId, accumulatorInput);

        HashMap<String, RunnerApi.Coder> coders =
                new HashMap<>(pProto.getComponents().getCodersMap());
        coders.put(
                "coder-id",
                RunnerApi.Coder.newBuilder()
                        .setSpec(
                                RunnerApi.FunctionSpec.newBuilder()
                                        .setUrn(ModelCoders.KV_CODER_URN)
                                        .build())
                        .addComponentCoderIds("StringUtf8Coder")
                        .addComponentCoderIds("BigEndianIntegerCoder")
                        .build());

        PTransformRunnerFactoryTestContext context =
                PTransformRunnerFactoryTestContext.builder(TEST_COMBINE_ID, pTransform)
                        .pCollections(pCollections)
                        .coders(coders)
                        .build();

        // Capture everything emitted on the combine's single output collection. The raw cast
        // lets the typed receiver be registered whatever element type the context declares.
        ArrayDeque<WindowedValue<KV<String, Integer>>> mainOutputValues = new ArrayDeque<>();
        context.addPCollectionConsumer(
                Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
                (FnDataReceiver)
                        (FnDataReceiver<WindowedValue<KV<String, Integer>>>) mainOutputValues::add);

        MapFnRunners.forValueMapFnFactory(CombineRunners::createExtractOutputsMapFunction)
                .createRunnerForPTransform(context);

        // This runner is expected to register no bundle lifecycle callbacks.
        MatcherAssert.assertThat(context.getStartBundleFunctions(), Matchers.empty());
        MatcherAssert.assertThat(context.getFinishBundleFunctions(), Matchers.empty());

        mainOutputValues.clear();
        MatcherAssert.assertThat(
                context.getPCollectionConsumers().keySet(),
                Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));

        FnDataReceiver inputConsumer = context.getPCollectionConsumer(inputPCollectionId);
        inputConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of("A", 9)));
        inputConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of("B", 5)));
        inputConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of("C", 7)));

        MatcherAssert.assertThat(
                mainOutputValues,
                Matchers.contains(
                        WindowedValue.valueInGlobalWindow(KV.of("A", -9)),
                        WindowedValue.valueInGlobalWindow(KV.of("B", -5)),
                        WindowedValue.valueInGlobalWindow(KV.of("C", -7))));
    }

    /**
     * Checks the convert-to-accumulators map function: each {@code KV<String, String>} input
     * must produce one {@code KV<String, Integer>} whose value is the parsed input (a fresh
     * {@code TestCombineFn} accumulator with that single input added), in input order.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void testConvertToAccumulators() throws Exception {
        // The pipeline's own collections and coders are used unchanged: the input is already
        // KV<String, String>.
        PTransformRunnerFactoryTestContext context =
                PTransformRunnerFactoryTestContext.builder(TEST_COMBINE_ID, pTransform)
                        .pCollections(pProto.getComponents().getPcollectionsMap())
                        .coders(pProto.getComponents().getCodersMap())
                        .build();

        // Capture everything emitted on the combine's single output collection. The raw cast
        // lets the typed receiver be registered whatever element type the context declares.
        ArrayDeque<WindowedValue<KV<String, Integer>>> mainOutputValues = new ArrayDeque<>();
        context.addPCollectionConsumer(
                Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
                (FnDataReceiver)
                        (FnDataReceiver<WindowedValue<KV<String, Integer>>>) mainOutputValues::add);

        MapFnRunners.forValueMapFnFactory(CombineRunners::createConvertToAccumulatorsMapFunction)
                .createRunnerForPTransform(context);

        // This runner is expected to register no bundle lifecycle callbacks.
        MatcherAssert.assertThat(context.getStartBundleFunctions(), Matchers.empty());
        MatcherAssert.assertThat(context.getFinishBundleFunctions(), Matchers.empty());

        mainOutputValues.clear();
        MatcherAssert.assertThat(
                context.getPCollectionConsumers().keySet(),
                Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));

        FnDataReceiver inputConsumer = context.getPCollectionConsumer(inputPCollectionId);
        inputConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of("A", "9")));
        inputConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of("B", "5")));
        inputConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of("C", "7")));

        MatcherAssert.assertThat(
                mainOutputValues,
                Matchers.contains(
                        WindowedValue.valueInGlobalWindow(KV.of("A", 9)),
                        WindowedValue.valueInGlobalWindow(KV.of("B", 5)),
                        WindowedValue.valueInGlobalWindow(KV.of("C", 7))));
    }

    /**
     * Checks the combine-grouped-values map function: each {@code KV<String, Iterable<String>>}
     * input must produce one {@code KV<String, Integer>} whose value is the negated sum of the
     * parsed inputs (the full {@code TestCombineFn} applied end to end), in input order.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void testCombineGroupedValues() throws Exception {
        // Re-declare the input collection with a KV<String, Iterable<String>> coder, since this
        // stage consumes grouped raw values rather than individual key/value pairs.
        RunnerApi.PCollection groupedInput =
                RunnerApi.PCollection.newBuilder()
                        .setUniqueName(inputPCollectionId)
                        .setCoderId("coder-id")
                        .build();
        HashMap<String, RunnerApi.PCollection> pCollections =
                new HashMap<>(pProto.getComponents().getPcollectionsMap());
        pCollections.put(inputPCollectionId, groupedInput);

        HashMap<String, RunnerApi.Coder> coders =
                new HashMap<>(pProto.getComponents().getCodersMap());
        coders.put(
                "coder-id",
                RunnerApi.Coder.newBuilder()
                        .setSpec(
                                RunnerApi.FunctionSpec.newBuilder()
                                        .setUrn(ModelCoders.KV_CODER_URN)
                                        .build())
                        .addComponentCoderIds("StringUtf8Coder")
                        .addComponentCoderIds("coder-id-iterable")
                        .build());
        coders.put(
                "coder-id-iterable",
                RunnerApi.Coder.newBuilder()
                        .setSpec(
                                RunnerApi.FunctionSpec.newBuilder()
                                        .setUrn(ModelCoders.ITERABLE_CODER_URN)
                                        .build())
                        .addComponentCoderIds("StringUtf8Coder")
                        .build());

        PTransformRunnerFactoryTestContext context =
                PTransformRunnerFactoryTestContext.builder(TEST_COMBINE_ID, pTransform)
                        .pCollections(pCollections)
                        .coders(coders)
                        .build();

        // Capture everything emitted on the combine's single output collection. The raw cast
        // lets the typed receiver be registered whatever element type the context declares.
        ArrayDeque<WindowedValue<KV<String, Integer>>> mainOutputValues = new ArrayDeque<>();
        context.addPCollectionConsumer(
                Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
                (FnDataReceiver)
                        (FnDataReceiver<WindowedValue<KV<String, Integer>>>) mainOutputValues::add);

        MapFnRunners.forValueMapFnFactory(CombineRunners::createCombineGroupedValuesMapFunction)
                .createRunnerForPTransform(context);

        // This runner is expected to register no bundle lifecycle callbacks.
        MatcherAssert.assertThat(context.getStartBundleFunctions(), Matchers.empty());
        MatcherAssert.assertThat(context.getFinishBundleFunctions(), Matchers.empty());

        mainOutputValues.clear();
        MatcherAssert.assertThat(
                context.getPCollectionConsumers().keySet(),
                Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));

        FnDataReceiver inputConsumer = context.getPCollectionConsumer(inputPCollectionId);
        inputConsumer.accept(
                WindowedValue.valueInGlobalWindow(KV.of("A", Arrays.asList("1", "2", "6"))));
        inputConsumer.accept(
                WindowedValue.valueInGlobalWindow(KV.of("B", Arrays.asList("2", "3"))));
        inputConsumer.accept(
                WindowedValue.valueInGlobalWindow(KV.of("C", Arrays.asList("5", "2"))));

        MatcherAssert.assertThat(
                mainOutputValues,
                Matchers.contains(
                        WindowedValue.valueInGlobalWindow(KV.of("A", -9)),
                        WindowedValue.valueInGlobalWindow(KV.of("B", -5)),
                        WindowedValue.valueInGlobalWindow(KV.of("C", -7))));
    }
}
