package org.apache.beam.sdk.extensions.ordered;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.extensions.ordered.UnprocessedEvent;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SerializableMatcher;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTest.class */
public class OrderedEventProcessorTest {
    // Named values for the literal arguments passed to the test helpers and to the expected
    // statuses in this class.
    // NOTE(review): unless stated otherwise, the meaning of each constant is inferred from its
    // name and from how doTest(...) forwards its arguments - confirm against the handler classes.

    /** Presumably the value of the "last event received" flag of an expected status. */
    public static final boolean LAST_EVENT_RECEIVED = true;
    /** Presumably an emission frequency that yields a result for every processed event. */
    public static final int EMISSION_FREQUENCY_ON_EVERY_ELEMENT = 1;
    /** Presumably the initial sequence number for keys whose sequences start at 0. */
    public static final int INITIAL_SEQUENCE_OF_0 = 0;
    /** Presumably the "produce status on every event" flag in its disabled state. */
    public static final boolean DONT_PRODUCE_STATUS_ON_EVERY_EVENT = false;
    /** Value handed to the handler's setMaxOutputElementsPerBundle(...) by most tests. */
    public static final int LARGE_MAX_RESULTS_PER_OUTPUT = 1000;
    /** Presumably an emission frequency that yields a result for every second event. */
    public static final int EMISSION_FREQUENCY_ON_EVERY_OTHER_EVENT = 2;
    /** Presumably the "produce status on every event" flag in its enabled state. */
    public static final boolean PRODUCE_STATUS_ON_EVERY_EVENT = true;
    /** Presumably the doTest(...) mode flag selecting the streaming pipeline. */
    public static final boolean STREAMING = true;
    /** Presumably the doTest(...) mode flag selecting the batch pipeline. */
    public static final boolean BATCH = false;
    /** Empty expectation used when a test expects nothing on the unprocessed-events output. */
    public static final Set<KV<String, KV<Long, UnprocessedEvent<String>>>> NO_EXPECTED_DLQ_EVENTS = Collections.emptySet();

    /** Pipeline used by testWindowedProcessing() and by doTest(...) when its last argument is true. */
    @Rule
    public final transient TestPipeline streamingPipeline = TestPipeline.create();

    /** Pipeline used by doTest(...) when its last argument is false. */
    @Rule
    public final transient TestPipeline batchPipeline = TestPipeline.create();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTest$MapEventsToKV.class */
    /**
     * Converts each {@code Event} into the nested key/value shape
     * {@code KV<key, KV<sequence, value>>} that the tests feed into the ordered event processor.
     */
    public static class MapEventsToKV extends DoFn<Event, KV<String, KV<Long, String>>> {
        MapEventsToKV() {
        }

