package org.apache.beam.fn.harness;

import java.util.Collections;
import java.util.HashSet;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/WindowMergingFnRunnerTest.class */
public class WindowMergingFnRunnerTest {
    @Test
    public void testWindowMergingWithNonMergingWindowFn() throws Exception {
        ThrowingFunction createMapFunctionForPTransform = WindowMergingFnRunner.createMapFunctionForPTransform("ptransformId", createMergeTransformForWindowFn(new GlobalWindows()));
        KV of = KV.of("abc", ImmutableList.of(new IntervalWindow(Instant.now(), Duration.standardMinutes(1L))));
        Assert.assertEquals(KV.of(of.getKey(), KV.of((Iterable) of.getValue(), Collections.emptyList())), createMapFunctionForPTransform.apply(of));
    }

    @Test
    public void testWindowMergingWithMergingWindowFn() throws Exception {
        ThrowingFunction createMapFunctionForPTransform = WindowMergingFnRunner.createMapFunctionForPTransform("ptransformId", createMergeTransformForWindowFn(Sessions.withGapDuration(Duration.millis(5L))));
        BoundedWindow[] boundedWindowArr = {new IntervalWindow(new Instant(9L), new Instant(11L)), new IntervalWindow(new Instant(10L), new Instant(10L)), new IntervalWindow(new Instant(7L), new Instant(10L))};
        HashSet newHashSet = Sets.newHashSet(new IntervalWindow(new Instant(1L), new Instant(1L)), new IntervalWindow(new Instant(20L), new Instant(20L)));
        KV of = KV.of("abc", ImmutableList.builder().add((Object[]) boundedWindowArr).addAll((Iterable) newHashSet).build());
        KV kv = (KV) createMapFunctionForPTransform.apply(of);
        Assert.assertEquals(of.getKey(), kv.getKey());
        Assert.assertEquals(newHashSet, ((KV) kv.getValue()).getKey());
        KV kv2 = (KV) Iterables.getOnlyElement((Iterable) ((KV) kv.getValue()).getValue());
        Assert.assertEquals(new IntervalWindow(new Instant(7L), new Instant(11L)), kv2.getKey());
        MatcherAssert.assertThat((Iterable) kv2.getValue(), (Matcher<? super Iterable>) Matchers.containsInAnyOrder(boundedWindowArr));
        BoundedWindow[] boundedWindowArr2 = {new IntervalWindow(new Instant(15L), new Instant(17L)), new IntervalWindow(new Instant(16L), new Instant(18L))};
        KV of2 = KV.of("abc", ImmutableList.builder().add((Object[]) boundedWindowArr2).addAll((Iterable) newHashSet).build());
        KV kv3 = (KV) createMapFunctionForPTransform.apply(of2);
        Assert.assertEquals(of2.getKey(), kv3.getKey());
        Assert.assertEquals(newHashSet, ((KV) kv3.getValue()).getKey());
        KV kv4 = (KV) Iterables.getOnlyElement((Iterable) ((KV) kv3.getValue()).getValue());
        Assert.assertEquals(new IntervalWindow(new Instant(15L), new Instant(18L)), kv4.getKey());
        MatcherAssert.assertThat((Iterable) kv4.getValue(), (Matcher<? super Iterable>) Matchers.containsInAnyOrder(boundedWindowArr2));
    }

    private static <W extends BoundedWindow> RunnerApi.PTransform createMergeTransformForWindowFn(WindowFn<?, W> windowFn) throws Exception {
        SdkComponents create = SdkComponents.create();
        create.registerEnvironment(Environments.createDockerEnvironment("test"));
        return RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(PTransformTranslation.MERGE_WINDOWS_TRANSFORM_URN).setPayload(WindowingStrategyTranslation.toProto((WindowFn<?, ?>) windowFn, create).toByteString()).build()).build();
    }
}
