package org.apache.beam.runners.direct.portable;

import java.util.Iterator;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.KeyedWorkItem;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.KeyedWorkItems;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.Environments;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.RehydratedComponents;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.SdkComponents;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.PipelineNode;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.wire.LengthPrefixUnknownCoders;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.local.StructuralKey;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.HashMultiset;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/portable/GroupByKeyOnlyEvaluatorFactoryTest.class */
public class GroupByKeyOnlyEvaluatorFactoryTest {
    private BundleFactory bundleFactory = ImmutableListBundleFactory.create();

    /* loaded from: input_file:org/apache/beam/runners/direct/portable/GroupByKeyOnlyEvaluatorFactoryTest$KeyedWorkItemMatcher.class */
    private static class KeyedWorkItemMatcher<K, V> extends BaseMatcher<WindowedValue<KeyedWorkItem<K, V>>> {
        private final KeyedWorkItem<K, V> myWorkItem;
        private final Coder<K> keyCoder;

        public KeyedWorkItemMatcher(KeyedWorkItem<K, V> keyedWorkItem, Coder<K> coder) {
            this.myWorkItem = keyedWorkItem;
            this.keyCoder = coder;
        }

        public boolean matches(Object obj) {
            if (obj == null || !(obj instanceof WindowedValue)) {
                return false;
            }
            WindowedValue windowedValue = (WindowedValue) obj;
            HashMultiset create = HashMultiset.create();
            HashMultiset create2 = HashMultiset.create();
            Iterator<WindowedValue<V>> it = this.myWorkItem.elementsIterable().iterator();
            while (it.hasNext()) {
                create.add(it.next());
            }
            Iterator it2 = ((KeyedWorkItem) windowedValue.getValue()).elementsIterable().iterator();
            while (it2.hasNext()) {
                create2.add((WindowedValue) it2.next());
            }
            try {
                if (create.equals(create2)) {
                    if (this.keyCoder.structuralValue(this.myWorkItem.key()).equals(this.keyCoder.structuralValue(((KeyedWorkItem) windowedValue.getValue()).key()))) {
                        return true;
                    }
                }
                return false;
            } catch (Exception e) {
                return false;
            }
        }

        public void describeTo(Description description) {
            description.appendText("KeyedWorkItem<K, V> containing key ").appendValue(this.myWorkItem.key()).appendText(" and values ").appendValueList("[", ", ", "]", this.myWorkItem.elementsIterable());
        }
    }

