package org.apache.streams.local.builders;

import com.google.common.util.concurrent.Uninterruptibles;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.DatumStatusCountable;
import org.apache.streams.core.StreamBuilder;
import org.apache.streams.core.StreamsPersistWriter;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.local.LocalRuntimeConfiguration;
import org.apache.streams.local.counters.StreamsTaskCounter;
import org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor;
import org.apache.streams.local.monitoring.MonitoringConfiguration;
import org.apache.streams.local.queues.ThroughputQueue;
import org.apache.streams.local.tasks.BaseStreamsTask;
import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread;
import org.apache.streams.local.tasks.StatusCounterMonitorThread;
import org.apache.streams.local.tasks.StreamsProviderTask;
import org.apache.streams.local.tasks.StreamsTask;
import org.apache.streams.monitoring.tasks.BroadcastMonitorThread;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/local/builders/LocalStreamBuilder.class */
public class LocalStreamBuilder implements StreamBuilder {
    private static final Logger LOGGER = LoggerFactory.getLogger(LocalStreamBuilder.class);
    private static final int DEFAULT_QUEUE_SIZE = 500;
    public static final String DEFAULT_STREAM_IDENTIFIER = "Unknown_Stream";
    public static final String DEFAULT_STARTED_AT_KEY = "startedAt";
    private LocalRuntimeConfiguration localRuntimeConfiguration;
    private MonitoringConfiguration monitoringConfiguration;
    private Map<String, StreamComponent> providers;
    private Map<String, StreamComponent> components;
    private Map<StreamsTask, Future> futures;
    private ExecutorService executor;
    private ExecutorService monitor;
    private int totalTasks;
    private int monitorTasks;
    private LocalStreamProcessMonitorThread monitorThread;
    private Map<String, List<StreamsTask>> tasks;
    private Thread shutdownHook;
    private BroadcastMonitorThread broadcastMonitor;
    private String streamIdentifier;
    private DateTime startedAt;
    private boolean useDeprecatedMonitors;

    public LocalStreamBuilder() {
        this.providers = new HashMap();
        this.components = new HashMap();
        this.futures = new HashMap();
        this.totalTasks = 0;
        this.monitorTasks = 0;
        this.streamIdentifier = DEFAULT_STREAM_IDENTIFIER;
        this.startedAt = new DateTime();
        this.localRuntimeConfiguration = (LocalRuntimeConfiguration) new StreamsConfigurator(LocalRuntimeConfiguration.class).detectCustomConfiguration();
        this.monitoringConfiguration = new ComponentConfigurator(MonitoringConfiguration.class).detectConfiguration();
    }

    public LocalStreamBuilder(LocalRuntimeConfiguration localRuntimeConfiguration) {
        this.providers = new HashMap();
        this.components = new HashMap();
        this.futures = new HashMap();
        this.totalTasks = 0;
        this.monitorTasks = 0;
        this.streamIdentifier = DEFAULT_STREAM_IDENTIFIER;
        this.startedAt = new DateTime();
        this.localRuntimeConfiguration = localRuntimeConfiguration;
        this.monitoringConfiguration = new ComponentConfigurator(MonitoringConfiguration.class).detectConfiguration();
    }

    public LocalStreamBuilder(MonitoringConfiguration monitoringConfiguration) {
        this.providers = new HashMap();
        this.components = new HashMap();
        this.futures = new HashMap();
        this.totalTasks = 0;
        this.monitorTasks = 0;
        this.streamIdentifier = DEFAULT_STREAM_IDENTIFIER;
        this.startedAt = new DateTime();
        this.localRuntimeConfiguration = (LocalRuntimeConfiguration) new StreamsConfigurator(LocalRuntimeConfiguration.class).detectCustomConfiguration();
        this.monitoringConfiguration = monitoringConfiguration;
    }

    public LocalStreamBuilder(LocalRuntimeConfiguration localRuntimeConfiguration, MonitoringConfiguration monitoringConfiguration) {
        this.providers = new HashMap();
        this.components = new HashMap();
        this.futures = new HashMap();
        this.totalTasks = 0;
        this.monitorTasks = 0;
        this.streamIdentifier = DEFAULT_STREAM_IDENTIFIER;
        this.startedAt = new DateTime();
        this.localRuntimeConfiguration = localRuntimeConfiguration;
        this.monitoringConfiguration = monitoringConfiguration;
    }

