/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import com.google.cloud.pubsublite.proto.AttributeValues;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import java.io.Serializable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.gcp.pubsublite.UuidDeduplicationOptions;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.Uuid;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.UuidDeduplicationTransform;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Deduplicate;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class UuidDeduplicationTransformTest {
    @Rule
    public final TestPipeline pipeline = TestPipeline.create();
    private static final Instant START = new Instant(0L);

    private static SequencedMessage newMessage() {
        Uuid uuid = Uuid.random();
        return SequencedMessage.newBuilder().setMessage(PubSubMessage.newBuilder().putAttributes("x-goog-pubsublite-dataflow-uuid", AttributeValues.newBuilder().addValues(uuid.value()).build())).setSizeBytes(10000L).setPublishTime(Timestamps.EPOCH).setCursor(Cursor.newBuilder().setOffset(10L)).build();
    }

    @Test
    public void unrelatedUuidsProxied() {
        SequencedMessage message1 = UuidDeduplicationTransformTest.newMessage();
        SequencedMessage message2 = UuidDeduplicationTransformTest.newMessage();
        TestStream messageStream = TestStream.create((Coder)ProtoCoder.of(SequencedMessage.class)).advanceWatermarkTo(START).addElements((Object)message1, (Object[])new SequencedMessage[0]).advanceWatermarkTo(START.plus((ReadableDuration)Deduplicate.DEFAULT_DURATION.dividedBy(2L))).addElements((Object)message2, (Object[])new SequencedMessage[0]).advanceWatermarkToInfinity();
        PCollection results = (PCollection)((PCollection)this.pipeline.apply((PTransform)messageStream)).apply((PTransform)new UuidDeduplicationTransform(UuidDeduplicationOptions.newBuilder().build()));
        PAssert.that((PCollection)results).containsInAnyOrder((Object[])new SequencedMessage[]{message1, message2});
        this.pipeline.run();
    }

    @Test
    public void sameUuidsWithinWindowOnlyOne() {
        SequencedMessage message = UuidDeduplicationTransformTest.newMessage();
        TestStream messageStream = TestStream.create((Coder)ProtoCoder.of(SequencedMessage.class)).advanceWatermarkTo(START).addElements((Object)message, (Object[])new SequencedMessage[0]).advanceWatermarkTo(START.plus((ReadableDuration)Deduplicate.DEFAULT_DURATION.dividedBy(2L))).advanceWatermarkToInfinity();
        PCollection results = (PCollection)((PCollection)this.pipeline.apply((PTransform)messageStream)).apply((PTransform)new UuidDeduplicationTransform(UuidDeduplicationOptions.newBuilder().build()));
        PAssert.that((PCollection)results).containsInAnyOrder((Object[])new SequencedMessage[]{message});
        this.pipeline.run();
    }

    @Test
    public void sameUuidsAfterGcOutsideWindowHasBoth() {
        SequencedMessage message1 = UuidDeduplicationTransformTest.newMessage();
        TestStream messageStream = TestStream.create((Coder)ProtoCoder.of(SequencedMessage.class)).advanceWatermarkTo(START).addElements((Object)message1, (Object[])new SequencedMessage[0]).advanceWatermarkTo(START.plus((ReadableDuration)UuidDeduplicationOptions.DEFAULT_DEDUPLICATE_DURATION.plus((ReadableDuration)Duration.millis((long)1L)))).addElements((Object)message1, (Object[])new SequencedMessage[0]).advanceWatermarkToInfinity();
        PCollection results = (PCollection)((PCollection)this.pipeline.apply((PTransform)messageStream)).apply((PTransform)new UuidDeduplicationTransform(UuidDeduplicationOptions.newBuilder().build()));
        PAssert.that((PCollection)results).containsInAnyOrder((Object[])new SequencedMessage[]{message1, message1});
        this.pipeline.run();
    }

    @Test
    public void dedupesBasedOnReturnedUuid() {
        byte[] bytes = new byte[]{35, 86};
        SequencedMessage message1 = UuidDeduplicationTransformTest.newMessage();
        SequencedMessage message2 = UuidDeduplicationTransformTest.newMessage();
        TestStream messageStream = TestStream.create((Coder)ProtoCoder.of(SequencedMessage.class)).advanceWatermarkTo(START).addElements((Object)message1, (Object[])new SequencedMessage[]{message2}).advanceWatermarkToInfinity();
        PCollection results = (PCollection)((PCollection)this.pipeline.apply((PTransform)messageStream)).apply((PTransform)new UuidDeduplicationTransform(UuidDeduplicationOptions.newBuilder().setUuidExtractor((SerializableFunction & Serializable)message -> Uuid.of((ByteString)ByteString.copyFrom((byte[])bytes))).build()));
        PAssert.that((PCollection)results).satisfies((SerializableFunction & Serializable)messages -> {
            Preconditions.checkArgument((Iterables.size((Iterable)messages) == 1 ? 1 : 0) != 0);
            return null;
        });
        this.pipeline.run();
    }
}

