package org.apache.beam.runners.direct.portable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.Iterables;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.Environments;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.RehydratedComponents;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.SdkComponents;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.PipelineNode;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.wire.WireCoders;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/portable/BundleFactoryOutputReceiverFactoryTest.class */
public class BundleFactoryOutputReceiverFactoryTest {
    private final BundleFactory bundleFactory = ImmutableListBundleFactory.create();
    private PipelineNode.PCollectionNode fooPC;
    private PipelineNode.PCollectionNode barPC;
    private RunnerApi.Components baseComponents;
    private OutputReceiverFactory factory;
    private Collection<UncommittedBundle<?>> outputBundles;

    @Before
    public void setup() throws IOException {
        Pipeline create = Pipeline.create();
        PCollection<?> apply = create.apply("createFoo", Create.of("1", new String[]{"2", "3"})).apply("windowFoo", Window.into(FixedWindows.of(Duration.standardMinutes(5L))));
        PCollection<?> apply2 = create.apply("bar", Create.of(1, new Integer[]{2, 3}));
        SdkComponents create2 = SdkComponents.create();
        create2.registerEnvironment(Environments.createDockerEnvironment("java"));
        String registerPCollection = create2.registerPCollection(apply);
        String registerPCollection2 = create2.registerPCollection(apply2);
        this.baseComponents = create2.toComponents();
        this.fooPC = PipelineNode.pCollection(registerPCollection, this.baseComponents.getPcollectionsOrThrow(registerPCollection));
        this.barPC = PipelineNode.pCollection(registerPCollection2, this.baseComponents.getPcollectionsOrThrow(registerPCollection2));
        this.outputBundles = new ArrayList();
        BundleFactory bundleFactory = this.bundleFactory;
        RunnerApi.Components components = this.baseComponents;
        Collection<UncommittedBundle<?>> collection = this.outputBundles;
        Objects.requireNonNull(collection);
        this.factory = BundleFactoryOutputReceiverFactory.create(bundleFactory, components, (v1) -> {
            r3.add(v1);
        });
    }

