package org.apache.samza.test.integration.join;

import org.apache.samza.config.Config;
import org.apache.samza.container.TaskName;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/test/integration/join/Emitter.class */
public class Emitter implements StreamTask, InitableTask, WindowableTask {
    private static Logger logger = LoggerFactory.getLogger(Emitter.class);
    private static final String EPOCH = "the-epoch";
    private static final String COUNT = "the-count";
    private KeyValueStore<String, String> state;
    private int max;
    private TaskName taskName;

    public void init(Config config, TaskContext taskContext) {
        this.state = (KeyValueStore) taskContext.getStore("emitter-state");
        this.taskName = taskContext.getTaskName();
        this.max = config.getInt("count");
    }

    public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        if (incomingMessageEnvelope.getSystemStreamPartition().getStream().equals("epoch")) {
            int parseInt = Integer.parseInt((String) incomingMessageEnvelope.getMessage());
            logger.info("New epoch in message - " + parseInt);
            Integer num = getInt(EPOCH);
            if (num == null || parseInt == num.intValue()) {
                return;
            }
            if (parseInt < num.intValue()) {
                throw new IllegalArgumentException("Got new epoch " + parseInt + " which is less than current epoch " + num);
            }
            logger.info("Epoch: " + parseInt);
            this.state.put(EPOCH, Integer.toString(parseInt));
            this.state.put(COUNT, "0");
            taskCoordinator.commit(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
        }
    }

    public void window(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        Integer num = getInt(EPOCH);
        if (num == null) {
            resetEpoch();
            return;
        }
        int intValue = getInt(COUNT).intValue();
        if (intValue < this.max) {
            logger.info("Emitting: " + intValue + ", epoch = " + num + ", task = " + this.taskName);
            messageCollector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "emitted"), Integer.toString(intValue), num + "-" + this.taskName.toString()));
            this.state.put(COUNT, Integer.toString(getInt(COUNT).intValue() + 1));
        }
    }

    private void resetEpoch() {
        logger.info("Resetting epoch to 0");
        this.state.put(EPOCH, "0");
        this.state.put(COUNT, "0");
    }

    private Integer getInt(String str) {
        String str2 = (String) this.state.get(str);
        if (str2 == null) {
            return null;
        }
        return Integer.valueOf(Integer.parseInt(str2));
    }
}
