package org.apache.beam.fn.harness;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.CombineRunners;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.function.ThrowingRunnable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsEqual;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for {@link CombineRunners}.
 *
 * <p>Each test builds a runner for the combine transform registered under {@link
 * #TEST_COMBINE_ID}, feeds elements into the consumer the runner registers for the input
 * PCollection, and checks what arrives at a capturing consumer registered for the output
 * PCollection.
 */
@RunWith(JUnit4.class)
public class CombineRunnersTest {
    private static final String TEST_COMBINE_ID = "combineId";

    // Fixture state, rebuilt before every test by createPipeline().
    private RunnerApi.PTransform pTransform;
    private String inputPCollectionId;
    private String outputPCollectionId;
    private RunnerApi.Pipeline pProto;

    /**
     * Combine function used by the tests: parses each String input as an int, sums the parsed
     * values into an Integer accumulator, and emits the negated sum as output. The negation makes
     * it visible in assertions whether {@code extractOutput} has been applied.
     */
    private static class TestCombineFn extends Combine.CombineFn<String, Integer, Integer> {
        private TestCombineFn() {
        }

        @Override
        public Integer createAccumulator() {
            return 0;
        }

        @Override
        public Integer addInput(Integer accumulator, String input) {
            return accumulator + Integer.parseInt(input);
        }

        @Override
        public Integer mergeAccumulators(Iterable<Integer> accumulators) {
            int merged = 0;
            for (Integer accumulator : accumulators) {
                merged += accumulator;
            }
            return merged;
        }

        @Override
        public Integer extractOutput(Integer accumulator) {
            return -accumulator;
        }
    }

    /**
     * Builds a pipeline consisting of an input PCollection, a per-key combine named {@link
     * #TEST_COMBINE_ID} and its output PCollection, translates it to its proto form, and records
     * the ids/protos the tests need.
     */
    @Before
    public void createPipeline() throws Exception {
        Combine.PerKey<String, String, Integer> combine = Combine.perKey(new TestCombineFn());

        Pipeline pipeline = Pipeline.create();
        PCollection<KV<String, String>> inputPCollection =
                pipeline.apply(Create.of(KV.of("unused", "0")));
        inputPCollection.setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
        PCollection<KV<String, Integer>> outputPCollection =
                inputPCollection.apply(TEST_COMBINE_ID, combine);
        outputPCollection.setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));

        SdkComponents sdkComponents = SdkComponents.create(pipeline.getOptions());
        pProto = PipelineTranslation.toProto(pipeline, sdkComponents);
        inputPCollectionId = sdkComponents.registerPCollection(inputPCollection);
        outputPCollectionId = sdkComponents.registerPCollection(outputPCollection);
        pTransform = pProto.getComponents().getTransformsOrThrow(TEST_COMBINE_ID);
    }

    /**
     * Runs the precombine runner over several keyed String inputs and checks that the values
     * emitted for key "A" sum to 9 (however they were grouped) and that "B" and "C" each produce
     * their single parsed value.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw FnDataReceiver cast on the capturing consumer
    public void testPrecombine() throws Exception {
        // Consumers keyed by PCollection id, plus a capturing consumer for the transform's output.
        ArrayListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers =
                ArrayListMultimap.create();
        ArrayDeque<WindowedValue<KV<String, Integer>>> mainOutputValues = new ArrayDeque<>();
        consumers.put(
                Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
                (FnDataReceiver)
                        (FnDataReceiver<WindowedValue<KV<String, Integer>>>) mainOutputValues::add);

        ArrayList<ThrowingRunnable> startFunctions = new ArrayList<>();
        ArrayList<ThrowingRunnable> finishFunctions = new ArrayList<>();

        new CombineRunners.PrecombineFactory<>()
                .createRunnerForPTransform(
                        PipelineOptionsFactory.create(),
                        null,
                        null,
                        TEST_COMBINE_ID,
                        pTransform,
                        null,
                        pProto.getComponents().getPcollectionsMap(),
                        pProto.getComponents().getCodersMap(),
                        pProto.getComponents().getWindowingStrategiesMap(),
                        consumers,
                        startFunctions::add,
                        finishFunctions::add,
                        null);

        // Exactly one start function is expected; run it before sending any elements.
        Iterables.getOnlyElement(startFunctions).run();

        mainOutputValues.clear();
        Assert.assertThat(
                consumers.keySet(),
                Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));

        FnDataReceiver<WindowedValue<?>> input =
                Iterables.getOnlyElement(consumers.get(inputPCollectionId));
        input.accept(WindowedValue.valueInGlobalWindow(KV.of("A", "1")));
        input.accept(WindowedValue.valueInGlobalWindow(KV.of("A", "2")));
        input.accept(WindowedValue.valueInGlobalWindow(KV.of("A", "6")));
        input.accept(WindowedValue.valueInGlobalWindow(KV.of("B", "2")));
        input.accept(WindowedValue.valueInGlobalWindow(KV.of("C", "3")));

        // Exactly one finish function is expected; run it before inspecting the output.
        Iterables.getOnlyElement(finishFunctions).run();

        // The test does not pin how many outputs "A" yields, only that their values total 9.
        int sumForA = 0;
        for (WindowedValue<KV<String, Integer>> outputValue : mainOutputValues) {
            if ("A".equals(outputValue.getValue().getKey())) {
                sumForA += outputValue.getValue().getValue();
            }
        }
        Assert.assertThat(sumForA, IsEqual.equalTo(9));

        // With the "A" outputs removed, only the single "B" and "C" values should remain.
        mainOutputValues.removeIf(elem -> "A".equals(elem.getValue().getKey()));
        Assert.assertThat(
                mainOutputValues,
                Matchers.containsInAnyOrder(
                        WindowedValue.valueInGlobalWindow(KV.of("B", 2)),
                        WindowedValue.valueInGlobalWindow(KV.of("C", 3))));
    }

    /**
     * Feeds per-key lists of Integer accumulators through the merge-accumulators map function and
     * checks that each key is emitted, in input order, with the sum of its accumulators.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw FnDataReceiver cast on the capturing consumer
    public void testMergeAccumulators() throws Exception {
        ArrayListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers =
                ArrayListMultimap.create();
        ArrayDeque<WindowedValue<KV<String, Integer>>> mainOutputValues = new ArrayDeque<>();
        consumers.put(
                Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
                (FnDataReceiver)
                        (FnDataReceiver<WindowedValue<KV<String, Integer>>>) mainOutputValues::add);

        ArrayList<ThrowingRunnable> startFunctions = new ArrayList<>();
        ArrayList<ThrowingRunnable> finishFunctions = new ArrayList<>();

        // Empty component maps are passed here; only the PTransform proto itself is supplied.
        MapFnRunners.forValueMapFnFactory(CombineRunners::createMergeAccumulatorsMapFunction)
                .createRunnerForPTransform(
                        PipelineOptionsFactory.create(),
                        null,
                        null,
                        TEST_COMBINE_ID,
                        pTransform,
                        null,
                        Collections.emptyMap(),
                        Collections.emptyMap(),
                        Collections.emptyMap(),
                        consumers,
                        startFunctions::add,
                        finishFunctions::add,
                        null);

        // This runner is expected to register no start or finish functions.
        Assert.assertThat(startFunctions, Matchers.empty());
        Assert.assertThat(finishFunctions, Matchers.empty());

        mainOutputValues.clear();
        Assert.assertThat(
                consumers.keySet(),
                Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));

        FnDataReceiver<WindowedValue<?>> input =
                Iterables.getOnlyElement(consumers.get(inputPCollectionId));
        input.accept(WindowedValue.valueInGlobalWindow(KV.of("A", Arrays.asList(1, 2, 6))));
        input.accept(WindowedValue.valueInGlobalWindow(KV.of("B", Arrays.asList(2, 3))));
        input.accept(WindowedValue.valueInGlobalWindow(KV.of("C", Arrays.asList(5, 2))));

        Assert.assertThat(
                mainOutputValues,
                Matchers.contains(
                        WindowedValue.valueInGlobalWindow(KV.of("A", 9)),
                        WindowedValue.valueInGlobalWindow(KV.of("B", 5)),
                        WindowedValue.valueInGlobalWindow(KV.of("C", 7))));
    }

    /**
     * Feeds per-key Integer accumulators through the extract-outputs map function and checks that
     * each key is emitted, in input order, with the negated accumulator (see {@link
     * TestCombineFn#extractOutput}).
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw FnDataReceiver cast on the capturing consumer
    public void testExtractOutputs() throws Exception {
        ArrayListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers =
                ArrayListMultimap.create();
        ArrayDeque<WindowedValue<KV<String, Integer>>> mainOutputValues = new ArrayDeque<>();
        consumers.put(
                Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
                (FnDataReceiver)
                        (FnDataReceiver<WindowedValue<KV<String, Integer>>>) mainOutputValues::add);

        ArrayList<ThrowingRunnable> startFunctions = new ArrayList<>();
        ArrayList<ThrowingRunnable> finishFunctions = new ArrayList<>();

        // Empty component maps are passed here; only the PTransform proto itself is supplied.
        MapFnRunners.forValueMapFnFactory(CombineRunners::createExtractOutputsMapFunction)
                .createRunnerForPTransform(
                        PipelineOptionsFactory.create(),
                        null,
                        null,
                        TEST_COMBINE_ID,
                        pTransform,
                        null,
                        Collections.emptyMap(),
                        Collections.emptyMap(),
                        Collections.emptyMap(),
                        consumers,
                        startFunctions::add,
                        finishFunctions::add,
                        null);

        // This runner is expected to register no start or finish functions.
        Assert.assertThat(startFunctions, Matchers.empty());
        Assert.assertThat(finishFunctions, Matchers.empty());

        mainOutputValues.clear();
        Assert.assertThat(
                consumers.keySet(),
                Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));

        FnDataReceiver<WindowedValue<?>> input =
                Iterables.getOnlyElement(consumers.get(inputPCollectionId));
        input.accept(WindowedValue.valueInGlobalWindow(KV.of("A", 9)));
        input.accept(WindowedValue.valueInGlobalWindow(KV.of("B", 5)));
        input.accept(WindowedValue.valueInGlobalWindow(KV.of("C", 7)));

        Assert.assertThat(
                mainOutputValues,
                Matchers.contains(
                        WindowedValue.valueInGlobalWindow(KV.of("A", -9)),
                        WindowedValue.valueInGlobalWindow(KV.of("B", -5)),
                        WindowedValue.valueInGlobalWindow(KV.of("C", -7))));
    }

    /**
     * Feeds per-key lists of raw String inputs through the combine-grouped-values map function and
     * checks that each key is emitted, in input order, with the negated sum of its parsed inputs.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw FnDataReceiver cast on the capturing consumer
    public void testCombineGroupedValues() throws Exception {
        ArrayListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers =
                ArrayListMultimap.create();
        ArrayDeque<WindowedValue<KV<String, Integer>>> mainOutputValues = new ArrayDeque<>();
        consumers.put(
                Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
                (FnDataReceiver)
                        (FnDataReceiver<WindowedValue<KV<String, Integer>>>) mainOutputValues::add);

        ArrayList<ThrowingRunnable> startFunctions = new ArrayList<>();
        ArrayList<ThrowingRunnable> finishFunctions = new ArrayList<>();

        // Empty component maps are passed here; only the PTransform proto itself is supplied.
        MapFnRunners.forValueMapFnFactory(CombineRunners::createCombineGroupedValuesMapFunction)
                .createRunnerForPTransform(
                        PipelineOptionsFactory.create(),
                        null,
                        null,
                        TEST_COMBINE_ID,
                        pTransform,
                        null,
                        Collections.emptyMap(),
                        Collections.emptyMap(),
                        Collections.emptyMap(),
                        consumers,
                        startFunctions::add,
                        finishFunctions::add,
                        null);

        // This runner is expected to register no start or finish functions.
        Assert.assertThat(startFunctions, Matchers.empty());
        Assert.assertThat(finishFunctions, Matchers.empty());

        mainOutputValues.clear();
        Assert.assertThat(
                consumers.keySet(),
                Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));

        FnDataReceiver<WindowedValue<?>> input =
                Iterables.getOnlyElement(consumers.get(inputPCollectionId));
        input.accept(WindowedValue.valueInGlobalWindow(KV.of("A", Arrays.asList("1", "2", "6"))));
        input.accept(WindowedValue.valueInGlobalWindow(KV.of("B", Arrays.asList("2", "3"))));
        input.accept(WindowedValue.valueInGlobalWindow(KV.of("C", Arrays.asList("5", "2"))));

        Assert.assertThat(
                mainOutputValues,
                Matchers.contains(
                        WindowedValue.valueInGlobalWindow(KV.of("A", -9)),
                        WindowedValue.valueInGlobalWindow(KV.of("B", -5)),
                        WindowedValue.valueInGlobalWindow(KV.of("C", -7))));
    }
}