    @Test
    public void addsBundlesToResult() {
        this.factory.create(this.fooPC.getId());
        this.factory.create(this.barPC.getId());
        Assert.assertThat(Integer.valueOf(Iterables.size(this.outputBundles)), Matchers.equalTo(2));
        ArrayList arrayList = new ArrayList();
        Iterator<UncommittedBundle<?>> it = this.outputBundles.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getPCollection());
        }
        Assert.assertThat(arrayList, Matchers.containsInAnyOrder(new PipelineNode.PCollectionNode[]{this.fooPC, this.barPC}));
    }

    @Test
    public void receiverAddsElementsToBundle() throws Exception {
        FnDataReceiver create = this.factory.create(this.fooPC.getId());
        RunnerApi.Components.Builder builder = this.baseComponents.toBuilder();
        String addSdkWireCoder = WireCoders.addSdkWireCoder(this.fooPC, builder);
        RunnerApi.Components build = builder.build();
        Coder<?> coder = RehydratedComponents.forComponents(build).getCoder(addSdkWireCoder);
        Coder instantiateRunnerWireCoder = WireCoders.instantiateRunnerWireCoder(this.fooPC, build);
        WindowedValue windowedValue = (WindowedValue) CoderUtils.decodeFromByteArray(instantiateRunnerWireCoder, CoderUtils.encodeToByteArray(coder, WindowedValue.of("1", new Instant(120L), new IntervalWindow(new Instant(0L), Duration.standardMinutes(5L)), PaneInfo.NO_FIRING)));
        WindowedValue windowedValue2 = (WindowedValue) CoderUtils.decodeFromByteArray(instantiateRunnerWireCoder, CoderUtils.encodeToByteArray(coder, WindowedValue.of("2", new Instant(240L), new IntervalWindow(new Instant(0L), Duration.standardMinutes(5L)), PaneInfo.NO_FIRING)));
        create.accept(windowedValue);
        create.accept(windowedValue2);
        Assert.assertThat(((UncommittedBundle) Iterables.getOnlyElement(this.outputBundles)).commit(Instant.now()), Matchers.containsInAnyOrder(new WindowedValue[]{windowedValue, windowedValue2}));
    }

    @Test
    public void multipleInstancesOfPCollectionIndependent() throws Exception {
        FnDataReceiver create = this.factory.create(this.fooPC.getId());
        FnDataReceiver create2 = this.factory.create(this.fooPC.getId());
        RunnerApi.Components.Builder builder = this.baseComponents.toBuilder();
        String addSdkWireCoder = WireCoders.addSdkWireCoder(this.fooPC, builder);
        RunnerApi.Components build = builder.build();
        Coder<?> coder = RehydratedComponents.forComponents(build).getCoder(addSdkWireCoder);
        Coder instantiateRunnerWireCoder = WireCoders.instantiateRunnerWireCoder(this.fooPC, build);
        WindowedValue windowedValue = (WindowedValue) CoderUtils.decodeFromByteArray(instantiateRunnerWireCoder, CoderUtils.encodeToByteArray(coder, WindowedValue.of("1", new Instant(120L), new IntervalWindow(new Instant(0L), Duration.standardMinutes(5L)), PaneInfo.NO_FIRING)));
        create.accept(windowedValue);
        WindowedValue windowedValue2 = (WindowedValue) CoderUtils.decodeFromByteArray(instantiateRunnerWireCoder, CoderUtils.encodeToByteArray(coder, WindowedValue.of("2", new Instant(240L), new IntervalWindow(new Instant(0L), Duration.standardMinutes(5L)), PaneInfo.NO_FIRING)));
        create2.accept(windowedValue2);
        ArrayList arrayList = new ArrayList();
        for (UncommittedBundle<?> uncommittedBundle : this.outputBundles) {
            Assert.assertThat(uncommittedBundle.getPCollection(), Matchers.equalTo(this.fooPC));
            Iterable elements = uncommittedBundle.commit(Instant.now()).getElements();
            Iterables.addAll(arrayList, elements);
            Assert.assertThat(Integer.valueOf(Iterables.size(elements)), Matchers.equalTo(1));
        }
        Assert.assertThat(arrayList, Matchers.containsInAnyOrder(new WindowedValue[]{windowedValue, windowedValue2}));
    }

    @Test
    public void differentPCollectionsIndependent() throws Exception {
        FnDataReceiver create = this.factory.create(this.fooPC.getId());
        RunnerApi.Components.Builder builder = this.baseComponents.toBuilder();
        String addSdkWireCoder = WireCoders.addSdkWireCoder(this.fooPC, builder);
        String addSdkWireCoder2 = WireCoders.addSdkWireCoder(this.barPC, builder);
        RunnerApi.Components build = builder.build();
        Coder<?> coder = RehydratedComponents.forComponents(build).getCoder(addSdkWireCoder);
        Coder instantiateRunnerWireCoder = WireCoders.instantiateRunnerWireCoder(this.fooPC, build);
        FnDataReceiver create2 = this.factory.create(this.barPC.getId());
        Coder<?> coder2 = RehydratedComponents.forComponents(build).getCoder(addSdkWireCoder2);
        Coder instantiateRunnerWireCoder2 = WireCoders.instantiateRunnerWireCoder(this.barPC, build);
        WindowedValue windowedValue = (WindowedValue) CoderUtils.decodeFromByteArray(instantiateRunnerWireCoder, CoderUtils.encodeToByteArray(coder, WindowedValue.of("1", new Instant(120L), new IntervalWindow(new Instant(0L), Duration.standardMinutes(5L)), PaneInfo.NO_FIRING)));
        create.accept(windowedValue);
        WindowedValue windowedValue2 = (WindowedValue) CoderUtils.decodeFromByteArray(instantiateRunnerWireCoder2, CoderUtils.encodeToByteArray(coder2, WindowedValue.timestampedValueInGlobalWindow(2, new Instant(240L))));
        create2.accept(windowedValue2);
        ArrayList arrayList = new ArrayList();
        for (UncommittedBundle<?> uncommittedBundle : this.outputBundles) {
            WindowedValue windowedValue3 = (WindowedValue) Iterables.getOnlyElement(uncommittedBundle.commit(Instant.now()).getElements());
            if (this.fooPC.equals(uncommittedBundle.getPCollection())) {
                Assert.assertThat(windowedValue3, Matchers.equalTo(windowedValue));
            } else if (this.barPC.equals(uncommittedBundle.getPCollection())) {
                Assert.assertThat(windowedValue3, Matchers.equalTo(windowedValue2));
            } else {
                Assert.fail(String.format("Output %s should be either 'foo' or 'bar', got '%s", PCollection.class.getSimpleName(), uncommittedBundle.getPCollection().getId()));
            }
            arrayList.add(windowedValue3);
        }
        Assert.assertThat(arrayList, Matchers.containsInAnyOrder(new Object[]{windowedValue, windowedValue2}));
    }
}
