/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.pulsar.internal;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarTopicState;
import org.apache.flink.streaming.connectors.pulsar.internal.TopicRange;

/**
 * A {@link PulsarTopicState} that additionally carries a {@link TimestampAssigner} and a
 * {@link WatermarkGenerator} for the topic range it tracks.
 *
 * <p>Per-record callbacks ({@link #onEvent(Object, long)}) are forwarded to the generator with
 * the "immediate" output, while periodic callbacks ({@link #onPeriodicEmit()}) are forwarded
 * with the "deferred" output.
 *
 * @param <T> the type of records handled by the timestamp assigner and watermark generator
 */
@Internal
public final class PulsarTopicPartitionStateWithWatermarkGenerator<T>
extends PulsarTopicState<T> {
    /** Extracts the timestamp of each record. */
    private final TimestampAssigner<T> timestampAssigner;

    /** Receives every event and every periodic-emit signal for this topic range. */
    private final WatermarkGenerator<T> watermarkGenerator;

    /** Output handed to the generator on each event. */
    private final WatermarkOutput immediateOutput;

    /** Output handed to the generator on each periodic emit. */
    private final WatermarkOutput deferredOutput;

    /**
     * Creates the state for the given topic range.
     *
     * @param topicRange the topic range this state belongs to; passed to the superclass
     * @param topicState not read by this constructor.
     *     NOTE(review): presumably retained only for call-site compatibility - confirm
     * @param timestampAssigner assigner used by {@link #extractTimestamp(Object, long)}
     * @param watermarkGenerator generator that {@link #onEvent(Object, long)} and
     *     {@link #onPeriodicEmit()} delegate to
     * @param immediateOutput output passed to the generator from {@link #onEvent(Object, long)}
     * @param deferredOutput output passed to the generator from {@link #onPeriodicEmit()}
     */
    public PulsarTopicPartitionStateWithWatermarkGenerator(TopicRange topicRange, PulsarTopicState<T> topicState, TimestampAssigner<T> timestampAssigner, WatermarkGenerator<T> watermarkGenerator, WatermarkOutput immediateOutput, WatermarkOutput deferredOutput) {
        super(topicRange);
        this.timestampAssigner = timestampAssigner;
        this.watermarkGenerator = watermarkGenerator;
        this.immediateOutput = immediateOutput;
        this.deferredOutput = deferredOutput;
    }

    /**
     * Delegates timestamp extraction to the configured {@link TimestampAssigner}.
     *
     * @param record the record to extract a timestamp from
     * @param pulsarEventTimestamp timestamp forwarded unchanged to the assigner
     * @return whatever the assigner returns for this record
     */
    @Override
    public long extractTimestamp(T record, long pulsarEventTimestamp) {
        return timestampAssigner.extractTimestamp(record, pulsarEventTimestamp);
    }

    /**
     * Forwards a single event to the watermark generator, using the immediate output.
     *
     * @param event the event that was observed
     * @param timestamp the timestamp associated with the event
     */
    @Override
    public void onEvent(T event, long timestamp) {
        watermarkGenerator.onEvent(event, timestamp, immediateOutput);
    }

    /** Forwards a periodic-emit signal to the watermark generator, using the deferred output. */
    @Override
    public void onPeriodicEmit() {
        watermarkGenerator.onPeriodicEmit(deferredOutput);
    }

    /**
     * Returns a description made of this class's name, the topic range and the current offset.
     *
     * @return a human-readable representation of this state
     */
    @Override
    public String toString() {
        // The label must name this class; it previously read "...WithPeriodicWatermarks",
        // which does not match the class and misleads anyone reading logs.
        return "PulsarTopicPartitionStateWithWatermarkGenerator: partition=" + getTopicRange() + ", offset=" + getOffset();
    }
}

