package org.apache.beam.runners.direct;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.MutableDateTime;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/SplittableDoFnTest.class */
public class SplittableDoFnTest {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/SplittableDoFnTest$OffsetRange.class */
    public static class OffsetRange implements Serializable {
        public final int from;
        public final int to;

        OffsetRange(int i, int i2) {
            this.from = i;
            this.to = i2;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/SplittableDoFnTest$OffsetRangeTracker.class */
    private static class OffsetRangeTracker implements RestrictionTracker<OffsetRange> {
        private OffsetRange range;
        private Integer lastClaimedIndex = null;

        OffsetRangeTracker(OffsetRange offsetRange) {
            this.range = (OffsetRange) Preconditions.checkNotNull(offsetRange);
        }

        /* renamed from: currentRestriction, reason: merged with bridge method [inline-methods] */
        public OffsetRange m14currentRestriction() {
            return this.range;
        }

        /* renamed from: checkpoint, reason: merged with bridge method [inline-methods] */
        public OffsetRange m13checkpoint() {
            if (this.lastClaimedIndex == null) {
                OffsetRange offsetRange = this.range;
                this.range = new OffsetRange(this.range.from, this.range.from);
                return offsetRange;
            }
            OffsetRange offsetRange2 = new OffsetRange(this.lastClaimedIndex.intValue() + 1, this.range.to);
            this.range = new OffsetRange(this.range.from, this.lastClaimedIndex.intValue() + 1);
            return offsetRange2;
        }

        boolean tryClaim(int i) {
            Preconditions.checkState(this.lastClaimedIndex == null || i > this.lastClaimedIndex.intValue());
            if (i >= this.range.to) {
                return false;
            }
            this.lastClaimedIndex = Integer.valueOf(i);
            return true;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/SplittableDoFnTest$PairStringWithIndexToLength.class */
    static class PairStringWithIndexToLength extends DoFn<String, KV<String, Integer>> {
        PairStringWithIndexToLength() {
        }

        @DoFn.ProcessElement
        public DoFn.ProcessContinuation process(DoFn<String, KV<String, Integer>>.ProcessContext processContext, OffsetRangeTracker offsetRangeTracker) {
            for (int i = offsetRangeTracker.m14currentRestriction().from; offsetRangeTracker.tryClaim(i); i++) {
                processContext.output(KV.of(processContext.element(), Integer.valueOf(i)));
                if (i % 3 == 0) {
                    return DoFn.ProcessContinuation.resume();
                }
            }
            return DoFn.ProcessContinuation.stop();
        }

        @DoFn.GetInitialRestriction
        public OffsetRange getInitialRange(String str) {
            return new OffsetRange(0, str.length());
        }

        @DoFn.SplitRestriction
        public void splitRange(String str, OffsetRange offsetRange, DoFn.OutputReceiver<OffsetRange> outputReceiver) {
            outputReceiver.output(new OffsetRange(offsetRange.from, (offsetRange.from + offsetRange.to) / 2));
            outputReceiver.output(new OffsetRange((offsetRange.from + offsetRange.to) / 2, offsetRange.to));
        }

        @DoFn.NewTracker
        public OffsetRangeTracker newTracker(OffsetRange offsetRange) {
            return new OffsetRangeTracker(offsetRange);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/SplittableDoFnTest$ReifyTimestampsFn.class */
    private static class ReifyTimestampsFn<T> extends DoFn<T, TimestampedValue<T>> {
        private ReifyTimestampsFn() {
        }

        @DoFn.ProcessElement
        public void process(DoFn<T, TimestampedValue<T>>.ProcessContext processContext) {
            processContext.output(TimestampedValue.of(processContext.element(), processContext.timestamp()));
        }
    }

    @Test
    public void testPairWithIndexBasic() throws ClassNotFoundException {
        TestPipeline create = TestPipeline.create();
        create.getOptions().setRunner(DirectRunner.class);
        PAssert.that(create.apply(Create.of(new String[]{"a", "bb", "ccccc"})).apply(ParDo.of(new PairStringWithIndexToLength())).setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))).containsInAnyOrder(Arrays.asList(KV.of("a", 0), KV.of("bb", 0), KV.of("bb", 1), KV.of("ccccc", 0), KV.of("ccccc", 1), KV.of("ccccc", 2), KV.of("ccccc", 3), KV.of("ccccc", 4)));
        create.run();
    }

    @Test
    public void testPairWithIndexWindowedTimestamped() throws ClassNotFoundException {
        TestPipeline create = TestPipeline.create();
        create.getOptions().setRunner(DirectRunner.class);
        MutableDateTime mutableDateTime = Instant.now().toMutableDateTime();
        mutableDateTime.setMillisOfSecond(0);
        Instant instant = mutableDateTime.toInstant();
        Instant plus = instant.plus(Duration.standardSeconds(1L));
        Instant plus2 = instant.plus(Duration.standardSeconds(2L));
        SlidingWindows every = SlidingWindows.of(Duration.standardSeconds(5L)).every(Duration.standardSeconds(1L));
        PCollection coder = create.apply(Create.timestamped(new TimestampedValue[]{TimestampedValue.of("a", instant), TimestampedValue.of("bb", plus), TimestampedValue.of("ccccc", plus2)})).apply(Window.into(every)).apply(ParDo.of(new PairStringWithIndexToLength())).setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
        Assert.assertEquals(every, coder.getWindowingStrategy().getWindowFn());
        PCollection apply = coder.apply("Reify timestamps", ParDo.of(new ReifyTimestampsFn()));
        for (int i = 0; i < 4; i++) {
            Instant minus = instant.minus(Duration.standardSeconds(i));
            IntervalWindow intervalWindow = new IntervalWindow(minus, minus.plus(Duration.standardSeconds(5L)));
            List<TimestampedValue> asList = Arrays.asList(TimestampedValue.of(KV.of("a", 0), instant), TimestampedValue.of(KV.of("bb", 0), plus), TimestampedValue.of(KV.of("bb", 1), plus), TimestampedValue.of(KV.of("ccccc", 0), plus2), TimestampedValue.of(KV.of("ccccc", 1), plus2), TimestampedValue.of(KV.of("ccccc", 2), plus2), TimestampedValue.of(KV.of("ccccc", 3), plus2), TimestampedValue.of(KV.of("ccccc", 4), plus2));
            ArrayList arrayList = new ArrayList();
            for (TimestampedValue timestampedValue : asList) {
                if (!intervalWindow.start().isAfter(timestampedValue.getTimestamp()) && !timestampedValue.getTimestamp().isAfter(intervalWindow.maxTimestamp())) {
                    arrayList.add(timestampedValue);
                }
            }
            Assert.assertFalse(arrayList.isEmpty());
            PAssert.that(apply).inWindow(intervalWindow).containsInAnyOrder(arrayList);
        }
        create.run();
    }
}
