package org.apache.samza.test.framework;

import com.google.common.base.Preconditions;
import java.io.File;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.application.LegacyTaskApplication;
import org.apache.samza.application.SamzaApplication;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
import org.apache.samza.context.ExternalContext;
import org.apache.samza.execution.JobPlanner;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.metadatastore.InMemoryMetadataStoreFactory;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.operators.KV;
import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.system.EndOfStreamMessage;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.descriptors.StreamDescriptor;
import org.apache.samza.system.inmemory.InMemorySystemFactory;
import org.apache.samza.system.inmemory.InMemorySystemProducer;
import org.apache.samza.task.AsyncStreamTask;
import org.apache.samza.task.StreamTask;
import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
import org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
import org.apache.samza.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/test/framework/TestRunner.class */
public class TestRunner {
    private static final Logger LOG = LoggerFactory.getLogger(TestRunner.class);
    private static final String JOB_DEFAULT_SYSTEM = "default-samza-system";
    private static final String APP_NAME = "samza-test";
    private Map<String, String> configs;
    private SamzaApplication app;
    private ExternalContext externalContext;
    private String inMemoryScope;

    private TestRunner() {
        this.configs = new HashMap();
        this.inMemoryScope = RandomStringUtils.random(10, true, true);
        this.configs.put("app.name", APP_NAME);
        this.configs.put(JobConfig.PROCESSOR_ID(), "1");
        this.configs.put("job.coordinator.factory", PassthroughJobCoordinatorFactory.class.getName());
        this.configs.put(JobConfig.STARTPOINT_METADATA_STORE_FACTORY(), InMemoryMetadataStoreFactory.class.getCanonicalName());
        this.configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
        this.configs.put(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR(), new File(System.getProperty("java.io.tmpdir"), this.inMemoryScope + "-non-logged").getAbsolutePath());
        this.configs.put(JobConfig.JOB_LOGGED_STORE_BASE_DIR(), new File(System.getProperty("java.io.tmpdir"), this.inMemoryScope + "-logged").getAbsolutePath());
        addConfig(JobConfig.JOB_DEFAULT_SYSTEM(), JOB_DEFAULT_SYSTEM);
        addConfig("job.host-affinity.enabled", Boolean.FALSE.toString());
        addConfig("inmemory.scope", this.inMemoryScope);
        addConfig(new InMemorySystemDescriptor(JOB_DEFAULT_SYSTEM).withInMemoryScope(this.inMemoryScope).toConfig());
    }

    private TestRunner(Class cls) {
        this();
        Preconditions.checkNotNull(cls);
        this.configs.put(TaskConfig.TASK_CLASS(), cls.getName());
        this.app = new LegacyTaskApplication(cls.getName());
    }

    private TestRunner(SamzaApplication samzaApplication) {
        this();
        Preconditions.checkNotNull(samzaApplication);
        this.app = samzaApplication;
    }

    public static TestRunner of(Class cls) {
        Preconditions.checkNotNull(cls);
        Preconditions.checkState(StreamTask.class.isAssignableFrom(cls) || AsyncStreamTask.class.isAssignableFrom(cls));
        return new TestRunner(cls);
    }

    public static TestRunner of(SamzaApplication samzaApplication) {
        Preconditions.checkNotNull(samzaApplication);
        return new TestRunner(samzaApplication);
    }

    public TestRunner addConfig(String str, String str2) {
        Preconditions.checkNotNull(str);
        Preconditions.checkNotNull(str2);
        this.configs.put(str, str2);
        return this;
    }

    public TestRunner addConfig(Map<String, String> map) {
        Preconditions.checkNotNull(map);
        this.configs.putAll(map);
        return this;
    }

    public TestRunner addExternalContext(ExternalContext externalContext) {
        Preconditions.checkNotNull(externalContext);
        this.externalContext = externalContext;
        return this;
    }

    public <StreamMessageType> TestRunner addInputStream(InMemoryInputDescriptor inMemoryInputDescriptor, List<StreamMessageType> list) {
        Preconditions.checkNotNull(inMemoryInputDescriptor, list);
        HashMap hashMap = new HashMap();
        hashMap.put(0, list);
        initializeInMemoryInputStream(inMemoryInputDescriptor, hashMap);
        return this;
    }

    public <StreamMessageType> TestRunner addInputStream(InMemoryInputDescriptor inMemoryInputDescriptor, Map<Integer, ? extends Iterable<StreamMessageType>> map) {
        Preconditions.checkNotNull(inMemoryInputDescriptor, map);
        HashMap hashMap = new HashMap();
        hashMap.putAll(map);
        initializeInMemoryInputStream(inMemoryInputDescriptor, hashMap);
        return this;
    }

