package org.apache.samza.test.integration.join;

import org.apache.samza.context.Context;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/test/integration/join/Checker.class */
public class Checker implements StreamTask, WindowableTask, InitableTask {
    private static Logger logger = LoggerFactory.getLogger(Checker.class);
    private static final String CURRENT_EPOCH = "current-epoch";
    private KeyValueStore<String, String> store;
    private int expectedKeys;
    private int numPartitions;

    public void init(Context context) {
        this.store = context.getTaskContext().getStore("checker-state");
        this.expectedKeys = context.getJobContext().getConfig().getInt("expected.keys");
        this.numPartitions = context.getJobContext().getConfig().getInt("num.partitions");
    }

    public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        String str = (String) incomingMessageEnvelope.getKey();
        String str2 = (String) incomingMessageEnvelope.getMessage();
        logger.info("Got key=" + str + ", epoch = " + str2 + " in checker...");
        checkEpoch(str2);
        this.store.put(str, str2);
    }

    public void window(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        String str = (String) this.store.get(CURRENT_EPOCH);
        logger.info("Checking if epoch " + str + " is complete.");
        int i = 0;
        KeyValueIterator all = this.store.all();
        while (all.hasNext()) {
            String str2 = (String) ((Entry) all.next()).getValue();
            if (str2.equals(str)) {
                i++;
            } else {
                logger.info("####### Found a different epoch! - " + str2 + " Current epoch is " + str);
            }
        }
        all.close();
        if (i != this.expectedKeys + 1) {
            if (i > this.expectedKeys + 1) {
                throw new IllegalStateException("Got " + i + " keys, which is more than the expected " + (this.expectedKeys + 1));
            }
            logger.info("Only found " + i + " valid keys, try again later.");
            return;
        }
        logger.info("Epoch " + str + " is complete.");
        int parseInt = Integer.parseInt(str) + 1;
        for (int i2 = 0; i2 < this.numPartitions; i2++) {
            logger.info("Emitting next epoch - " + Integer.toString(i2) + " -> " + Integer.toString(parseInt));
            messageCollector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "epoch"), Integer.toString(i2), Integer.toString(parseInt)));
        }
        this.store.put(CURRENT_EPOCH, Integer.toString(parseInt));
    }

    private void checkEpoch(String str) {
        String str2 = (String) this.store.get(CURRENT_EPOCH);
        if (str2 == null) {
            this.store.put(CURRENT_EPOCH, str);
            return;
        }
        int parseInt = Integer.parseInt(str2);
        int parseInt2 = Integer.parseInt(str);
        if (parseInt2 > parseInt) {
            throw new IllegalArgumentException("Got epoch " + str + " but have not yet completed " + str2);
        }
        if (parseInt2 < parseInt) {
            logger.info("#### Ignoring received epoch = " + str + " less than what is in store " + str2);
        }
    }
}