    public void prepare() {
        this.streamIdentifier = this.localRuntimeConfiguration.getIdentifier();
        this.localRuntimeConfiguration.setStartedAt(Long.valueOf(this.startedAt.getMillis()));
        this.shutdownHook = new Thread(() -> {
            LOGGER.debug("Shutdown hook received.  Beginning shutdown");
            this.stopInternal(true);
        });
        this.useDeprecatedMonitors = false;
        if (this.monitoringConfiguration != null) {
            this.broadcastMonitor = new BroadcastMonitorThread(this.monitoringConfiguration);
        }
    }

    public void setUseDeprecatedMonitors(boolean z) {
        this.useDeprecatedMonitors = z;
    }

    public StreamBuilder newPerpetualStream(String str, StreamsProvider streamsProvider) {
        validateId(str);
        this.providers.put(str, new StreamComponent(str, streamsProvider, true, (StreamsConfiguration) this.localRuntimeConfiguration));
        this.totalTasks++;
        if (this.useDeprecatedMonitors && (streamsProvider instanceof DatumStatusCountable)) {
            this.monitorTasks++;
        }
        return this;
    }

    public StreamBuilder newReadCurrentStream(String str, StreamsProvider streamsProvider) {
        validateId(str);
        this.providers.put(str, new StreamComponent(str, streamsProvider, false, (StreamsConfiguration) this.localRuntimeConfiguration));
        this.totalTasks++;
        if (this.useDeprecatedMonitors && (streamsProvider instanceof DatumStatusCountable)) {
            this.monitorTasks++;
        }
        return this;
    }

    public StreamBuilder newReadNewStream(String str, StreamsProvider streamsProvider, BigInteger bigInteger) {
        validateId(str);
        this.providers.put(str, new StreamComponent(str, streamsProvider, bigInteger, this.localRuntimeConfiguration));
        this.totalTasks++;
        if (this.useDeprecatedMonitors && (streamsProvider instanceof DatumStatusCountable)) {
            this.monitorTasks++;
        }
        return this;
    }

    public StreamBuilder newReadRangeStream(String str, StreamsProvider streamsProvider, DateTime dateTime, DateTime dateTime2) {
        validateId(str);
        this.providers.put(str, new StreamComponent(str, streamsProvider, dateTime, dateTime2, this.localRuntimeConfiguration));
        this.totalTasks++;
        if (this.useDeprecatedMonitors && (streamsProvider instanceof DatumStatusCountable)) {
            this.monitorTasks++;
        }
        return this;
    }

    public StreamBuilder setStreamsConfiguration(StreamsConfiguration streamsConfiguration) {
        this.localRuntimeConfiguration = (LocalRuntimeConfiguration) StreamsJacksonMapper.getInstance().convertValue(streamsConfiguration, LocalRuntimeConfiguration.class);
        return this;
    }

    public StreamsConfiguration getStreamsConfiguration() {
        return (StreamsConfiguration) StreamsJacksonMapper.getInstance().convertValue(this.localRuntimeConfiguration, StreamsConfiguration.class);
    }

    public StreamBuilder addStreamsProcessor(String str, StreamsProcessor streamsProcessor, int i, String... strArr) {
        validateId(str);
        StreamComponent streamComponent = new StreamComponent(str, streamsProcessor, new ThroughputQueue(this.localRuntimeConfiguration.getMaxQueueCapacity().intValue(), str, this.streamIdentifier, this.startedAt.getMillis()), i, this.localRuntimeConfiguration);
        this.components.put(str, streamComponent);
        connectToOtherComponents(strArr, streamComponent);
        this.totalTasks += i;
        if (this.useDeprecatedMonitors && (streamsProcessor instanceof DatumStatusCountable)) {
            this.monitorTasks++;
        }
        return this;
    }