    public TestRunner addOutputStream(InMemoryOutputDescriptor<?> inMemoryOutputDescriptor, int i) {
        Preconditions.checkNotNull(inMemoryOutputDescriptor);
        Preconditions.checkState(i >= 1);
        ((InMemorySystemDescriptor) inMemoryOutputDescriptor.getSystemDescriptor()).withInMemoryScope(this.inMemoryScope);
        new InMemorySystemFactory().getAdmin(inMemoryOutputDescriptor.getSystemName(), new MapConfig(new Map[]{inMemoryOutputDescriptor.toConfig(), inMemoryOutputDescriptor.getSystemDescriptor().toConfig()})).createStream(new StreamSpec(inMemoryOutputDescriptor.getStreamId(), (String) inMemoryOutputDescriptor.getPhysicalName().orElse(inMemoryOutputDescriptor.getStreamId()), inMemoryOutputDescriptor.getSystemName(), i));
        addConfig(inMemoryOutputDescriptor.toConfig());
        addConfig(inMemoryOutputDescriptor.getSystemDescriptor().toConfig());
        addSerdeConfigs(inMemoryOutputDescriptor);
        return this;
    }

    public void run(Duration duration) {
        Preconditions.checkNotNull(this.app);
        Preconditions.checkState((duration.isZero() && duration.isNegative()) ? false : true, "Timeouts should be positive");
        deleteStoreDirectories();
        LocalApplicationRunner localApplicationRunner = new LocalApplicationRunner(this.app, new MapConfig(JobPlanner.generateSingleJobConfig(this.configs)));
        localApplicationRunner.run(this.externalContext);
        if (!localApplicationRunner.waitForFinish(duration)) {
            throw new SamzaException("Timed out waiting for application to finish");
        }
        ApplicationStatus status = localApplicationRunner.status();
        deleteStoreDirectories();
        if (status.getStatusCode() == ApplicationStatus.StatusCode.UnsuccessfulFinish) {
            throw new SamzaException("Application could not finish successfully", status.getThrowable());
        }
    }

