package org.apache.beam.sdk.io.kafka;

import java.util.Optional;
import org.apache.beam.repackaged.beam_sdks_java_io_kafka.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.class */
/**
 * A {@link TimestampPolicy} that stamps each record with the value produced by a caller-supplied
 * function and derives the watermark from the largest such timestamp observed so far, held back
 * by a fixed delay.
 *
 * <p>The instance keeps mutable state ({@code maxEventTimestamp}) that is updated by
 * {@link #getTimestampForRecord} and read by {@link #getWatermark}; no synchronization is
 * performed inside this class.
 *
 * @param <K> key type of the Kafka records
 * @param <V> value type of the Kafka records
 */
public class CustomTimestampPolicyWithLimitedDelay<K, V> extends TimestampPolicy<K, V> {
    /** Delay subtracted from the reference instant whenever a watermark is computed. */
    private final Duration maxDelay;

    /** Computes the timestamp assigned to a record. */
    private final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFunction;

    /** Largest timestamp returned by {@code timestampFunction} so far (or the seeded start value). */
    private Instant maxEventTimestamp;

    /**
     * Creates the policy.
     *
     * @param serializableFunction function used to compute the timestamp of each record
     * @param duration delay by which the watermark trails the reference instant
     * @param optional starting watermark, if any; when empty,
     *     {@link BoundedWindow#TIMESTAMP_MIN_VALUE} is used instead
     */
    public CustomTimestampPolicyWithLimitedDelay(SerializableFunction<KafkaRecord<K, V>, Instant> serializableFunction, Duration duration, Optional<Instant> optional) {
        this.timestampFunction = serializableFunction;
        this.maxDelay = duration;

        // Seed the maximum one delay ahead of the starting watermark, so that
        // "maxEventTimestamp - maxDelay" initially evaluates to that starting watermark.
        final Instant startingWatermark = optional.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
        this.maxEventTimestamp = startingWatermark.plus(duration);
    }

    /**
     * Computes the timestamp for {@code kafkaRecord} via the configured function and records it
     * as the new maximum when it is later than every timestamp seen so far.
     *
     * @param partitionContext not used by this implementation
     * @param kafkaRecord record whose timestamp is requested
     * @return the timestamp produced by the configured function
     */
    @Override
    public Instant getTimestampForRecord(TimestampPolicy.PartitionContext partitionContext, KafkaRecord<K, V> kafkaRecord) {
        final Instant recordTimestamp = this.timestampFunction.apply(kafkaRecord);
        final boolean isNewMaximum = recordTimestamp.isAfter(this.maxEventTimestamp);
        if (isNewMaximum) {
            this.maxEventTimestamp = recordTimestamp;
        }
        return recordTimestamp;
    }

    /**
     * Returns the current watermark, using {@link Instant#now()} as the reference for "now".
     *
     * @param partitionContext supplies the message backlog and the backlog check time
     * @return the watermark as computed by {@link #getWatermark(TimestampPolicy.PartitionContext, Instant)}
     */
    @Override
    public Instant getWatermark(TimestampPolicy.PartitionContext partitionContext) {
        final Instant now = Instant.now();
        return getWatermark(partitionContext, now);
    }

    /**
     * Computes the watermark relative to an explicitly supplied "now". The first matching rule
     * wins:
     *
     * <ol>
     *   <li>if the maximum timestamp is after {@code instant}: {@code instant - maxDelay};
     *   <li>if the backlog is zero, {@code backlogCheckTime - maxDelay} is after the maximum
     *       timestamp, and the maximum timestamp is greater than epoch millisecond 0:
     *       {@code backlogCheckTime - maxDelay};
     *   <li>otherwise: {@code maxEventTimestamp - maxDelay}.
     * </ol>
     *
     * @param partitionContext supplies the message backlog and the backlog check time
     * @param instant the instant to treat as the current time
     * @return the watermark selected by the rules above
     */
    @VisibleForTesting
    Instant getWatermark(TimestampPolicy.PartitionContext partitionContext, Instant instant) {
        // Rule 1: never let a timestamp that lies ahead of "now" drive the watermark.
        if (this.maxEventTimestamp.isAfter(instant)) {
            return instant.minus(this.maxDelay);
        }

        // Rule 2: nothing left to read and the backlog check is recent enough to move forward.
        if (canAdvanceFromBacklogCheck(partitionContext)) {
            return partitionContext.getBacklogCheckTime().minus(this.maxDelay);
        }

        // Rule 3: trail the largest timestamp seen so far.
        return this.maxEventTimestamp.minus(this.maxDelay);
    }

    /**
     * Tells whether the watermark may be derived from the backlog check time rather than from the
     * maximum timestamp. Conditions are evaluated left to right and short-circuit.
     */
    private boolean canAdvanceFromBacklogCheck(TimestampPolicy.PartitionContext partitionContext) {
        if (partitionContext.getMessageBacklog() != 0) {
            return false;
        }
        final Instant delayedCheckTime = partitionContext.getBacklogCheckTime().minus(this.maxDelay);
        if (!delayedCheckTime.isAfter(this.maxEventTimestamp)) {
            return false;
        }
        return this.maxEventTimestamp.getMillis() > 0;
    }
}