        /**
         * Emits one element per input event.
         *
         * @param event the event to convert
         * @param outputReceiver receives the event key paired with its (sequence, value) pair
         */
        @DoFn.ProcessElement
        public void convert(@DoFn.Element Event event, DoFn.OutputReceiver<KV<String, KV<Long, String>>> outputReceiver) {
            KV<Long, String> sequenceAndValue = KV.of(Long.valueOf(event.getSequence()), event.getValue());
            KV<String, KV<Long, String>> keyed = KV.of(event.getKey(), sequenceAndValue);
            outputReceiver.output(keyed);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTest$MapStringBufferStateToString.class */
    /**
     * Replaces the {@code StringBuilderState} value of each element with its {@code toString()}
     * form, keeping the key unchanged.
     */
    static class MapStringBufferStateToString extends DoFn<KV<String, StringBuilderState>, KV<String, String>> {
        MapStringBufferStateToString() {
        }

        /**
         * Emits one element per input element.
         *
         * @param kv the keyed state to convert
         * @param outputReceiver receives the key paired with the string form of the state
         */
        @DoFn.ProcessElement
        public void map(@DoFn.Element KV<String, StringBuilderState> kv, DoFn.OutputReceiver<KV<String, String>> outputReceiver) {
            String key = kv.getKey();
            StringBuilderState state = kv.getValue();
            outputReceiver.output(KV.of(key, state.toString()));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTest$UnprocessedEventMatcher.class */
    static class UnprocessedEventMatcher extends BaseMatcher<KV<String, KV<Long, UnprocessedEvent<String>>>> implements SerializableMatcher<KV<String, KV<Long, UnprocessedEvent<String>>>> {
        private KV<String, KV<Long, UnprocessedEvent<String>>> element;

        public UnprocessedEventMatcher(KV<String, KV<Long, UnprocessedEvent<String>>> kv) {
            this.element = kv;
        }

        public boolean matches(Object obj) {
            KV kv = (KV) obj;
            UnprocessedEvent unprocessedEvent = (UnprocessedEvent) ((KV) this.element.getValue()).getValue();
            UnprocessedEvent unprocessedEvent2 = (UnprocessedEvent) ((KV) kv.getValue()).getValue();
            return ((String) this.element.getKey()).equals(kv.getKey()) && ((Long) ((KV) this.element.getValue()).getKey()).equals(((KV) kv.getValue()).getKey()) && ((String) unprocessedEvent.getEvent()).equals(unprocessedEvent2.getEvent()) && unprocessedEvent.getReason() == unprocessedEvent2.getReason() && normalizeExplanation(unprocessedEvent.getExplanation()).equals(normalizeExplanation(unprocessedEvent2.getExplanation()));
        }

        public void describeTo(Description description) {
            description.appendText("Just some text...");
        }

        static String normalizeExplanation(String str) {
            if (str == null) {
                return "";
            }
            String str2 = str.split("\n", 1)[0];
            return str2.contains("Exception") ? str2 : str;
        }
    }

    /**
     * Sends events for two keys in ascending sequence order and expects one accumulated result
     * per event plus one final status per key.
     */
    @Test
    public void testPerfectOrderingProcessing() throws CannotProvideCoderException {
        Event[] events = {
            Event.create(0L, "id-1", "a"),
            Event.create(1L, "id-1", "b"),
            Event.create(2L, "id-1", "c"),
            Event.create(3L, "id-1", "d"),
            Event.create(0L, "id-2", "a"),
            Event.create(1L, "id-2", "b")
        };

        // Number of events sent per key; used as one of the expected status counters.
        long id1EventCount = Arrays.stream(events).filter(e -> e.getKey().equals("id-1")).count();
        long id2EventCount = Arrays.stream(events).filter(e -> e.getKey().equals("id-2")).count();

        Collection<KV<String, OrderedProcessingStatus>> expectedStatuses = Arrays.asList(
            KV.of("id-1", OrderedProcessingStatus.create(3L, 0L, (Long) null, (Long) null, 4L, id1EventCount, 0L, false)),
            KV.of("id-2", OrderedProcessingStatus.create(1L, 0L, (Long) null, (Long) null, 2L, id2EventCount, 0L, false)));

        Collection<KV<String, String>> expectedOutput = Arrays.asList(
            KV.of("id-1", "a"),
            KV.of("id-1", "ab"),
            KV.of("id-1", "abc"),
            KV.of("id-1", "abcd"),
            KV.of("id-2", "a"),
            KV.of("id-2", "ab"));

        testProcessing(
            events,
            expectedStatuses,
            expectedOutput,
            EMISSION_FREQUENCY_ON_EVERY_ELEMENT,
            INITIAL_SEQUENCE_OF_0,
            LARGE_MAX_RESULTS_PER_OUTPUT,
            DONT_PRODUCE_STATUS_ON_EVERY_EVENT);
    }

    /**
     * Sends events for two keys with shuffled sequence numbers and expects the same accumulated
     * results as if they had arrived in order.
     */
    @Test
    public void testOutOfSequenceProcessing() throws CannotProvideCoderException {
        Event[] events = {
            Event.create(2L, "id-1", "c"),
            Event.create(1L, "id-1", "b"),
            Event.create(0L, "id-1", "a"),
            Event.create(3L, "id-1", "d"),
            Event.create(1L, "id-2", "b"),
            Event.create(2L, "id-2", "c"),
            Event.create(4L, "id-2", "e"),
            Event.create(0L, "id-2", "a"),
            Event.create(3L, "id-2", "d")
        };

        // Number of events sent per key; used as one of the expected status counters.
        long id1EventCount = Arrays.stream(events).filter(e -> e.getKey().equals("id-1")).count();
        long id2EventCount = Arrays.stream(events).filter(e -> e.getKey().equals("id-2")).count();

        Collection<KV<String, OrderedProcessingStatus>> expectedStatuses = Arrays.asList(
            KV.of("id-1", OrderedProcessingStatus.create(3L, 0L, (Long) null, (Long) null, 4L, id1EventCount, 0L, false)),
            KV.of("id-2", OrderedProcessingStatus.create(4L, 0L, (Long) null, (Long) null, 5L, id2EventCount, 0L, false)));

        Collection<KV<String, String>> expectedOutput = Arrays.asList(
            KV.of("id-1", "a"),
            KV.of("id-1", "ab"),
            KV.of("id-1", "abc"),
            KV.of("id-1", "abcd"),
            KV.of("id-2", "a"),
            KV.of("id-2", "ab"),
            KV.of("id-2", "abc"),
            KV.of("id-2", "abcd"),
            KV.of("id-2", "abcde"));

        testProcessing(
            events,
            expectedStatuses,
            expectedOutput,
            EMISSION_FREQUENCY_ON_EVERY_ELEMENT,
            INITIAL_SEQUENCE_OF_0,
            LARGE_MAX_RESULTS_PER_OUTPUT,
            DONT_PRODUCE_STATUS_ON_EVERY_EVENT);
    }

    /**
     * Leaves a gap in the sequences of key "id-1" (sequence 1 never arrives) and expects output
     * only for the event before the gap, while key "id-2" is processed completely.
     */
    @Test
    public void testUnfinishedProcessing() throws CannotProvideCoderException {
        Event[] events = {
            Event.create(2L, "id-1", "c"),
            Event.create(0L, "id-1", "a"),
            Event.create(3L, "id-1", "d"),
            Event.create(0L, "id-2", "a"),
            Event.create(1L, "id-2", "b")
        };

        Collection<KV<String, OrderedProcessingStatus>> expectedStatuses = Arrays.asList(
            KV.of("id-1", OrderedProcessingStatus.create(0L, 2L, 2L, 3L, 3L, 1L, 0L, false)),
            KV.of("id-2", OrderedProcessingStatus.create(1L, 0L, (Long) null, (Long) null, 2L, 2L, 0L, false)));

        Collection<KV<String, String>> expectedOutput = Arrays.asList(
            KV.of("id-1", "a"),
            KV.of("id-2", "a"),
            KV.of("id-2", "ab"));

        testProcessing(
            events,
            expectedStatuses,
            expectedOutput,
            EMISSION_FREQUENCY_ON_EVERY_ELEMENT,
            INITIAL_SEQUENCE_OF_0,
            LARGE_MAX_RESULTS_PER_OUTPUT,
            DONT_PRODUCE_STATUS_ON_EVERY_EVENT);
    }

    /**
     * Repeats sequences 3 (three extra times) and 1 (once) for a single key and expects the
     * repeats on the unprocessed-events output with reason {@code duplicate}, while the regular
     * output contains each sequence once.
     */
    @Test
    public void testHandlingOfDuplicateSequences() throws CannotProvideCoderException {
        Event[] events = {
            Event.create(3L, "id-1", "d"),
            Event.create(2L, "id-1", "c"),
            Event.create(3L, "id-1", "d"),
            Event.create(3L, "id-1", "d"),
            Event.create(0L, "id-1", "a"),
            Event.create(1L, "id-1", "b"),
            Event.create(1L, "id-1", "b"),
            Event.create(3L, "id-1", "d")
        };

        Collection<KV<String, OrderedProcessingStatus>> expectedStatuses = Collections.singletonList(
            KV.of("id-1", OrderedProcessingStatus.create(3L, 0L, (Long) null, (Long) null, events.length, 4, 4, false)));

        Collection<KV<String, String>> expectedOutput = Arrays.asList(
            KV.of("id-1", "a"),
            KV.of("id-1", "ab"),
            KV.of("id-1", "abc"),
            KV.of("id-1", "abcd"));

        Collection<KV<String, KV<Long, UnprocessedEvent<String>>>> expectedDuplicates = new ArrayList<>();
        expectedDuplicates.add(KV.of("id-1", KV.of(3L, UnprocessedEvent.create("d", UnprocessedEvent.Reason.duplicate))));
        expectedDuplicates.add(KV.of("id-1", KV.of(3L, UnprocessedEvent.create("d", UnprocessedEvent.Reason.duplicate))));
        expectedDuplicates.add(KV.of("id-1", KV.of(1L, UnprocessedEvent.create("b", UnprocessedEvent.Reason.duplicate))));
        expectedDuplicates.add(KV.of("id-1", KV.of(3L, UnprocessedEvent.create("d", UnprocessedEvent.Reason.duplicate))));

        testProcessing(
            events,
            expectedStatuses,
            expectedOutput,
            expectedDuplicates,
            EMISSION_FREQUENCY_ON_EVERY_ELEMENT,
            INITIAL_SEQUENCE_OF_0,
            LARGE_MAX_RESULTS_PER_OUTPUT,
            DONT_PRODUCE_STATUS_ON_EVERY_EVENT);
    }

    /**
     * Sends {@code StringBuilderState.BAD_VALUE} as the third event and expects it on the
     * unprocessed-events output with reason {@code exception_thrown}; only the two events before
     * it appear in the regular output.
     */
    @Test
    public void testHandlingOfCheckedExceptions() throws CannotProvideCoderException {
        Event[] events = {
            Event.create(0L, "id-1", "a"),
            Event.create(1L, "id-1", "b"),
            Event.create(2L, "id-1", StringBuilderState.BAD_VALUE),
            Event.create(3L, "id-1", "c")
        };

        Collection<KV<String, OrderedProcessingStatus>> expectedStatuses = Collections.singletonList(
            KV.of("id-1", OrderedProcessingStatus.create(1L, 1L, 3L, 3L, events.length, 2L, 0L, false)));

        Collection<KV<String, String>> expectedOutput = Arrays.asList(
            KV.of("id-1", "a"),
            KV.of("id-1", "ab"));

        Collection<KV<String, KV<Long, UnprocessedEvent<String>>>> expectedFailures = new ArrayList<>();
        expectedFailures.add(KV.of("id-1", KV.of(2L, UnprocessedEvent.create(StringBuilderState.BAD_VALUE, UnprocessedEvent.Reason.exception_thrown))));

        testProcessing(
            events,
            expectedStatuses,
            expectedOutput,
            expectedFailures,
            EMISSION_FREQUENCY_ON_EVERY_ELEMENT,
            INITIAL_SEQUENCE_OF_0,
            LARGE_MAX_RESULTS_PER_OUTPUT,
            DONT_PRODUCE_STATUS_ON_EVERY_EVENT);
    }

    /**
     * Runs the handler with an emission frequency of 2 and expects only every other accumulated
     * result ("a", "abc" for key "id-1" and "a" for key "id-2") on the output.
     */
    @Test
    public void testProcessingWithEveryOtherResultEmission() throws CannotProvideCoderException {
        Event[] events = {
            Event.create(2L, "id-1", "c"),
            Event.create(1L, "id-1", "b"),
            Event.create(0L, "id-1", "a"),
            Event.create(3L, "id-1", "d"),
            Event.create(0L, "id-2", "a"),
            Event.create(1L, "id-2", "b")
        };

        Collection<KV<String, OrderedProcessingStatus>> expectedStatuses = Arrays.asList(
            KV.of("id-1", OrderedProcessingStatus.create(3L, 0L, (Long) null, (Long) null, 4L, 2L, 0L, false)),
            KV.of("id-2", OrderedProcessingStatus.create(1L, 0L, (Long) null, (Long) null, 2L, 1L, 0L, false)));

        Collection<KV<String, String>> expectedOutput = Arrays.asList(
            KV.of("id-1", "a"),
            KV.of("id-1", "abc"),
            KV.of("id-2", "a"));

        testProcessing(
            events,
            expectedStatuses,
            expectedOutput,
            EMISSION_FREQUENCY_ON_EVERY_OTHER_EVENT,
            INITIAL_SEQUENCE_OF_0,
            LARGE_MAX_RESULTS_PER_OUTPUT,
            DONT_PRODUCE_STATUS_ON_EVERY_EVENT);
    }

    /**
     * Sends 300 events for one key where sequences 2..300 arrive first and sequence 1 arrives
     * last, with a maximum of 100 output elements per bundle and an initial sequence of 1.
     * Expects one status per received event, followed by one status per batch of 100 results and
     * a final status.
     */
    @Test
    public void testLargeBufferedOutputInTimer() throws CannotProvideCoderException {
        int maxResultsPerOutput = 100;
        long[] sequences = new long[maxResultsPerOutput * 3];
        // Sequences 2, 3, ... fill every slot but the last; sequence 1 is delivered last.
        for (int index = 0; index < sequences.length - 1; index++) {
            sequences[index] = index + 2;
        }
        sequences[sequences.length - 1] = 1;

        int total = sequences.length;
        Collection<Event> events = new ArrayList<>(total);
        Collection<KV<String, String>> expectedOutput = new ArrayList<>(total);
        Collection<KV<String, OrderedProcessingStatus>> expectedStatuses = new ArrayList<>(total + 10);

        StringBuilder accumulated = new StringBuilder();
        for (int index = 0; index < total; index++) {
            long sequence = sequences[index];
            int received = index + 1;
            events.add(Event.create(sequence, "id-1", "."));
            accumulated.append(".");
            expectedOutput.add(KV.of("id-1", accumulated.toString()));
            // No per-event status is expected for the very last event (sequence 1).
            if (received < total) {
                expectedStatuses.add(KV.of("id-1", OrderedProcessingStatus.create((Long) null, received, 2L, Long.valueOf(sequence), received, 0L, 0L, false)));
            }
        }

        // One status per intermediate batch of maxResultsPerOutput results.
        for (long processed = maxResultsPerOutput; processed < total; processed += maxResultsPerOutput) {
            expectedStatuses.add(KV.of("id-1", OrderedProcessingStatus.create(Long.valueOf(processed), total - processed, Long.valueOf(processed + 1), Long.valueOf(total), total, processed, 0L, false)));
        }
        // Final status once everything has been processed.
        expectedStatuses.add(KV.of("id-1", OrderedProcessingStatus.create(Long.valueOf(total), 0L, (Long) null, (Long) null, total, total, 0L, false)));

        testProcessing(
            events.toArray(new Event[0]),
            expectedStatuses,
            expectedOutput,
            EMISSION_FREQUENCY_ON_EVERY_ELEMENT,
            1L,
            maxResultsPerOutput,
            PRODUCE_STATUS_ON_EVERY_EVENT);
    }

    /**
     * Sends sequences 2, 3, 7, 8, 9, 10, 1, 4, 5, 6 for one key with an initial sequence of 1 and
     * a maximum of 3 output elements per bundle, and checks the status expected after each step.
     */
    @Test
    public void testSequenceGapProcessingInBufferedOutput() throws CannotProvideCoderException {
        long[] sequences = {2, 3, 7, 8, 9, 10, 1, 4, 5, 6};

        Collection<Event> events = new ArrayList<>(sequences.length);
        Collection<KV<String, String>> expectedOutput = new ArrayList<>(sequences.length);
        StringBuilder accumulated = new StringBuilder();
        for (long sequence : sequences) {
            events.add(Event.create(sequence, "id-1", "."));
            accumulated.append(".");
            expectedOutput.add(KV.of("id-1", accumulated.toString()));
        }

        Collection<KV<String, OrderedProcessingStatus>> expectedStatuses = new ArrayList<>();
        int received = 0;

        // Statuses for the first six events (2, 3, 7, 8, 9, 10): first argument is null and the
        // counters grow by one per event.
        for (int index = 0; index < 6; index++) {
            received++;
            expectedStatuses.add(KV.of("id-1", OrderedProcessingStatus.create((Long) null, received, 2L, Long.valueOf(sequences[index]), received, 0L, 0L, false)));
        }

        // Statuses for the remaining four events (1, 4, 5, 6).
        received++;
        expectedStatuses.add(KV.of("id-1", OrderedProcessingStatus.create(3L, 4L, 7L, 10L, received, 3L, 0L, false)));
        received++;
        expectedStatuses.add(KV.of("id-1", OrderedProcessingStatus.create(4L, 4L, 7L, 10L, received, 4L, 0L, false)));
        received++;
        expectedStatuses.add(KV.of("id-1", OrderedProcessingStatus.create(5L, 4L, 7L, 10L, received, 5L, 0L, false)));
        received++;
        expectedStatuses.add(KV.of("id-1", OrderedProcessingStatus.create(8L, 2L, 9L, 10L, received, 8L, 0L, false)));
        // The final status carries the same received count as the previous one.
        expectedStatuses.add(KV.of("id-1", OrderedProcessingStatus.create(10L, 0L, (Long) null, (Long) null, received, 10L, 0L, false)));

        testProcessing(
            events.toArray(new Event[0]),
            expectedStatuses,
            expectedOutput,
            EMISSION_FREQUENCY_ON_EVERY_ELEMENT,
            1L,
            3,
            PRODUCE_STATUS_ON_EVERY_EVENT);
    }

    /**
     * Sends an event with sequence {@code Long.MAX_VALUE} and expects it on the unprocessed-events
     * output with reason {@code sequence_id_outside_valid_range}.
     */
    @Test
    public void testHandlingOfMaxSequenceNumber() throws CannotProvideCoderException {
        Event[] events = {
            Event.create(0L, "id-1", "a"),
            Event.create(1L, "id-1", "b"),
            Event.create(Long.MAX_VALUE, "id-1", "c")
        };

        Collection<KV<String, OrderedProcessingStatus>> expectedStatuses = Collections.singletonList(
            KV.of("id-1", OrderedProcessingStatus.create(1L, 0L, (Long) null, (Long) null, 3L, 2L, 0L, false)));

        Collection<KV<String, String>> expectedOutput = Arrays.asList(
            KV.of("id-1", "a"),
            KV.of("id-1", "ab"));

        Collection<KV<String, KV<Long, UnprocessedEvent<String>>>> expectedRejected = new ArrayList<>();
        expectedRejected.add(KV.of("id-1", KV.of(Long.MAX_VALUE, UnprocessedEvent.create("c", UnprocessedEvent.Reason.sequence_id_outside_valid_range))));

        testProcessing(
            events,
            expectedStatuses,
            expectedOutput,
            expectedRejected,
            EMISSION_FREQUENCY_ON_EVERY_ELEMENT,
            INITIAL_SEQUENCE_OF_0,
            LARGE_MAX_RESULTS_PER_OUTPUT,
            DONT_PRODUCE_STATUS_ON_EVERY_EVENT);
    }

    /**
     * Sends {@code StringEventExaminer.LAST_INPUT} as the final event and expects a final status
     * whose last flag is {@code true}.
     */
    @Test
    public void testProcessingOfTheLastInput() throws CannotProvideCoderException {
        Event[] events = {
            Event.create(0L, "id-1", "a"),
            Event.create(1L, "id-1", "b"),
            Event.create(2L, "id-1", StringEventExaminer.LAST_INPUT)
        };

        Collection<KV<String, OrderedProcessingStatus>> expectedStatuses = Collections.singletonList(
            KV.of("id-1", OrderedProcessingStatus.create(2L, 0L, (Long) null, (Long) null, events.length, events.length, 0L, LAST_EVENT_RECEIVED)));

        Collection<KV<String, String>> expectedOutput = Arrays.asList(
            KV.of("id-1", "a"),
            KV.of("id-1", "ab"),
            KV.of("id-1", "abz"));

        testProcessing(
            events,
            expectedStatuses,
            expectedOutput,
            EMISSION_FREQUENCY_ON_EVERY_ELEMENT,
            INITIAL_SEQUENCE_OF_0,
            LARGE_MAX_RESULTS_PER_OUTPUT,
            DONT_PRODUCE_STATUS_ON_EVERY_EVENT);
    }

    /**
     * Runs the processor on input split into 5-second fixed windows and checks output and
     * statuses separately for the window starting at 0s and the window starting at 10s. Key
     * "id-1" restarts at sequence 0 in the second window.
     */
    @Test
    public void testWindowedProcessing() throws CannotProvideCoderException {
        Instant base = new Instant(0L);
        Instant atOneSecond = base.plus(Duration.standardSeconds(1L));
        Instant atTwoSeconds = base.plus(Duration.standardSeconds(2L));

        TestStream<Event> stream =
            TestStream.create(this.streamingPipeline.getCoderRegistry().getCoder(Event.class))
                .advanceWatermarkTo(base)
                .addElements(
                    TimestampedValue.of(Event.create(0L, "id-1", "a"), atOneSecond),
                    TimestampedValue.of(Event.create(1L, "id-1", "b"), atTwoSeconds),
                    TimestampedValue.of(Event.create(0L, "id-2", "x"), atOneSecond),
                    TimestampedValue.of(Event.create(1L, "id-2", "y"), atTwoSeconds),
                    TimestampedValue.of(Event.create(2L, "id-2", StringEventExaminer.LAST_INPUT), atTwoSeconds),
                    TimestampedValue.of(Event.create(0L, "id-1", "c"), base.plus(Duration.standardSeconds(10L))),
                    TimestampedValue.of(Event.create(1L, "id-1", "d"), base.plus(Duration.standardSeconds(11L))))
                .advanceWatermarkToInfinity();

        TestPipeline pipeline = this.streamingPipeline;
        PCollection<KV<String, KV<Long, String>>> input =
            pipeline
                .apply("Create Streaming Events", stream)
                .apply("To KV", ParDo.of(new MapEventsToKV()))
                .apply("Window input", Window.into(FixedWindows.of(Duration.standardSeconds(5L))));

        StringBufferOrderedProcessingHandler handler =
            new StringBufferOrderedProcessingHandler(EMISSION_FREQUENCY_ON_EVERY_ELEMENT, INITIAL_SEQUENCE_OF_0);
        handler.setMaxOutputElementsPerBundle(LARGE_MAX_RESULTS_PER_OUTPUT);
        handler.setStatusUpdateFrequency(null);
        handler.setProduceStatusUpdateOnEveryEvent(PRODUCE_STATUS_ON_EVERY_EVENT);

        OrderedEventProcessorResult<String, String, String> result =
            input.apply("Process Events", OrderedEventProcessor.create(handler));

        IntervalWindow window1 = new IntervalWindow(base, base.plus(Duration.standardSeconds(5L)));
        IntervalWindow window2 =
            new IntervalWindow(base.plus(Duration.standardSeconds(10L)), base.plus(Duration.standardSeconds(15L)));

        PAssert.that("Output matches in window 1", result.output())
            .inWindow(window1)
            .containsInAnyOrder(
                KV.of("id-1", "a"),
                KV.of("id-1", "ab"),
                KV.of("id-2", "x"),
                KV.of("id-2", "xy"),
                KV.of("id-2", "xyz"));

        PAssert.that("Output matches in window 2", result.output())
            .inWindow(window2)
            .containsInAnyOrder(
                KV.of("id-1", "c"),
                KV.of("id-1", "cd"));

        PAssert.that("Statuses match in window 1", result.processingStatuses())
            .inWindow(window1)
            .containsInAnyOrder(
                KV.of("id-1", OrderedProcessingStatus.create(0L, 0L, (Long) null, (Long) null, 1L, 1L, 0L, false)),
                KV.of("id-1", OrderedProcessingStatus.create(1L, 0L, (Long) null, (Long) null, 2L, 2L, 0L, false)),
                KV.of("id-2", OrderedProcessingStatus.create(0L, 0L, (Long) null, (Long) null, 1L, 1L, 0L, false)),
                KV.of("id-2", OrderedProcessingStatus.create(1L, 0L, (Long) null, (Long) null, 2L, 2L, 0L, false)),
                KV.of("id-2", OrderedProcessingStatus.create(2L, 0L, (Long) null, (Long) null, 3L, 3L, 0L, false)));

        PAssert.that("Statuses match in window 2", result.processingStatuses())
            .inWindow(window2)
            .containsInAnyOrder(
                KV.of("id-1", OrderedProcessingStatus.create(0L, 0L, (Long) null, (Long) null, 1L, 1L, 0L, false)),
                KV.of("id-1", OrderedProcessingStatus.create(1L, 0L, (Long) null, (Long) null, 2L, 2L, 0L, false)));

        PAssert.that("Unprocessed events match", result.unprocessedEvents())
            .containsInAnyOrder(NO_EXPECTED_DLQ_EVENTS);

        pipeline.run();
    }

    /**
     * Convenience overload for tests that expect nothing on the unprocessed-events output.
     *
     * @param eventArr events to send
     * @param collection expected processing statuses
     * @param collection2 expected output
     * @param i first constructor argument of the handler (see doTest)
     * @param j second constructor argument of the handler (see doTest)
     * @param i2 value for the handler's setMaxOutputElementsPerBundle
     * @param z whether a status update is produced on every event
     */
    private void testProcessing(
            Event[] eventArr,
            Collection<KV<String, OrderedProcessingStatus>> collection,
            Collection<KV<String, String>> collection2,
            int i,
            long j,
            int i2,
            boolean z) throws CannotProvideCoderException {
        testProcessing(
            eventArr,
            collection,
            collection2,
            NO_EXPECTED_DLQ_EVENTS,
            i,
            j,
            i2,
            z);
    }

    /**
     * Runs the same scenario twice: first on the streaming pipeline, then on the batch pipeline.
     *
     * @param eventArr events to send
     * @param collection expected processing statuses
     * @param collection2 expected output
     * @param collection3 expected unprocessed events
     * @param i first constructor argument of the handler (see doTest)
     * @param j second constructor argument of the handler (see doTest)
     * @param i2 value for the handler's setMaxOutputElementsPerBundle
     * @param z whether a status update is produced on every event
     */
    private void testProcessing(
            Event[] eventArr,
            Collection<KV<String, OrderedProcessingStatus>> collection,
            Collection<KV<String, String>> collection2,
            Collection<KV<String, KV<Long, UnprocessedEvent<String>>>> collection3,
            int i,
            long j,
            int i2,
            boolean z) throws CannotProvideCoderException {
        for (boolean mode : new boolean[] {STREAMING, BATCH}) {
            doTest(eventArr, collection, collection2, collection3, i, j, i2, z, mode);
        }
    }

    /**
     * Builds and runs one pipeline around the ordered event processor and asserts on its outputs.
     *
     * <p>Statuses are asserted only when {@code z2} is true. When any expected unprocessed event
     * has reason {@code exception_thrown}, only the number of unprocessed events is asserted;
     * otherwise the unprocessed events are compared element by element.
     *
     * @param eventArr events to send
     * @param collection expected processing statuses
     * @param collection2 expected output
     * @param collection3 expected unprocessed events
     * @param i first constructor argument of {@code StringBufferOrderedProcessingHandler}
     * @param j second constructor argument of {@code StringBufferOrderedProcessingHandler}
     * @param i2 value for the handler's setMaxOutputElementsPerBundle
     * @param z when true, a status update is produced on every event and the status update
     *     frequency is set to null
     * @param z2 true to use the streaming pipeline and a TestStream source, false to use the
     *     batch pipeline and a Create source
     */
    private void doTest(Event[] eventArr, Collection<KV<String, OrderedProcessingStatus>> collection, Collection<KV<String, String>> collection2, Collection<KV<String, KV<Long, UnprocessedEvent<String>>>> collection3, int i, long j, int i2, boolean z, boolean z2) throws CannotProvideCoderException {
        TestPipeline pipeline;
        PCollection<Event> rawInput;
        if (z2) {
            pipeline = this.streamingPipeline;
            rawInput = createStreamingPCollection(pipeline, eventArr);
        } else {
            pipeline = this.batchPipeline;
            rawInput = createBatchPCollection(pipeline, eventArr);
        }
        PCollection<KV<String, KV<Long, String>>> input = rawInput.apply("To KV", ParDo.of(new MapEventsToKV()));

        StringBufferOrderedProcessingHandler handler = new StringBufferOrderedProcessingHandler(i, j);
        handler.setMaxOutputElementsPerBundle(i2);
        if (z) {
            handler.setProduceStatusUpdateOnEveryEvent(true);
            handler.setStatusUpdateFrequency(null);
        } else {
            Duration statusFrequency = z2 ? Duration.standardMinutes(5L) : Duration.standardSeconds(1L);
            handler.setStatusUpdateFrequency(statusFrequency);
        }

        OrderedEventProcessorResult<String, String, String> result =
            input.apply("Process Events", OrderedEventProcessor.create(handler));

        PAssert.that("Output matches", result.output()).containsInAnyOrder(collection2);
        if (z2) {
            PAssert.that("Statuses match", result.processingStatuses()).containsInAnyOrder(collection);
        }

        boolean expectsThrownException =
            collection3.stream()
                .anyMatch(expected -> expected.getValue().getValue().getReason() == UnprocessedEvent.Reason.exception_thrown);

        if (!expectsThrownException) {
            PAssert.that("Unprocessed events match", result.unprocessedEvents()).containsInAnyOrder(collection3);
        } else {
            // Only the count is compared in this case.
            PCollection<Long> unprocessedCount =
                result.unprocessedEvents()
                    .apply(
                        "Window",
                        Window.<KV<String, KV<Long, UnprocessedEvent<String>>>>into(new GlobalWindows())
                            .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                            .discardingFiredPanes())
                    .apply("Count", Count.globally());
            PAssert.thatSingleton("Unprocessed event count", unprocessedCount)
                .isEqualTo(Long.valueOf(collection3.size()));
        }

        pipeline.run();
    }

    /**
     * Creates a bounded input from the given events and reshuffles it via a random key.
     *
     * @param pipeline pipeline to attach the source to
     * @param eventArr events to emit
     * @return the reshuffled events
     */
    private PCollection<Event> createBatchPCollection(Pipeline pipeline, Event[] eventArr) {
        PCollection<Event> created = pipeline.apply("Create Batch Events", Create.of(Arrays.asList(eventArr)));
        return created.apply("Reshuffle", Reshuffle.viaRandomKey());
    }

    /**
     * Creates a {@code TestStream} input that emits the events one at a time.
     *
     * <p>The watermark starts 20 minutes before the current time and is advanced by one
     * millisecond before each event. After the last event, processing time is advanced by 15
     * minutes and the watermark is advanced to infinity.
     *
     * @param pipeline pipeline to attach the source to; also supplies the coder for {@code Event}
     * @param eventArr events to emit, in this order
     * @return the streamed events
     * @throws CannotProvideCoderException if the coder registry has no coder for {@code Event}
     */
    private PCollection<Event> createStreamingPCollection(Pipeline pipeline, Event[] eventArr) throws CannotProvideCoderException {
        Instant start = Instant.now().minus(Duration.standardMinutes(20L));
        TestStream.Builder<Event> builder =
            TestStream.create(pipeline.getCoderRegistry().getCoder(Event.class)).advanceWatermarkTo(start);

        for (int index = 0; index < eventArr.length; index++) {
            Instant watermark = start.plus(Duration.millis(index + 1));
            builder = builder.advanceWatermarkTo(watermark).addElements(eventArr[index]);
        }

        TestStream<Event> stream =
            builder.advanceProcessingTime(Duration.standardMinutes(15L)).advanceWatermarkToInfinity();
        return pipeline.apply("Create Streaming Events", stream);
    }
}