    public StreamBuilder addStreamsPersistWriter(String str, StreamsPersistWriter streamsPersistWriter, int i, String... strArr) {
        validateId(str);
        StreamComponent streamComponent = new StreamComponent(str, streamsPersistWriter, new ThroughputQueue(this.localRuntimeConfiguration.getMaxQueueCapacity().intValue(), str, this.streamIdentifier, this.startedAt.getMillis()), i, this.localRuntimeConfiguration);
        this.components.put(str, streamComponent);
        connectToOtherComponents(strArr, streamComponent);
        this.totalTasks += i;
        if (this.useDeprecatedMonitors && (streamsPersistWriter instanceof DatumStatusCountable)) {
            this.monitorTasks++;
        }
        return this;
    }

    public void start() {
        prepare();
        attachShutdownHandler();
        boolean z = true;
        this.executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(this.totalTasks, this);
        this.monitor = Executors.newCachedThreadPool();
        HashMap hashMap = new HashMap();
        this.tasks = new HashMap();
        boolean z2 = false;
        try {
            try {
                if (this.useDeprecatedMonitors) {
                    this.monitorThread = new LocalStreamProcessMonitorThread(this.executor, 10);
                    this.monitor.submit(this.monitorThread);
                }
                setupComponentTasks(this.tasks);
                setupProviderTasks(hashMap);
                LOGGER.info("Started stream with {} components", Integer.valueOf(this.tasks.size()));
                while (z) {
                    Uninterruptibles.sleepUninterruptibly(this.localRuntimeConfiguration.getShutdownCheckDelay().longValue(), TimeUnit.MILLISECONDS);
                    z = false;
                    Iterator<StreamsProviderTask> it = hashMap.values().iterator();
                    while (it.hasNext()) {
                        z = z || it.next().isRunning();
                    }
                    for (StreamComponent streamComponent : this.components.values()) {
                        boolean z3 = false;
                        for (StreamsTask streamsTask : streamComponent.getStreamsTasks()) {
                            if (streamsTask instanceof BaseStreamsTask) {
                                z3 = z3 || streamsTask.isRunning();
                            }
                        }
                        z = z || (z3 && streamComponent.getInBoundQueue().size() > 0);
                    }
                    if (z) {
                        Uninterruptibles.sleepUninterruptibly(this.localRuntimeConfiguration.getShutdownCheckInterval().longValue(), TimeUnit.MILLISECONDS);
                    }
                }
                LOGGER.info("Components are no longer running or timed out");
                LOGGER.info("Stream has completed, pausing @ {}", Long.valueOf(System.currentTimeMillis()));
                Uninterruptibles.sleepUninterruptibly(this.localRuntimeConfiguration.getShutdownPauseMs().longValue(), TimeUnit.MILLISECONDS);
                LOGGER.info("Stream has completed, shutting down @ {}", Long.valueOf(System.currentTimeMillis()));
                stopInternal(false);
            } catch (Exception e) {
                LOGGER.warn("Runtime exception.  Beginning shutdown");
                z2 = true;
                LOGGER.info("Stream has completed, pausing @ {}", Long.valueOf(System.currentTimeMillis()));
                Uninterruptibles.sleepUninterruptibly(this.localRuntimeConfiguration.getShutdownPauseMs().longValue(), TimeUnit.MILLISECONDS);
                LOGGER.info("Stream has completed, shutting down @ {}", Long.valueOf(System.currentTimeMillis()));
                stopInternal(true);
            }
        } catch (Throwable th) {
            LOGGER.info("Stream has completed, pausing @ {}", Long.valueOf(System.currentTimeMillis()));
            Uninterruptibles.sleepUninterruptibly(this.localRuntimeConfiguration.getShutdownPauseMs().longValue(), TimeUnit.MILLISECONDS);
            LOGGER.info("Stream has completed, shutting down @ {}", Long.valueOf(System.currentTimeMillis()));
            stopInternal(z2);
            throw th;
        }
    }

    private void attachShutdownHandler() {
        LOGGER.debug("Attaching shutdown handler");
        Runtime.getRuntime().addShutdownHook(this.shutdownHook);
    }

    private void detachShutdownHandler() {
        LOGGER.debug("Detaching shutdown handler");
        Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
    }

