package org.apache.samza.test.integration.join;

import org.apache.samza.context.Context;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/test/integration/join/Watcher.class */
/**
 * Stream task that tracks an integer "epoch" carried in incoming messages and logs an error when
 * the epoch has not advanced within a configured amount of time.
 *
 * <p>The allowed time between epoch changes is read from the job config key
 * {@code max.time.between.epochs.ms} in {@link #init(Context)}. The stall check itself runs in
 * {@link #window(MessageCollector, TaskCoordinator)}.
 */
public class Watcher implements StreamTask, WindowableTask, InitableTask {
    private static final Logger logger = LoggerFactory.getLogger(Watcher.class);

    /** Job config key holding the maximum allowed time between epoch changes, in milliseconds. */
    private static final String MAX_TIME_BETWEEN_EPOCHS_MS_KEY = "max.time.between.epochs.ms";

    /** Number of milliseconds in one minute; used to render the threshold in the alert message. */
    private static final long MILLIS_PER_MINUTE = 60000L;

    /** Stall threshold in milliseconds, loaded from the job config in {@link #init(Context)}. */
    private long maxTimeBetweenEpochsMs;

    /** True once a stall has been reported; cleared again when the epoch next advances. */
    private boolean inError = false;

    /** Wall-clock time (ms) of the last observed epoch increase; starts at construction time. */
    private long lastEpochChange = System.currentTimeMillis();

    /** Highest epoch value seen so far. */
    private int currentEpoch = 0;

    /**
     * Loads the stall threshold from the job configuration.
     *
     * @param context task context providing access to the job config
     */
    public void init(Context context) {
        this.maxTimeBetweenEpochsMs =
                context.getJobContext().getConfig().getLong(MAX_TIME_BETWEEN_EPOCHS_MS_KEY);
    }

    /**
     * Parses the incoming message as an integer epoch and, if it is greater than the current
     * epoch, records the change and clears any previously reported error state.
     *
     * @param incomingMessageEnvelope envelope whose message is cast to {@code String} and parsed
     *     as a decimal integer
     * @param messageCollector unused
     * @param taskCoordinator unused
     * @throws ClassCastException if the message is not a {@code String}
     * @throws NumberFormatException if the message is {@code null} or not a parsable integer
     */
    public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        int epoch = Integer.parseInt((String) incomingMessageEnvelope.getMessage());
        // Only a strictly larger epoch counts as progress; equal or smaller values are ignored.
        if (epoch > this.currentEpoch) {
            logger.info("Epoch changed to {} from {}", epoch, this.currentEpoch);
            this.currentEpoch = epoch;
            this.lastEpochChange = System.currentTimeMillis();
            this.inError = false;
        }
    }

    /**
     * Checks whether the time since the last epoch change exceeds the configured threshold and,
     * if so, logs an error. The error is logged once per stall: the {@code inError} flag
     * suppresses repeats until {@link #process} observes a new epoch.
     *
     * @param messageCollector unused
     * @param taskCoordinator unused
     */
    public void window(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        long elapsedMs = System.currentTimeMillis() - this.lastEpochChange;
        boolean stalled = elapsedMs > this.maxTimeBetweenEpochsMs;
        if (stalled && !this.inError) {
            this.inError = true;
            logger.info("Error state detected, alerting...");
            // Integer division: thresholds below one minute are reported as 0 minutes.
            long thresholdMinutes = this.maxTimeBetweenEpochsMs / MILLIS_PER_MINUTE;
            logger.error("Job failed to make progress! No epoch change for {} minutes.", thresholdMinutes);
        }
    }
}