    @Test
    public void testInMemoryEvaluator() throws Exception {
        Coder<?> of = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
        SdkComponents create = SdkComponents.create();
        create.registerEnvironment(Environments.createDockerEnvironment("java"));
        String registerWindowingStrategy = create.registerWindowingStrategy(WindowingStrategy.globalDefault());
        String registerCoder = create.registerCoder(of);
        RunnerApi.Components.Builder builder = create.toComponents().toBuilder();
        String addLengthPrefixedCoder = LengthPrefixUnknownCoders.addLengthPrefixedCoder(registerCoder, builder, false);
        String addLengthPrefixedCoder2 = LengthPrefixUnknownCoders.addLengthPrefixedCoder(registerCoder, builder, true);
        RehydratedComponents forComponents = RehydratedComponents.forComponents(builder.build());
        Coder<?> coder = forComponents.getCoder(addLengthPrefixedCoder);
        KvCoder coder2 = forComponents.getCoder(addLengthPrefixedCoder2);
        KV<?, ?> asRunnerKV = asRunnerKV(coder, coder2, KV.of("foo", -1));
        KV<?, ?> asRunnerKV2 = asRunnerKV(coder, coder2, KV.of("foo", 1));
        KV<?, ?> asRunnerKV3 = asRunnerKV(coder, coder2, KV.of("foo", 3));
        KV<?, ?> asRunnerKV4 = asRunnerKV(coder, coder2, KV.of("bar", 22));
        KV<?, ?> asRunnerKV5 = asRunnerKV(coder, coder2, KV.of("bar", 12));
        KV<?, ?> asRunnerKV6 = asRunnerKV(coder, coder2, KV.of("baz", Integer.MAX_VALUE));
        PipelineNode.PTransformNode pTransform = PipelineNode.pTransform("source", RunnerApi.PTransform.newBuilder().putOutputs("out", "values").build());
        PipelineNode.PCollectionNode pCollection = PipelineNode.pCollection("values", RunnerApi.PCollection.newBuilder().setUniqueName("values").setCoderId(registerCoder).setWindowingStrategyId(registerWindowingStrategy).build());
        PipelineNode.PCollectionNode pCollection2 = PipelineNode.pCollection("groupedKvs", RunnerApi.PCollection.newBuilder().setUniqueName("groupedKvs").build());
        PipelineNode.PTransformNode pTransform2 = PipelineNode.pTransform("gbko", RunnerApi.PTransform.newBuilder().putInputs("input", "values").putOutputs("output", "groupedKvs").setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("urn:beam:directrunner:transforms:gbko:v1").build()).build());
        RunnerApi.Pipeline build = RunnerApi.Pipeline.newBuilder().addRootTransformIds(pTransform.getId()).addRootTransformIds(pTransform2.getId()).setComponents(builder.putTransforms(pTransform.getId(), pTransform.getTransform()).putTransforms(pTransform2.getId(), pTransform2.getTransform()).putPcollections(pCollection.getId(), pCollection.getPCollection()).putPcollections(pCollection2.getId(), pCollection2.getPCollection())).build();
        TransformEvaluator forApplication = new GroupByKeyOnlyEvaluatorFactory(PortableGraph.forPipeline(build), build.getComponents(), this.bundleFactory).forApplication(pTransform2, this.bundleFactory.createBundle(pCollection).commit(Instant.now()));
        forApplication.processElement(WindowedValue.valueInGlobalWindow(asRunnerKV));
        forApplication.processElement(WindowedValue.valueInGlobalWindow(asRunnerKV2));
        forApplication.processElement(WindowedValue.valueInGlobalWindow(asRunnerKV3));
        forApplication.processElement(WindowedValue.valueInGlobalWindow(asRunnerKV4));
        forApplication.processElement(WindowedValue.valueInGlobalWindow(asRunnerKV5));
        forApplication.processElement(WindowedValue.valueInGlobalWindow(asRunnerKV6));
        TransformResult finishBundle = forApplication.finishBundle();
        Coder keyCoder = coder2.getKeyCoder();
        CommittedBundle committedBundle = null;
        CommittedBundle committedBundle2 = null;
        CommittedBundle committedBundle3 = null;
        StructuralKey of2 = StructuralKey.of(asRunnerKV.getKey(), keyCoder);
        StructuralKey of3 = StructuralKey.of(asRunnerKV4.getKey(), keyCoder);
        StructuralKey of4 = StructuralKey.of(asRunnerKV6.getKey(), keyCoder);
        Iterator it = finishBundle.getOutputBundles().iterator();
        while (it.hasNext()) {
            CommittedBundle commit = ((UncommittedBundle) it.next()).commit(Instant.now());
            if (of2.equals(commit.getKey())) {
                committedBundle = commit;
            } else if (of3.equals(commit.getKey())) {
                committedBundle2 = commit;
            } else {
                if (!of4.equals(commit.getKey())) {
                    throw new IllegalArgumentException(String.format("Unknown Key %s", commit.getKey()));
                }
                committedBundle3 = commit;
            }
        }
        Assert.assertThat(committedBundle, Matchers.contains(new KeyedWorkItemMatcher(KeyedWorkItems.elementsWorkItem(of2.getKey(), ImmutableSet.of(WindowedValue.valueInGlobalWindow(asRunnerKV.getValue()), WindowedValue.valueInGlobalWindow(asRunnerKV2.getValue()), WindowedValue.valueInGlobalWindow(asRunnerKV3.getValue()))), keyCoder)));
        Assert.assertThat(committedBundle2, Matchers.contains(new KeyedWorkItemMatcher(KeyedWorkItems.elementsWorkItem(of3.getKey(), ImmutableSet.of(WindowedValue.valueInGlobalWindow(asRunnerKV4.getValue()), WindowedValue.valueInGlobalWindow(asRunnerKV5.getValue()))), keyCoder)));
        Assert.assertThat(committedBundle3, Matchers.contains(new KeyedWorkItemMatcher(KeyedWorkItems.elementsWorkItem(of4.getKey(), ImmutableSet.of(WindowedValue.valueInGlobalWindow(asRunnerKV6.getValue()))), keyCoder)));
    }

    private KV<?, ?> asRunnerKV(Coder<KV<String, Integer>> coder, Coder<KV<?, ?>> coder2, KV<String, Integer> kv) throws CoderException {
        return (KV) CoderUtils.decodeFromByteArray(coder2, CoderUtils.encodeToByteArray(coder, kv));
    }

    private <K, V> KV<K, WindowedValue<V>> gwValue(KV<K, V> kv) {
        return KV.of(kv.getKey(), WindowedValue.valueInGlobalWindow(kv.getValue()));
    }
}
