package org.apache.beam.runners.direct;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/MultiStepCombineTest.class */
public class MultiStepCombineTest implements Serializable {

    // JUnit rule that provides the pipeline each test builds and runs. Declared transient so it
    // is excluded from Java serialization of this Serializable test class.
    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();
    // Coder for KV<String, Long> pairs (UTF-8 string key, var-long value). Also transient.
    // NOTE(review): this field is written but never read anywhere in this file.
    private transient KvCoder<String, Long> combinedCoder = KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of());

    /**
     * Accumulator used by the combine function under test: a running {@code long} total plus a
     * flag recording whether this accumulator (or any accumulator merged into it) was produced
     * by decoding.
     *
     * <p>The concrete implementation is the AutoValue-generated
     * {@code AutoValue_MultiStepCombineTest_MultiStepAccumulator}.
     */
    @AutoValue
    public static abstract class MultiStepAccumulator {

        /** Returns the running total held by this accumulator. */
        public abstract long getValue();

        /** Returns whether this accumulator is flagged as having been deserialized. */
        public abstract boolean isDeserialized();

        /**
         * Creates an accumulator.
         *
         * @param j the running total
         * @param z the deserialized flag
         * @return a new accumulator holding the two values
         */
        public static MultiStepAccumulator of(long j, boolean z) {
            return new AutoValue_MultiStepCombineTest_MultiStepAccumulator(j, z);
        }

        /**
         * Combines this accumulator with another one.
         *
         * @param multiStepAccumulator the accumulator to fold into this one
         * @return a new accumulator whose value is the sum of both values and whose flag is set
         *     when either input has it set
         */
        MultiStepAccumulator merge(MultiStepAccumulator multiStepAccumulator) {
            long total = getValue() + multiStepAccumulator.getValue();
            boolean anyDeserialized = isDeserialized() || multiStepAccumulator.isDeserialized();
            return of(total, anyDeserialized);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/MultiStepCombineTest$MultiStepAccumulatorCoder.class */
    private static class MultiStepAccumulatorCoder extends CustomCoder<MultiStepAccumulator> {
        private MultiStepAccumulatorCoder() {
        }

        public void encode(MultiStepAccumulator multiStepAccumulator, OutputStream outputStream) throws CoderException, IOException {
            VarInt.encode(multiStepAccumulator.getValue(), outputStream);
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public MultiStepAccumulator m219decode(InputStream inputStream) throws CoderException, IOException {
            return MultiStepAccumulator.of(VarInt.decodeLong(inputStream), true);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/MultiStepCombineTest$MultiStepCombineFn.class */
    private static class MultiStepCombineFn extends Combine.CombineFn<Long, MultiStepAccumulator, Long> {
        private MultiStepCombineFn() {
        }

        public Coder<MultiStepAccumulator> getAccumulatorCoder(CoderRegistry coderRegistry, Coder<Long> coder) throws CannotProvideCoderException {
            return new MultiStepAccumulatorCoder();
        }

        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        public MultiStepAccumulator m221createAccumulator() {
            return MultiStepAccumulator.of(0L, false);
        }

        public MultiStepAccumulator addInput(MultiStepAccumulator multiStepAccumulator, Long l) {
            return MultiStepAccumulator.of(multiStepAccumulator.getValue() + l.longValue(), multiStepAccumulator.isDeserialized());
        }

        public MultiStepAccumulator mergeAccumulators(Iterable<MultiStepAccumulator> iterable) {
            MultiStepAccumulator of = MultiStepAccumulator.of(0L, false);
            Iterator<MultiStepAccumulator> it = iterable.iterator();
            while (it.hasNext()) {
                of = of.merge(it.next());
            }
            return of;
        }

        public Long extractOutput(MultiStepAccumulator multiStepAccumulator) {
            MatcherAssert.assertThat("Accumulators should have been serialized and deserialized within the Pipeline", Boolean.valueOf(multiStepAccumulator.isDeserialized()), Matchers.is(true));
            return Long.valueOf(multiStepAccumulator.getValue());
        }

        /* renamed from: mergeAccumulators, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m220mergeAccumulators(Iterable iterable) {
            return mergeAccumulators((Iterable<MultiStepAccumulator>) iterable);
        }
    }

    /**
     * Combines five keyed values per key with {@link MultiStepCombineFn} and checks the per-key
     * sums.
     */
    @Test
    public void testMultiStepCombine() {
        PCollection<KV<String, Long>> combined =
                this.pipeline
                        .apply(
                                Create.of(
                                        KV.of("foo", 1L),
                                        KV.of("bar", 2L),
                                        KV.of("bizzle", 3L),
                                        KV.of("bar", 4L),
                                        KV.of("bizzle", 11L)))
                        .apply(Combine.perKey(new MultiStepCombineFn()));

        // foo: 1; bar: 2 + 4; bizzle: 3 + 11.
        PAssert.that(combined)
                .containsInAnyOrder(KV.of("foo", 1L), KV.of("bar", 6L), KV.of("bizzle", 14L));
        this.pipeline.run();
    }

    /**
     * Combines timestamped values per key under sliding windows of 6 ms that start every 3 ms,
     * and checks the per-window results as well as the overall output.
     */
    @Test
    public void testMultiStepCombineWindowed() {
        PCollection<KV<String, Long>> combined =
                this.pipeline
                        .apply(
                                Create.timestamped(
                                        TimestampedValue.of(KV.of("foo", 1L), new Instant(1L)),
                                        TimestampedValue.of(KV.of("bar", 2L), new Instant(2L)),
                                        TimestampedValue.of(KV.of("bizzle", 3L), new Instant(3L)),
                                        TimestampedValue.of(KV.of("bar", 4L), new Instant(4L)),
                                        TimestampedValue.of(KV.of("bizzle", 11L), new Instant(11L))))
                        .apply(Window.into(SlidingWindows.of(Duration.millis(6L)).every(Duration.millis(3L))))
                        .apply(Combine.perKey(new MultiStepCombineFn()));

        // Window [0, 6) holds every element with a timestamp from 1 to 4.
        PAssert.that("Windows should combine only elements in their windows", combined)
                .inWindow(new IntervalWindow(new Instant(0L), Duration.millis(6L)))
                .containsInAnyOrder(KV.of("foo", 1L), KV.of("bar", 6L), KV.of("bizzle", 3L));
        // Window [-3, 3) holds only the elements at timestamps 1 and 2.
        PAssert.that("Elements should appear in all the windows they are assigned to", combined)
                .inWindow(new IntervalWindow(new Instant(-3L), Duration.millis(6L)))
                .containsInAnyOrder(KV.of("foo", 1L), KV.of("bar", 2L));
        // Window [6, 12) holds only the element at timestamp 11.
        PAssert.that(combined)
                .inWindow(new IntervalWindow(new Instant(6L), Duration.millis(6L)))
                .containsInAnyOrder(KV.of("bizzle", 11L));
        // Across all windows each element contributes to two overlapping windows.
        PAssert.that(combined)
                .containsInAnyOrder(
                        KV.of("foo", 1L),
                        KV.of("foo", 1L),
                        KV.of("bar", 6L),
                        KV.of("bar", 2L),
                        KV.of("bar", 4L),
                        KV.of("bizzle", 11L),
                        KV.of("bizzle", 11L),
                        KV.of("bizzle", 3L),
                        KV.of("bizzle", 3L));
        this.pipeline.run();
    }

    /**
     * Combines timestamped values per key in fixed 5 ms windows using
     * {@link TimestampCombiner#LATEST}, then pairs each combined value with its output timestamp
     * and checks both.
     */
    @Test
    public void testMultiStepCombineTimestampCombiner() {
        TimestampCombiner timestampCombiner = TimestampCombiner.LATEST;
        // NOTE(review): combinedCoder is not read anywhere in this file; the assignment is kept
        // only to preserve the existing behavior of this test.
        this.combinedCoder = KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of());

        PCollection<KV<String, Long>> combined =
                this.pipeline
                        .apply(
                                Create.timestamped(
                                        TimestampedValue.of(KV.of("foo", 4L), new Instant(1L)),
                                        TimestampedValue.of(KV.of("foo", 1L), new Instant(4L)),
                                        TimestampedValue.of(KV.of("bazzle", 4L), new Instant(4L)),
                                        TimestampedValue.of(KV.of("foo", 12L), new Instant(12L))))
                        // The explicit type witness is required: Window.into is the receiver of
                        // withTimestampCombiner here, so its type argument has no target type
                        // to be inferred from.
                        .apply(
                                Window.<KV<String, Long>>into(FixedWindows.of(Duration.millis(5L)))
                                        .withTimestampCombiner(timestampCombiner))
                        .apply(Combine.perKey(new MultiStepCombineFn()));

        PCollection<KV<String, TimestampedValue<Long>>> reified =
                combined.apply(
                        ParDo.of(
                                new DoFn<KV<String, Long>, KV<String, TimestampedValue<Long>>>() {
                                    // Emits each combined value together with the timestamp it
                                    // was output at, so the timestamp can be asserted on.
                                    @DoFn.ProcessElement
                                    public void reifyTimestamp(ProcessContext processContext) {
                                        processContext.output(
                                                KV.of(
                                                        processContext.element().getKey(),
                                                        TimestampedValue.of(
                                                                processContext.element().getValue(),
                                                                processContext.timestamp())));
                                    }
                                }));

        // Window [0, 5): foo = 4 + 1 at latest timestamp 4, bazzle = 4 at 4.
        // Window [10, 15): foo = 12 at 12.
        PAssert.that(reified)
                .containsInAnyOrder(
                        KV.of("foo", TimestampedValue.of(5L, new Instant(4L))),
                        KV.of("bazzle", TimestampedValue.of(4L, new Instant(4L))),
                        KV.of("foo", TimestampedValue.of(12L, new Instant(12L))));
        this.pipeline.run();
    }
}
