package org.apache.samza.test.integration.join;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.samza.container.TaskName;
import org.apache.samza.context.Context;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/test/integration/join/Joiner.class */
public class Joiner implements StreamTask, InitableTask {
    private static Logger logger = LoggerFactory.getLogger(Joiner.class);
    private KeyValueStore<String, String> store;
    private int expected;
    private TaskName taskName;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/samza/test/integration/join/Joiner$Partitions.class */
    public static class Partitions {
        int epoch;
        Set<Integer> partitions;

        public Partitions(int i, Set<Integer> set) {
            this.epoch = i;
            this.partitions = set;
        }

        public static Partitions parse(String str) {
            String[] split = str.split("\\|", -1);
            int parseInt = Integer.parseInt(split[1]);
            HashSet hashSet = new HashSet(split.length);
            for (int i = 2; i < split.length - 1; i++) {
                hashSet.add(Integer.valueOf(Integer.parseInt(split[i])));
            }
            return new Partitions(parseInt, hashSet);
        }

        public String toString() {
            StringBuilder sb = new StringBuilder("|");
            sb.append(this.epoch);
            sb.append("|");
            Iterator<Integer> it = this.partitions.iterator();
            while (it.hasNext()) {
                sb.append(it.next().intValue());
                sb.append("|");
            }
            return sb.toString();
        }
    }

    public void init(Context context) {
        this.store = context.getTaskContext().getStore("joiner-state");
        this.expected = context.getJobContext().getConfig().getInt("num.partitions");
        this.taskName = context.getTaskContext().getTaskModel().getTaskName();
    }

    public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        String str = (String) incomingMessageEnvelope.getKey();
        String[] split = ((String) incomingMessageEnvelope.getMessage()).split("-");
        int parseInt = Integer.parseInt(split[0]);
        int parseInt2 = Integer.parseInt(split[1].split(" ")[1]);
        Partitions loadPartitions = loadPartitions(parseInt, str);
        logger.info("Joiner got epoch = " + parseInt + ", partition = " + parseInt2 + ", parts = " + loadPartitions);
        if (loadPartitions.epoch < parseInt) {
            if (loadPartitions.partitions.size() != this.expected) {
                throw new IllegalArgumentException("Should have " + this.expected + " partitions when new epoch starts.");
            }
            logger.info("Reseting epoch to " + parseInt);
            this.store.delete(str);
            loadPartitions.epoch = parseInt;
            loadPartitions.partitions.clear();
            loadPartitions.partitions.add(Integer.valueOf(parseInt2));
        } else if (loadPartitions.epoch > parseInt) {
            logger.info("Ignoring message for epoch " + parseInt);
        } else {
            loadPartitions.partitions.add(Integer.valueOf(parseInt2));
            if (loadPartitions.partitions.size() == this.expected) {
                logger.info("Completed: " + str + " -> " + Integer.toString(parseInt));
                messageCollector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "completed-keys"), str, Integer.toString(parseInt)));
            }
        }
        this.store.put(str, loadPartitions.toString());
        logger.info("Join store in Task " + this.taskName + " " + str + " -> " + loadPartitions.toString());
    }

    private Partitions loadPartitions(int i, String str) {
        String str2 = (String) this.store.get(str);
        return str2 == null ? new Partitions(i, new HashSet()) : Partitions.parse(str2);
    }
}