    public static <StreamMessageType> Map<Integer, List<StreamMessageType>> consumeStream(InMemoryOutputDescriptor inMemoryOutputDescriptor, Duration duration) throws SamzaException {
        Preconditions.checkNotNull(inMemoryOutputDescriptor);
        String streamId = inMemoryOutputDescriptor.getStreamId();
        String systemName = inMemoryOutputDescriptor.getSystemName();
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        hashSet2.add(streamId);
        InMemorySystemFactory inMemorySystemFactory = new InMemorySystemFactory();
        MapConfig mapConfig = new MapConfig(new Map[]{inMemoryOutputDescriptor.toConfig(), inMemoryOutputDescriptor.getSystemDescriptor().toConfig()});
        Map systemStreamMetadata = inMemorySystemFactory.getAdmin(systemName, mapConfig).getSystemStreamMetadata(hashSet2);
        SystemConsumer consumer = inMemorySystemFactory.getConsumer(systemName, mapConfig, (MetricsRegistry) null);
        ((SystemStreamMetadata) systemStreamMetadata.get((String) inMemoryOutputDescriptor.getPhysicalName().orElse(streamId))).getSystemStreamPartitionMetadata().keySet().forEach(partition -> {
            SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, streamId, partition);
            hashSet.add(systemStreamPartition);
            consumer.register(systemStreamPartition, "0");
        });
        long currentTimeMillis = System.currentTimeMillis();
        HashMap hashMap = new HashMap();
        HashSet hashSet3 = new HashSet(hashSet);
        while (System.currentTimeMillis() < currentTimeMillis + duration.toMillis()) {
            try {
                for (Map.Entry entry : consumer.poll(hashSet, 10L).entrySet()) {
                    SystemStreamPartition systemStreamPartition = (SystemStreamPartition) entry.getKey();
                    hashMap.computeIfAbsent(systemStreamPartition, systemStreamPartition2 -> {
                        return new LinkedList();
                    });
                    List list = (List) entry.getValue();
                    if (((List) hashMap.get(systemStreamPartition)).size() + list.size() == Integer.valueOf(((SystemStreamMetadata.SystemStreamPartitionMetadata) ((SystemStreamMetadata) systemStreamMetadata.get(inMemoryOutputDescriptor.getStreamId())).getSystemStreamPartitionMetadata().get(systemStreamPartition.getPartition())).getNewestOffset()).intValue()) {
                        hashSet3.remove(entry.getKey());
                        hashSet.remove(entry.getKey());
                    }
                    ((List) hashMap.get(systemStreamPartition)).addAll(list);
                }
                if (hashSet3.isEmpty()) {
                    break;
                }
            } catch (InterruptedException e) {
                throw new SamzaException("Timed out while consuming stream \n" + e.getMessage());
            }
        }
        if (hashSet3.isEmpty()) {
            return (Map) hashMap.entrySet().stream().collect(Collectors.toMap(entry2 -> {
                return Integer.valueOf(((SystemStreamPartition) entry2.getKey()).getPartition().getPartitionId());
            }, entry3 -> {
                return (List) ((List) entry3.getValue()).stream().map(incomingMessageEnvelope -> {
                    return incomingMessageEnvelope.getMessage();
                }).collect(Collectors.toList());
            }));
        }
        throw new IllegalStateException("Could not poll for all system stream partitions");
    }

    private <StreamMessageType> void initializeInMemoryInputStream(InMemoryInputDescriptor<?> inMemoryInputDescriptor, Map<Integer, Iterable<StreamMessageType>> map) {
        String systemName = inMemoryInputDescriptor.getSystemName();
        String str = (String) inMemoryInputDescriptor.getPhysicalName().orElse(inMemoryInputDescriptor.getStreamId());
        if (this.configs.containsKey(TaskConfig.INPUT_STREAMS())) {
            this.configs.put(TaskConfig.INPUT_STREAMS(), this.configs.get(TaskConfig.INPUT_STREAMS()).concat("," + systemName + "." + str));
        } else {
            this.configs.put(TaskConfig.INPUT_STREAMS(), systemName + "." + str);
        }
        ((InMemorySystemDescriptor) inMemoryInputDescriptor.getSystemDescriptor()).withInMemoryScope(this.inMemoryScope);
        addConfig(inMemoryInputDescriptor.toConfig());
        addConfig(inMemoryInputDescriptor.getSystemDescriptor().toConfig());
        addSerdeConfigs(inMemoryInputDescriptor);
        StreamSpec streamSpec = new StreamSpec(inMemoryInputDescriptor.getStreamId(), str, systemName, map.size());
        InMemorySystemFactory inMemorySystemFactory = new InMemorySystemFactory();
        MapConfig mapConfig = new MapConfig(new Map[]{inMemoryInputDescriptor.toConfig(), inMemoryInputDescriptor.getSystemDescriptor().toConfig()});
        inMemorySystemFactory.getAdmin(systemName, mapConfig).createStream(streamSpec);
        InMemorySystemProducer producer = inMemorySystemFactory.getProducer(systemName, mapConfig, (MetricsRegistry) null);
        SystemStream systemStream = new SystemStream(systemName, str);
        map.forEach((num, iterable) -> {
            iterable.forEach(obj -> {
                Object key = obj instanceof KV ? ((KV) obj).getKey() : null;
                Object value = obj instanceof KV ? ((KV) obj).getValue() : obj;
                if (value instanceof IncomingMessageEnvelope) {
                    producer.send((IncomingMessageEnvelope) value);
                } else {
                    producer.send(systemName, new OutgoingMessageEnvelope(systemStream, Integer.valueOf(num.intValue()), key, value));
                }
            });
            producer.send(systemName, new OutgoingMessageEnvelope(systemStream, Integer.valueOf(num.intValue()), (Object) null, new EndOfStreamMessage((String) null)));
        });
    }

    private void deleteStoreDirectories() {
        Preconditions.checkNotNull(this.configs.get(JobConfig.JOB_LOGGED_STORE_BASE_DIR()));
        Preconditions.checkNotNull(this.configs.get(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR()));
        deleteDirectory(this.configs.get(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR()));
        deleteDirectory(this.configs.get(JobConfig.JOB_LOGGED_STORE_BASE_DIR()));
    }

    private void deleteDirectory(String str) {
        File file = new File(str);
        LOG.info("Deleting the directory " + str);
        FileUtil.rm(file);
        if (file.exists()) {
            LOG.warn("Could not delete the directory " + str);
        }
    }

    private void addSerdeConfigs(StreamDescriptor streamDescriptor) {
        String format = String.format(StreamConfig.STREAM_ID_PREFIX(), streamDescriptor.getStreamId());
        String str = format + StreamConfig.KEY_SERDE();
        String str2 = format + StreamConfig.MSG_SERDE();
        this.configs.put(str, null);
        this.configs.put(str2, null);
    }
}
