package org.apache.samza.system.mock;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.BlockingEnvelopeMap;
import org.apache.samza.util.Clock;

/* loaded from: input_file:org/apache/samza/system/mock/MockSystemConsumer.class */
/**
 * A mock system consumer that fabricates messages on background threads.
 *
 * <p>Registered {@link SystemStreamPartition}s are sharded across {@code threadCount} daemon
 * threads when {@link #start()} is called. Each thread repeatedly looks for partitions whose
 * queue reports no messages, sleeps for {@code brokerSleepMs}, and then puts
 * {@code messagesPerBatch} identical envelopes for each of those partitions.
 *
 * <p>Partitions are expected to be registered before {@link #start()}; the set of registered
 * partitions is a plain {@link HashSet} with no synchronization of its own.
 */
public class MockSystemConsumer extends BlockingEnvelopeMap {
    private final int messagesPerBatch;
    private final int threadCount;
    private final int brokerSleepMs;
    private final Set<SystemStreamPartition> ssps;
    private List<Thread> threads;

    /**
     * Worker that keeps refilling the partitions assigned to it until it is interrupted or has
     * no partitions to serve.
     */
    public class MockSystemConsumerRunnable implements Runnable {
        private final Set<SystemStreamPartition> ssps;

        /**
         * @param set the partitions this worker is responsible for
         */
        public MockSystemConsumerRunnable(Set<SystemStreamPartition> set) {
            this.ssps = set;
        }

        /**
         * Runs the refill loop: collect drained partitions, sleep for the configured broker
         * delay, then put one batch of messages for each drained partition.
         */
        @Override // java.lang.Runnable
        public void run() {
            // A worker with no partitions exits immediately instead of spinning.
            while (!Thread.interrupted() && !this.ssps.isEmpty()) {
                try {
                    // Snapshot which partitions need a refill *before* sleeping, so the
                    // decision is not affected by what happens during the sleep.
                    Set<SystemStreamPartition> drained = new HashSet<>();
                    for (SystemStreamPartition ssp : this.ssps) {
                        if (MockSystemConsumer.this.getNumMessagesInQueue(ssp) <= 0) {
                            drained.add(ssp);
                        }
                    }
                    Thread.sleep(MockSystemConsumer.this.brokerSleepMs);
                    for (SystemStreamPartition ssp : drained) {
                        for (int n = 0; n < MockSystemConsumer.this.messagesPerBatch; n++) {
                            MockSystemConsumer.this.put(ssp, new IncomingMessageEnvelope(ssp, "0", "key", "value"));
                        }
                    }
                } catch (InterruptedException e) {
                    // Interruption is the shutdown signal sent by stop(); the thread ends here.
                    System.out.println("Got interrupt. Shutting down.");
                    return;
                }
            }
        }
    }

    /**
     * @param messagesPerBatch number of envelopes put per drained partition on each refill
     * @param threadCount      number of worker threads created by {@link #start()}
     * @param brokerSleepMs    milliseconds each worker sleeps before putting a batch
     */
    public MockSystemConsumer(int messagesPerBatch, int threadCount, int brokerSleepMs) {
        super(new MetricsRegistryMap("test-container-performance"), new Clock() { // from class: org.apache.samza.system.mock.MockSystemConsumer.1
            public long currentTimeMillis() {
                return System.currentTimeMillis();
            }
        });
        this.messagesPerBatch = messagesPerBatch;
        this.threadCount = threadCount;
        this.brokerSleepMs = brokerSleepMs;
        this.ssps = new HashSet<>();
        this.threads = new ArrayList<>(threadCount);
    }

    /**
     * Shards the registered partitions across {@code threadCount} daemon threads by hash code
     * and starts every thread.
     */
    public void start() {
        for (int threadIndex = 0; threadIndex < this.threadCount; threadIndex++) {
            Set<SystemStreamPartition> assigned = new HashSet<>();
            for (SystemStreamPartition ssp : this.ssps) {
                // Take the remainder first and the absolute value second:
                // Math.abs(Integer.MIN_VALUE) is still negative, so abs-then-mod could produce
                // a negative bucket and leave the partition assigned to no thread at all.
                // abs(h % n) yields the same bucket as abs(h) % n for every other hash value.
                if (Math.abs(ssp.hashCode() % this.threadCount) == threadIndex) {
                    assigned.add(ssp);
                }
            }
            Thread thread = new Thread(new MockSystemConsumerRunnable(assigned), "MockSystemConsumer-" + threadIndex);
            thread.setDaemon(true);
            this.threads.add(thread);
            thread.start();
        }
    }

    /**
     * Interrupts every worker thread and waits for them to finish. If the calling thread is
     * itself interrupted while waiting, the wait is abandoned and the caller's interrupt status
     * is restored.
     */
    public void stop() {
        for (Thread thread : this.threads) {
            thread.interrupt();
        }
        try {
            for (Thread thread : this.threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            // Do not swallow the interrupt: re-assert it so callers can observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Registers the partition with the superclass, records it for sharding in {@link #start()},
     * and flags it as at head via {@code setIsAtHead}.
     *
     * @param systemStreamPartition the partition to register
     * @param offset                passed through unchanged to the superclass {@code register}
     */
    public void register(SystemStreamPartition systemStreamPartition, String offset) {
        super.register(systemStreamPartition, offset);
        this.ssps.add(systemStreamPartition);
        setIsAtHead(systemStreamPartition, true);
    }
}
