package org.apache.beam.runners.direct;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.runners.local.StructuralKey;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for {@link ImmutableListBundleFactory}.
 *
 * <p>The tests cover creating keyed and root bundles, adding elements, committing, and deriving a
 * new committed bundle via {@code withElements}.
 */
@RunWith(JUnit4.class)
public class ImmutableListBundleFactoryTest {

    /**
     * Pipeline used to build the {@link PCollection}s handed to the factory. Abandoned-node
     * enforcement is disabled; no test in this class runs the pipeline.
     */
    @Rule
    public final TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);

    /** Declares the exception a test expects its final statement to throw. */
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    private final ImmutableListBundleFactory bundleFactory = ImmutableListBundleFactory.create();
    private PCollection<Integer> created;
    private PCollection<KV<String, Integer>> downstream;

    /** Builds the two collections the tests hand to the bundle factory. */
    @Before
    public void setup() {
        created = p.apply(Create.of(1, 2, 3));
        downstream = created.apply(WithKeys.<String, Integer>of("foo"));
    }

    /**
     * Creates a keyed bundle for {@code key}, commits it, and asserts that the committed bundle
     * reports an equal {@link StructuralKey}.
     *
     * @param coder coder used to build the structural key
     * @param key the key value, which may be {@code null}
     */
    private <T> void createKeyedBundle(Coder<T> coder, T key) throws Exception {
        PCollection<Integer> pcollection = p.apply("Create", Create.of(1));
        StructuralKey<T> structuralKey = StructuralKey.of(key, coder);

        UncommittedBundle<Integer> inFlightBundle =
                bundleFactory.createKeyedBundle(structuralKey, pcollection);
        CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now());

        // getKey() is typed StructuralKey<?>, so the matcher type is spelled out explicitly.
        Assert.assertThat(bundle.getKey(), Matchers.<StructuralKey<?>>equalTo(structuralKey));
    }

    @Test
    public void keyedWithNullKeyShouldCreateKeyedBundle() throws Exception {
        createKeyedBundle(VoidCoder.of(), null);
    }

    @Test
    public void keyedWithStringKeyShouldCreateKeyedBundle() throws Exception {
        createKeyedBundle(StringUtf8Coder.of(), "foo");
    }

    @Test
    public void keyedWithVarIntKeyShouldCreateKeyedBundle() throws Exception {
        createKeyedBundle(VarIntCoder.of(), 1234);
    }

    @Test
    public void keyedWithByteArrayKeyShouldCreateKeyedBundle() throws Exception {
        createKeyedBundle(ByteArrayCoder.of(), new byte[] {0, 2, 4, 99});
    }

    /**
     * Adds every element of {@code elems} to a fresh root bundle, commits it, and asserts that the
     * committed bundle holds exactly those elements (in any order), that its minimum timestamp is
     * the earliest element timestamp (or {@link BoundedWindow#TIMESTAMP_MAX_VALUE} when there are no
     * elements), and that its synchronized processing output watermark is the commit time.
     *
     * @param elems the elements to add before committing
     * @return the committed bundle, for further assertions by the caller
     */
    private <T> CommittedBundle<T> afterCommitGetElementsShouldHaveAddedElements(
            Iterable<WindowedValue<T>> elems) {
        UncommittedBundle<T> bundle = bundleFactory.createRootBundle();
        Collection<Matcher<? super WindowedValue<T>>> expectations = new ArrayList<>();
        Instant minElementTs = BoundedWindow.TIMESTAMP_MAX_VALUE;
        for (WindowedValue<T> elem : elems) {
            bundle.add(elem);
            expectations.add(Matchers.equalTo(elem));
            if (elem.getTimestamp().isBefore(minElementTs)) {
                minElementTs = elem.getTimestamp();
            }
        }
        Matcher<Iterable<? extends WindowedValue<T>>> containsMatcher =
                Matchers.<WindowedValue<T>>containsInAnyOrder(expectations);

        Instant commitTime = Instant.now();
        CommittedBundle<T> committed = bundle.commit(commitTime);
        Assert.assertThat(committed.getElements(), containsMatcher);

        // Sanity check: the two timestamp assertions below are only distinguishable when the
        // minimum element timestamp differs from the commit time.
        Assert.assertThat(minElementTs, Matchers.not(Matchers.equalTo(commitTime)));
        Assert.assertThat(committed.getMinimumTimestamp(), Matchers.equalTo(minElementTs));
        Assert.assertThat(
                committed.getSynchronizedProcessingOutputWatermark(), Matchers.equalTo(commitTime));

        return committed;
    }

    @Test
    public void getElementsBeforeAddShouldReturnEmptyIterable() {
        afterCommitGetElementsShouldHaveAddedElements(
                Collections.<WindowedValue<Integer>>emptyList());
    }

    @Test
    public void getElementsAfterAddShouldReturnAddedElements() {
        WindowedValue<Integer> firstValue = WindowedValue.valueInGlobalWindow(1);
        WindowedValue<Integer> secondValue =
                WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1000L));

        afterCommitGetElementsShouldHaveAddedElements(ImmutableList.of(firstValue, secondValue));
    }

    @Test
    public void addElementsAtEndOfTimeThrows() {
        Instant timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
        WindowedValue<Integer> value = WindowedValue.timestampedValueInGlobalWindow(1, timestamp);

        UncommittedBundle<Integer> bundle = bundleFactory.createRootBundle();
        thrown.expect(IllegalArgumentException.class);
        thrown.expectMessage(timestamp.toString());
        bundle.add(value);
    }

    @Test
    public void addElementsPastEndOfTimeThrows() {
        Instant timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.standardMinutes(2L));
        WindowedValue<Integer> value = WindowedValue.timestampedValueInGlobalWindow(1, timestamp);

        UncommittedBundle<Integer> bundle = bundleFactory.createRootBundle();
        thrown.expect(IllegalArgumentException.class);
        thrown.expectMessage(timestamp.toString());
        bundle.add(value);
    }

    // Generic varargs passed to Matchers.containsInAnyOrder create a generic array; the array is
    // only read by the matcher, so the unchecked warning is safe to suppress.
    @SuppressWarnings("unchecked")
    @Test
    public void withElementsShouldReturnIndependentBundle() {
        WindowedValue<Integer> firstValue = WindowedValue.valueInGlobalWindow(1);
        WindowedValue<Integer> secondValue =
                WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1000L));

        CommittedBundle<Integer> committed =
                afterCommitGetElementsShouldHaveAddedElements(
                        ImmutableList.of(firstValue, secondValue));

        WindowedValue<Integer> firstReplacement =
                WindowedValue.of(
                        9,
                        new Instant(2048L),
                        new IntervalWindow(new Instant(2044L), Instant.now()),
                        PaneInfo.NO_FIRING);
        WindowedValue<Integer> secondReplacement =
                WindowedValue.timestampedValueInGlobalWindow(-1, Instant.now());
        CommittedBundle<Integer> withed =
                committed.withElements(ImmutableList.of(firstReplacement, secondReplacement));

        // The derived bundle holds only the replacements; the source bundle is untouched.
        Assert.assertThat(
                withed.getElements(),
                Matchers.containsInAnyOrder(firstReplacement, secondReplacement));
        Assert.assertThat(
                committed.getElements(), Matchers.containsInAnyOrder(firstValue, secondValue));

        // Key, collection and watermark carry over; the minimum timestamp is recomputed.
        Assert.assertThat(withed.getKey(), Matchers.<StructuralKey<?>>equalTo(committed.getKey()));
        Assert.assertThat(withed.getPCollection(), Matchers.equalTo(committed.getPCollection()));
        Assert.assertThat(
                withed.getSynchronizedProcessingOutputWatermark(),
                Matchers.equalTo(committed.getSynchronizedProcessingOutputWatermark()));
        Assert.assertThat(withed.getMinimumTimestamp(), Matchers.equalTo(new Instant(2048L)));
    }

    // Generic varargs passed to Matchers.containsInAnyOrder; see the note on
    // withElementsShouldReturnIndependentBundle.
    @SuppressWarnings("unchecked")
    @Test
    public void addAfterCommitShouldThrowException() {
        UncommittedBundle<Integer> bundle = bundleFactory.createRootBundle();
        bundle.add(WindowedValue.valueInGlobalWindow(1));
        CommittedBundle<Integer> firstCommit = bundle.commit(Instant.now());
        // A separately constructed value is used so the match relies on equals(), not identity.
        WindowedValue<Integer> expected = WindowedValue.valueInGlobalWindow(1);
        Assert.assertThat(firstCommit.getElements(), Matchers.containsInAnyOrder(expected));

        thrown.expect(IllegalStateException.class);
        thrown.expectMessage("3");
        thrown.expectMessage("committed");

        bundle.add(WindowedValue.valueInGlobalWindow(3));
    }

    // Generic varargs passed to Matchers.containsInAnyOrder; see the note on
    // withElementsShouldReturnIndependentBundle.
    @SuppressWarnings("unchecked")
    @Test
    public void commitAfterCommitShouldThrowException() {
        UncommittedBundle<Integer> bundle = bundleFactory.createRootBundle();
        bundle.add(WindowedValue.valueInGlobalWindow(1));
        CommittedBundle<Integer> firstCommit = bundle.commit(Instant.now());
        // A separately constructed value is used so the match relies on equals(), not identity.
        WindowedValue<Integer> expected = WindowedValue.valueInGlobalWindow(1);
        Assert.assertThat(firstCommit.getElements(), Matchers.containsInAnyOrder(expected));

        thrown.expect(IllegalStateException.class);
        thrown.expectMessage("committed");

        bundle.commit(Instant.now());
    }

    @Test
    public void createKeyedBundleKeyed() {
        StructuralKey<String> structuralKey = StructuralKey.of("foo", StringUtf8Coder.of());
        CommittedBundle<KV<String, Integer>> keyedBundle =
                bundleFactory.createKeyedBundle(structuralKey, downstream).commit(Instant.now());
        // getKey() is typed StructuralKey<?>, so the raw key is compared as an Object.
        Assert.assertThat(keyedBundle.getKey().getKey(), Matchers.<Object>equalTo("foo"));
    }
}
