/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import java.io.Serializable;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.kafka.CustomTimestampPolicyWithLimitedDelay;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaTimestampType;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(value=JUnit4.class)
public class CustomTimestampPolicyWithLimitedDelayTest {
    private static List<Long> getTimestampsForRecords(TimestampPolicy<String, String> policy, Instant now, List<Long> timestampOffsets) {
        return timestampOffsets.stream().map(ts -> {
            Instant result = policy.getTimestampForRecord(null, new KafkaRecord("topic", 0, 0L, now.getMillis() + ts, KafkaTimestampType.CREATE_TIME, (Headers)new RecordHeaders(), (Object)"key", (Object)"value"));
            return result.getMillis() - now.getMillis();
        }).collect(Collectors.toList());
    }

    @Test
    public void testCustomTimestampPolicyWithLimitedDelay() {
        Duration maxDelay = Duration.standardSeconds((long)60L);
        CustomTimestampPolicyWithLimitedDelay policy = new CustomTimestampPolicyWithLimitedDelay((SerializableFunction & Serializable)record -> new Instant(record.getTimestamp()), maxDelay, Optional.empty());
        Instant now = Instant.now();
        TimestampPolicy.PartitionContext ctx = (TimestampPolicy.PartitionContext)Mockito.mock(TimestampPolicy.PartitionContext.class);
        Mockito.when((Object)ctx.getMessageBacklog()).thenReturn((Object)100L);
        Mockito.when((Object)ctx.getBacklogCheckTime()).thenReturn((Object)now);
        MatcherAssert.assertThat((Object)policy.getWatermark(ctx), (Matcher)Is.is((Object)BoundedWindow.TIMESTAMP_MIN_VALUE));
        ImmutableList input = ImmutableList.of((Object)-200000L, (Object)-150000L, (Object)-120000L, (Object)-140000L, (Object)-100000L, (Object)-110000L);
        MatcherAssert.assertThat(CustomTimestampPolicyWithLimitedDelayTest.getTimestampsForRecords((TimestampPolicy<String, String>)policy, now, (List<Long>)input), (Matcher)Is.is((Object)input));
        MatcherAssert.assertThat((Object)policy.getWatermark(ctx), (Matcher)Is.is((Object)now.minus((ReadableDuration)Duration.standardSeconds((long)100L)).minus((ReadableDuration)maxDelay)));
        input = ImmutableList.of((Object)-200000L, (Object)-150000L, (Object)-120000L, (Object)-140000L, (Object)100000L, (Object)-100000L, (Object)-110000L);
        MatcherAssert.assertThat(CustomTimestampPolicyWithLimitedDelayTest.getTimestampsForRecords((TimestampPolicy<String, String>)policy, now, (List<Long>)input), (Matcher)Is.is((Object)input));
        MatcherAssert.assertThat((Object)policy.getWatermark(ctx, now), (Matcher)Is.is((Object)now.minus((ReadableDuration)maxDelay)));
        now = now.plus((ReadableDuration)Duration.standardMinutes((long)5L));
        Instant backlogCheckTime = now.minus((ReadableDuration)Duration.standardSeconds((long)10L));
        Mockito.when((Object)ctx.getMessageBacklog()).thenReturn((Object)0L);
        Mockito.when((Object)ctx.getBacklogCheckTime()).thenReturn((Object)backlogCheckTime);
        MatcherAssert.assertThat((Object)policy.getWatermark(ctx, now), (Matcher)Is.is((Object)backlogCheckTime.minus((ReadableDuration)maxDelay)));
    }
}