    protected void forceShutdown(Map<String, List<StreamsTask>> map) {
        LOGGER.debug("Shutdown failed.  Forcing shutdown");
        Iterator<List<StreamsTask>> it = map.values().iterator();
        while (it.hasNext()) {
            for (StreamsTask streamsTask : it.next()) {
                streamsTask.stopTask();
                if (streamsTask.isWaiting()) {
                    this.futures.get(streamsTask).cancel(true);
                }
            }
        }
        this.executor.shutdown();
        this.monitor.shutdown();
        try {
            if (!this.executor.awaitTermination(this.localRuntimeConfiguration.getExecutorShutdownPauseMs().longValue(), TimeUnit.MILLISECONDS)) {
                this.executor.shutdownNow();
            }
            if (!this.monitor.awaitTermination(this.localRuntimeConfiguration.getMonitorShutdownPauseMs().longValue(), TimeUnit.MILLISECONDS)) {
                this.monitor.shutdownNow();
            }
        } catch (InterruptedException e) {
            this.executor.shutdownNow();
            this.monitor.shutdownNow();
            throw new RuntimeException(e);
        }
    }

    protected void shutdown(Map<String, List<StreamsTask>> map) throws InterruptedException {
        LOGGER.info("Attempting to shutdown tasks");
        if (this.monitorThread != null) {
            this.monitorThread.shutdown();
        }
        this.executor.shutdown();
        Iterator<StreamComponent> it = this.providers.values().iterator();
        while (it.hasNext()) {
            shutDownTask(it.next(), map);
        }
        if (!this.executor.awaitTermination(this.localRuntimeConfiguration.getExecutorShutdownWaitMs().longValue(), TimeUnit.MILLISECONDS)) {
            this.executor.shutdownNow();
            this.executor.awaitTermination(this.localRuntimeConfiguration.getExecutorShutdownWaitMs().longValue(), TimeUnit.MILLISECONDS);
        }
        if (this.monitor.awaitTermination(this.localRuntimeConfiguration.getMonitorShutdownWaitMs().longValue(), TimeUnit.MILLISECONDS)) {
            return;
        }
        this.monitor.shutdownNow();
        this.monitor.awaitTermination(this.localRuntimeConfiguration.getMonitorShutdownWaitMs().longValue(), TimeUnit.MILLISECONDS);
    }

    protected void setupProviderTasks(Map<String, StreamsProviderTask> map) {
        for (StreamComponent streamComponent : this.providers.values()) {
            DatumStatusCountable createConnectedTask = streamComponent.createConnectedTask(getTimeout());
            createConnectedTask.setStreamConfig(this.localRuntimeConfiguration);
            createConnectedTask.setStreamsTaskCounter(new StreamsTaskCounter(streamComponent.getId(), this.streamIdentifier, this.startedAt.getMillis()));
            this.executor.submit((Runnable) createConnectedTask);
            map.put(streamComponent.getId(), (StreamsProviderTask) createConnectedTask);
            if (this.useDeprecatedMonitors && streamComponent.isOperationCountable()) {
                this.monitor.submit(new StatusCounterMonitorThread(streamComponent.getOperation(), 10));
                this.monitor.submit(new StatusCounterMonitorThread(createConnectedTask, 10));
            }
        }
    }

    protected void setupComponentTasks(Map<String, List<StreamsTask>> map) {
        for (StreamComponent streamComponent : this.components.values()) {
            int numTasks = streamComponent.getNumTasks();
            LinkedList linkedList = new LinkedList();
            StreamsTaskCounter streamsTaskCounter = new StreamsTaskCounter(streamComponent.getId(), this.streamIdentifier, this.startedAt.getMillis());
            for (int i = 0; i < numTasks; i++) {
                DatumStatusCountable createConnectedTask = streamComponent.createConnectedTask(getTimeout());
                createConnectedTask.setStreamsTaskCounter(streamsTaskCounter);
                createConnectedTask.setStreamConfig(this.localRuntimeConfiguration);
                this.futures.put(createConnectedTask, this.executor.submit((Runnable) createConnectedTask));
                linkedList.add(createConnectedTask);
                if (this.useDeprecatedMonitors && streamComponent.isOperationCountable()) {
                    this.monitor.submit(new StatusCounterMonitorThread(streamComponent.getOperation(), 10));
                    this.monitor.submit(new StatusCounterMonitorThread(createConnectedTask, 10));
                }
                this.monitor.submit((Runnable) this.broadcastMonitor);
            }
            map.put(streamComponent.getId(), linkedList);
        }
    }

