package org.apache.beam.sdk.nexmark.queries;

import java.lang.invoke.SerializedLambda;
import java.util.Random;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
import org.apache.beam.sdk.nexmark.model.KnownSize;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoinTest.class */
public class BoundedSideInputJoinTest {

    @Rule
    public TestPipeline p = TestPipeline.create();

    @Before
    public void setupPipeline() {
        NexmarkUtils.setupPipeline(NexmarkUtils.CoderStrategy.HAND, this.p);
    }

    private <T extends KnownSize> void queryMatchesModel(String str, NexmarkConfiguration nexmarkConfiguration, NexmarkQueryTransform<T> nexmarkQueryTransform, NexmarkQueryModel<T> nexmarkQueryModel, boolean z) throws Exception {
        nexmarkConfiguration.sideInputUrl = FileSystems.matchNewResource(String.format("%s/BoundedSideInputJoin-%s", this.p.getOptions().getTempLocation(), Integer.valueOf(new Random().nextInt())), false).toString();
        try {
            nexmarkQueryTransform.setSideInput(NexmarkUtils.prepareSideInput(this.p, nexmarkConfiguration));
            PAssert.that(this.p.apply(str + ".Read", z ? NexmarkUtils.streamEventsSource(nexmarkConfiguration) : NexmarkUtils.batchEventsSource(nexmarkConfiguration)).apply(new NexmarkQuery(nexmarkConfiguration, nexmarkQueryTransform))).satisfies(nexmarkQueryModel.assertionFor());
            this.p.run().waitUntilFinish();
            NexmarkUtils.cleanUpSideInput(nexmarkConfiguration);
        } catch (Throwable th) {
            NexmarkUtils.cleanUpSideInput(nexmarkConfiguration);
            throw th;
        }
    }

    @Test
    @Category({NeedsRunner.class})
    public void inputOutputSameEvents() throws Exception {
        NexmarkConfiguration copy = NexmarkConfiguration.DEFAULT.copy();
        copy.sideInputType = NexmarkUtils.SideInputType.DIRECT;
        copy.numEventGenerators = 1;
        copy.numEvents = 5000L;
        copy.sideInputRowCount = 10;
        copy.sideInputNumShards = 3;
        PCollection prepareSideInput = NexmarkUtils.prepareSideInput(this.p, copy);
        try {
            PCollection apply = this.p.apply(NexmarkUtils.batchEventsSource(copy));
            PCollection apply2 = apply.apply(NexmarkQueryUtil.JUST_BIDS).apply("Count Bids", Count.globally());
            BoundedSideInputJoin boundedSideInputJoin = new BoundedSideInputJoin(copy);
            boundedSideInputJoin.setSideInput(prepareSideInput);
            PAssert.that(PCollectionList.of(apply2).and(apply.apply(new NexmarkQuery(copy, boundedSideInputJoin)).apply("Count outputs", Count.globally())).apply(Flatten.pCollections())).satisfies(iterable -> {
                MatcherAssert.assertThat(Integer.valueOf(Iterables.size(iterable)), Matchers.equalTo(2));
                MatcherAssert.assertThat((Long) Iterables.get(iterable, 0), Matchers.greaterThan(0L));
                MatcherAssert.assertThat((Long) Iterables.get(iterable, 0), Matchers.equalTo((Long) Iterables.get(iterable, 1)));
                return null;
            });
            this.p.run();
            NexmarkUtils.cleanUpSideInput(copy);
        } catch (Throwable th) {
            NexmarkUtils.cleanUpSideInput(copy);
            throw th;
        }
    }

    @Test
    @Category({NeedsRunner.class})
    public void queryMatchesModelBatchDirect() throws Exception {
        NexmarkConfiguration copy = NexmarkConfiguration.DEFAULT.copy();
        copy.sideInputType = NexmarkUtils.SideInputType.DIRECT;
        copy.numEventGenerators = 1;
        copy.numEvents = 5000L;
        copy.sideInputRowCount = 10;
        copy.sideInputNumShards = 3;
        queryMatchesModel("BoundedSideInputJoinTestBatch", copy, new BoundedSideInputJoin(copy), new BoundedSideInputJoinModel(copy), false);
    }

    @Test
    @Category({NeedsRunner.class})
    public void queryMatchesModelStreamingDirect() throws Exception {
        NexmarkConfiguration copy = NexmarkConfiguration.DEFAULT.copy();
        copy.sideInputType = NexmarkUtils.SideInputType.DIRECT;
        copy.numEventGenerators = 1;
        copy.numEvents = 5000L;
        copy.sideInputRowCount = 10;
        copy.sideInputNumShards = 3;
        queryMatchesModel("BoundedSideInputJoinTestStreaming", copy, new BoundedSideInputJoin(copy), new BoundedSideInputJoinModel(copy), true);
    }

    @Test
    @Category({NeedsRunner.class})
    public void queryMatchesModelBatchCsv() throws Exception {
        NexmarkConfiguration copy = NexmarkConfiguration.DEFAULT.copy();
        copy.sideInputType = NexmarkUtils.SideInputType.CSV;
        copy.numEventGenerators = 1;
        copy.numEvents = 5000L;
        copy.sideInputRowCount = 10;
        copy.sideInputNumShards = 3;
        queryMatchesModel("BoundedSideInputJoinTestBatch", copy, new BoundedSideInputJoin(copy), new BoundedSideInputJoinModel(copy), false);
    }

    @Test
    @Category({NeedsRunner.class})
    public void queryMatchesModelStreamingCsv() throws Exception {
        NexmarkConfiguration copy = NexmarkConfiguration.DEFAULT.copy();
        copy.sideInputType = NexmarkUtils.SideInputType.CSV;
        copy.numEventGenerators = 1;
        copy.numEvents = 5000L;
        copy.sideInputRowCount = 10;
        copy.sideInputNumShards = 3;
        queryMatchesModel("BoundedSideInputJoinTestStreaming", copy, new BoundedSideInputJoin(copy), new BoundedSideInputJoinModel(copy), true);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -322135581:
                if (implMethodName.equals("lambda$inputOutputSameEvents$43268ee4$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoinTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Iterable;)Ljava/lang/Void;")) {
                    return iterable -> {
                        MatcherAssert.assertThat(Integer.valueOf(Iterables.size(iterable)), Matchers.equalTo(2));
                        MatcherAssert.assertThat((Long) Iterables.get(iterable, 0), Matchers.greaterThan(0L));
                        MatcherAssert.assertThat((Long) Iterables.get(iterable, 0), Matchers.equalTo((Long) Iterables.get(iterable, 1)));
                        return null;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