    private void shutDownTask(StreamComponent streamComponent, Map<String, List<StreamsTask>> map) throws InterruptedException {
        List<StreamsTask> list = map.get(streamComponent.getId());
        if (list != null) {
            boolean z = true;
            Iterator<StreamComponent> it = streamComponent.getUpStreamComponents().iterator();
            while (it.hasNext()) {
                List<StreamsTask> list2 = map.get(it.next().getId());
                if (list2 != null) {
                    Iterator<StreamsTask> it2 = list2.iterator();
                    while (it2.hasNext()) {
                        z = z && !it2.next().isRunning();
                    }
                }
            }
            if (z) {
                for (StreamsTask streamsTask : list) {
                    streamsTask.stopTask();
                    if (streamsTask.isWaiting()) {
                        this.futures.get(streamsTask).cancel(true);
                    }
                }
                for (StreamsTask streamsTask2 : list) {
                    for (int i = 0; i < this.localRuntimeConfiguration.getTaskTimeoutMs().longValue() / 1000 && streamsTask2.isRunning(); i++) {
                        Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.SECONDS);
                    }
                    if (streamsTask2.isRunning()) {
                        LOGGER.warn("Task {} failed to terminate in allotted timeframe", streamsTask2.toString());
                    }
                }
            }
        }
        if (streamComponent.getDownStreamComponents() != null) {
            Iterator<StreamComponent> it3 = streamComponent.getDownStreamComponents().iterator();
            while (it3.hasNext()) {
                shutDownTask(it3.next(), map);
            }
        }
    }

    public void stop() {
        stopInternal(false);
    }

    protected void stopInternal(boolean z) {
        try {
            try {
                shutdown(this.tasks);
                if (!z) {
                    try {
                        detachShutdownHandler();
                    } catch (Throwable th) {
                        LOGGER.error("StopInternal caught Throwable: {}", th);
                        System.exit(1);
                    }
                }
            } catch (Exception e) {
                LOGGER.error("Exception while trying to shutdown Stream: {}", e);
                forceShutdown(this.tasks);
                if (!z) {
                    try {
                        detachShutdownHandler();
                    } catch (Throwable th2) {
                        LOGGER.error("StopInternal caught Throwable: {}", th2);
                        System.exit(1);
                    }
                }
            }
        } catch (Throwable th3) {
            if (!z) {
                try {
                    detachShutdownHandler();
                } catch (Throwable th4) {
                    LOGGER.error("StopInternal caught Throwable: {}", th4);
                    System.exit(1);
                    throw th3;
                }
            }
            throw th3;
        }
    }

    private void connectToOtherComponents(String[] strArr, StreamComponent streamComponent) {
        StreamComponent streamComponent2;
        for (String str : strArr) {
            if (this.providers.containsKey(str)) {
                streamComponent2 = this.providers.get(str);
            } else {
                if (!this.components.containsKey(str)) {
                    throw new InvalidStreamException("Cannot connect to id, " + str + ", because id does not exist.");
                }
                streamComponent2 = this.components.get(str);
            }
            StreamComponent streamComponent3 = streamComponent2;
            streamComponent3.addOutBoundQueue(streamComponent, streamComponent.getInBoundQueue());
            streamComponent.addInboundQueue(streamComponent3);
        }
    }

    private void validateId(String str) {
        if (this.providers.containsKey(str) || this.components.containsKey(str)) {
            throw new InvalidStreamException("Duplicate id. " + str + " is already assigned to another component");
        }
        if (str.contains(":")) {
            throw new InvalidStreamException("Invalid character, ':', in component id : " + str);
        }
    }

    protected int getTimeout() {
        return this.localRuntimeConfiguration.getProviderTimeoutMs().intValue();
    }
}
