/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.oss.dsbulk.workflow.unload;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.shaded.guava.common.base.Stopwatch;
import com.datastax.oss.dsbulk.codecs.api.ConvertingCodecFactory;
import com.datastax.oss.dsbulk.connectors.api.CommonConnectorFeature;
import com.datastax.oss.dsbulk.connectors.api.Connector;
import com.datastax.oss.dsbulk.connectors.api.ConnectorFeature;
import com.datastax.oss.dsbulk.connectors.api.Record;
import com.datastax.oss.dsbulk.connectors.api.RecordMetadata;
import com.datastax.oss.dsbulk.executor.api.reader.BulkReader;
import com.datastax.oss.dsbulk.executor.api.result.ReadResult;
import com.datastax.oss.dsbulk.workflow.api.Workflow;
import com.datastax.oss.dsbulk.workflow.api.utils.DurationUtils;
import com.datastax.oss.dsbulk.workflow.commons.log.LogManager;
import com.datastax.oss.dsbulk.workflow.commons.metrics.MetricsManager;
import com.datastax.oss.dsbulk.workflow.commons.schema.ReadResultMapper;
import com.datastax.oss.dsbulk.workflow.commons.settings.CodecSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.ConnectorSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.DriverSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.EngineSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.ExecutorSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.LogSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.MonitoringSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.SchemaGenerationStrategy;
import com.datastax.oss.dsbulk.workflow.commons.settings.SchemaSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.SettingsManager;
import com.datastax.oss.dsbulk.workflow.commons.utils.CloseableUtils;
import com.datastax.oss.dsbulk.workflow.commons.utils.ClusterInformationUtils;
import com.typesafe.config.Config;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * {@link Workflow} that executes a list of read statements through a {@link BulkReader}, maps
 * every {@link ReadResult} to a {@link Record} and hands the records to the writer function
 * obtained from the configured {@link Connector}.
 *
 * <p>Expected call sequence, as far as this class shows: {@link #init()}, then
 * {@link #execute()}, then {@link #close()}.
 */
public class UnloadWorkflow implements Workflow {
    private static final Logger LOGGER = LoggerFactory.getLogger(UnloadWorkflow.class);

    /** Value passed as the {@code prefetch} argument of {@code publishOn} and {@code flatMap}. */
    private static final int PREFETCH = 500;

    /** Maximum number of records per window when records are written window by window. */
    private static final int WINDOW_SIZE = 500;

    private final SettingsManager settingsManager;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private String executionId;
    private Connector connector;
    private Set<Scheduler> schedulers;
    private ReadResultMapper readResultMapper;
    private MetricsManager metricsManager;
    private LogManager logManager;
    private CqlSession session;
    private BulkReader executor;
    private List<Statement<?>> readStatements;
    private Function<Publisher<Record>, Publisher<Record>> writer;
    private Function<Flux<ReadResult>, Flux<ReadResult>> totalItemsMonitor;
    private Function<Flux<Record>, Flux<Record>> failedRecordsMonitor;
    private Function<Flux<ReadResult>, Flux<ReadResult>> failedReadResultsMonitor;
    private Function<Flux<Record>, Flux<Record>> failedRecordsHandler;
    private Function<Flux<ReadResult>, Flux<ReadResult>> totalItemsCounter;
    private Function<Flux<ReadResult>, Flux<ReadResult>> failedReadsHandler;
    private Function<Flux<ReadResult>, Flux<ReadResult>> queryWarningsHandler;
    private Function<Flux<Record>, Flux<Record>> unmappableRecordsHandler;
    private Function<Flux<Void>, Flux<Void>> terminationHandler;
    private int readConcurrency;
    private int numCores;
    private int writeConcurrency;

    /**
     * Creates a workflow backed by a {@link SettingsManager} built from the given configuration.
     *
     * @param config the configuration handed to the settings manager.
     */
    UnloadWorkflow(Config config) {
        this.settingsManager = new SettingsManager(config);
    }

    /**
     * Initializes settings, connector, session, log and metrics managers, the read executor and
     * every pipeline stage, then computes the read and write concurrency levels.
     *
     * @throws IllegalArgumentException if dry-run is enabled, or if the connector reports a write
     *     concurrency lower than 1.
     * @throws Exception if any of the underlying components fails to initialize.
     */
    public void init() throws Exception {
        this.settingsManager.init("UNLOAD", false, SchemaGenerationStrategy.READ_AND_MAP);
        this.executionId = this.settingsManager.getExecutionId();
        LogSettings logSettings = this.settingsManager.getLogSettings();
        DriverSettings driverSettings = this.settingsManager.getDriverSettings();
        ConnectorSettings connectorSettings = this.settingsManager.getConnectorSettings();
        SchemaSettings schemaSettings = this.settingsManager.getSchemaSettings();
        ExecutorSettings executorSettings = this.settingsManager.getExecutorSettings();
        CodecSettings codecSettings = this.settingsManager.getCodecSettings();
        MonitoringSettings monitoringSettings = this.settingsManager.getMonitoringSettings();
        EngineSettings engineSettings = this.settingsManager.getEngineSettings();

        engineSettings.init();
        if (engineSettings.isDryRun()) {
            throw new IllegalArgumentException("Dry-run is not supported for unload");
        }
        logSettings.init();
        connectorSettings.init(false);
        this.connector = connectorSettings.getConnector();
        this.connector.init();
        driverSettings.init(false);
        logSettings.logEffectiveSettings(
                this.settingsManager.getEffectiveBulkLoaderConfig(), driverSettings.getDriverConfig());
        codecSettings.init();
        monitoringSettings.init();
        executorSettings.init();

        ConvertingCodecFactory codecFactory =
                codecSettings.createCodecFactory(
                        schemaSettings.isAllowExtraFields(), schemaSettings.isAllowMissingFields());
        this.session =
                driverSettings.newSession(
                        this.executionId, codecFactory.getCodecRegistry(), monitoringSettings.getRegistry());
        ClusterInformationUtils.printDebugInfoAboutCluster(this.session);
        schemaSettings.init(
                this.session,
                codecFactory,
                this.connector.supports(CommonConnectorFeature.INDEXED_RECORDS),
                this.connector.supports(CommonConnectorFeature.MAPPED_RECORDS));

        this.logManager = logSettings.newLogManager(this.session, false);
        this.logManager.init();
        this.metricsManager =
                monitoringSettings.newMetricsManager(
                        false,
                        false,
                        this.logManager.getOperationDirectory(),
                        logSettings.getVerbosity(),
                        this.session.getContext().getProtocolVersion(),
                        this.session.getContext().getCodecRegistry(),
                        schemaSettings.getRowType());
        this.metricsManager.init();

        RecordMetadata recordMetadata = this.connector.getRecordMetadata();
        this.readResultMapper =
                schemaSettings.createReadResultMapper(
                        this.session, recordMetadata, codecFactory, logSettings.isSources());
        this.readStatements = schemaSettings.createReadStatements(this.session);
        this.executor =
                executorSettings.newReadExecutor(
                        this.session, this.metricsManager.getExecutionListener(), schemaSettings.isSearchQuery());
        // Re-arm close() now that closeable resources have been created.
        this.closed.set(false);

        this.writer = this.connector.write();
        this.totalItemsMonitor = this.metricsManager.newTotalItemsMonitor();
        this.failedRecordsMonitor = this.metricsManager.newFailedItemsMonitor();
        this.failedReadResultsMonitor = this.metricsManager.newFailedItemsMonitor();
        this.failedRecordsHandler = this.logManager.newFailedRecordsHandler();
        this.totalItemsCounter = this.logManager.newTotalItemsCounter();
        this.failedReadsHandler = this.logManager.newFailedReadsHandler();
        this.queryWarningsHandler = this.logManager.newQueryWarningsHandler();
        this.unmappableRecordsHandler = this.logManager.newUnmappableRecordsHandler();
        this.terminationHandler = this.logManager.newTerminationHandler();

        this.numCores = Runtime.getRuntime().availableProcessors();
        int connectorWriteConcurrency = this.connector.writeConcurrency();
        if (connectorWriteConcurrency < 1) {
            throw new IllegalArgumentException("Invalid write concurrency: " + connectorWriteConcurrency);
        }
        this.writeConcurrency = connectorWriteConcurrency;
        LOGGER.debug("Using write concurrency: {}", this.writeConcurrency);
        // Never more concurrent reads than statements; defaults to the number of cores when the
        // user did not supply a maximum.
        this.readConcurrency =
                Math.min(
                        this.readStatements.size(),
                        engineSettings.getMaxConcurrentQueries().orElse(this.numCores));
        LOGGER.debug(
                "Using read concurrency: {} (user-supplied: {})",
                this.readConcurrency,
                engineSettings.getMaxConcurrentQueries().isPresent());
        this.schedulers = new HashSet<>();
    }

    /**
     * Runs the unload operation and blocks until the pipeline terminates.
     *
     * @return {@code true} if the log manager reports zero errors, {@code false} otherwise.
     */
    public boolean execute() {
        LOGGER.debug("{} started.", this);
        this.metricsManager.start();
        // The pipeline (and its schedulers) is assembled before the timer starts.
        Flux<Record> flux = this.selectPipeline();
        Stopwatch timer = Stopwatch.createStarted();
        flux.then().flux().transform(this.terminationHandler).blockLast();
        timer.stop();
        int totalErrors = this.logManager.getTotalErrors();
        boolean success = totalErrors == 0;
        this.metricsManager.stop(timer.elapsed(), success);
        String elapsedStr = formatElapsed(timer.elapsed());
        if (success) {
            LOGGER.info("{} completed successfully in {}.", this, elapsedStr);
        } else {
            LOGGER.warn("{} completed with {} errors in {}.", this, totalErrors, elapsedStr);
        }
        return success;
    }

    /**
     * Picks the pipeline variant from the configured concurrency levels: a single writer when the
     * write concurrency is 1; separate read and write schedulers when either concurrency is below
     * half the number of cores; a single shared scheduler otherwise.
     */
    private Flux<Record> selectPipeline() {
        if (this.writeConcurrency == 1) {
            return this.oneWriter();
        }
        int halfCores = this.numCores / 2;
        if (this.writeConcurrency < halfCores || this.readConcurrency < halfCores) {
            return this.fewWriters();
        }
        return this.manyWriters();
    }

    /** Rounds the duration to seconds and renders it for the completion log message. */
    private static String formatElapsed(Duration rawElapsed) {
        Duration elapsed = DurationUtils.round(rawElapsed, TimeUnit.SECONDS);
        return elapsed.isZero() ? "less than one second" : DurationUtils.formatDuration(elapsed);
    }

    /**
     * Builds the read half shared by all pipeline variants: executes the statement, moves the
     * results onto the given scheduler, applies the read-side handlers and monitors, maps each
     * result to a record and applies the record-side monitor and unmappable-records handler.
     *
     * @param statement the read statement to execute.
     * @param scheduler the scheduler on which results are published.
     * @return the flux of records produced for the statement.
     */
    private Flux<Record> readAndMap(Statement<?> statement, Scheduler scheduler) {
        return Flux.from(this.executor.readReactive(statement))
                .publishOn(scheduler, PREFETCH)
                .transform(this.queryWarningsHandler)
                .transform(this.totalItemsMonitor)
                .transform(this.totalItemsCounter)
                .transform(this.failedReadResultsMonitor)
                .transform(this.failedReadsHandler)
                .map(this.readResultMapper::map)
                .transform(this.failedRecordsMonitor)
                .transform(this.unmappableRecordsHandler);
    }

    /**
     * Pipeline used when the write concurrency is 1: statements are read concurrently and all
     * records are merged into a single flux that goes through the writer once.
     */
    private Flux<Record> oneWriter() {
        int numThreads = Math.min(this.numCores * 2, this.readConcurrency);
        // A one-thread pool would add nothing; stay on the emitting thread instead.
        Scheduler scheduler =
                numThreads == 1
                        ? Schedulers.immediate()
                        : Schedulers.newParallel(numThreads, new DefaultThreadFactory("workflow"));
        this.schedulers.add(scheduler);
        return Flux.fromIterable(this.readStatements)
                .flatMap(statement -> this.readAndMap(statement, scheduler), this.readConcurrency, PREFETCH)
                .transform(this.writer)
                .transform(this.failedRecordsMonitor)
                .transform(this.failedRecordsHandler);
    }

    /**
     * Pipeline with distinct schedulers for reads and writes: records from all statements are
     * merged, split into {@code writeConcurrency} rails running on the write scheduler, and each
     * rail goes through the writer independently.
     */
    private Flux<Record> fewWriters() {
        int numThreadsForReads = Math.min(this.numCores, this.readConcurrency);
        Scheduler schedulerForReads =
                numThreadsForReads == 1
                        ? Schedulers.immediate()
                        : Schedulers.newParallel(numThreadsForReads, new DefaultThreadFactory("workflow-read"));
        int numThreadsForWrites = Math.min(this.numCores, this.writeConcurrency);
        Scheduler schedulerForWrites =
                Schedulers.newParallel(numThreadsForWrites, new DefaultThreadFactory("workflow-write"));
        this.schedulers.add(schedulerForReads);
        this.schedulers.add(schedulerForWrites);
        return Flux.fromIterable(this.readStatements)
                .flatMap(
                        statement -> this.readAndMap(statement, schedulerForReads),
                        this.readConcurrency,
                        PREFETCH)
                .parallel(this.writeConcurrency)
                .runOn(schedulerForWrites)
                .groups()
                .flatMap(
                        records ->
                                records
                                        .transform(this.writer)
                                        .transform(this.failedRecordsMonitor)
                                        .transform(this.failedRecordsHandler),
                        this.writeConcurrency,
                        PREFETCH);
    }

    /**
     * Pipeline with one shared scheduler, where each statement's records go through the writer
     * inside the per-statement inner flux.
     */
    private Flux<Record> manyWriters() {
        int actualConcurrency = Math.min(this.readConcurrency, this.writeConcurrency);
        int numThreads = Math.min(this.numCores * 2, actualConcurrency);
        Scheduler scheduler = Schedulers.newParallel(numThreads, new DefaultThreadFactory("workflow"));
        this.schedulers.add(scheduler);
        return Flux.fromIterable(this.readStatements)
                .flatMap(
                        statement -> {
                            Flux<Record> records = this.readAndMap(statement, scheduler);
                            if (actualConcurrency == this.writeConcurrency) {
                                records = records.transform(this.writer);
                            } else {
                                // Fewer inner fluxes than the connector's write concurrency: apply the
                                // writer once per window, one window at a time.
                                // NOTE(review): presumably this lets the connector rotate between its
                                // destinations - confirm against the Connector contract.
                                records =
                                        records
                                                .window(WINDOW_SIZE)
                                                .flatMap(window -> window.transform(this.writer), 1, PREFETCH);
                            }
                            return records
                                    .transform(this.failedRecordsMonitor)
                                    .transform(this.failedRecordsHandler);
                        },
                        actualConcurrency,
                        PREFETCH);
    }

    /**
     * Closes, in order, the metrics manager, the log manager, the connector, every scheduler
     * created by the pipelines, the executor and the session, then reports final metrics.
     * Only the first call after an {@link #init()} does anything.
     *
     * @throws Exception the exception handed back by the chain of {@code closeQuietly} calls,
     *     if any.
     */
    public void close() throws Exception {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        LOGGER.debug("{} closing.", this);
        // Each call receives the exception carried so far and returns the one to carry forward.
        Exception e = CloseableUtils.closeQuietly(this.metricsManager, null);
        e = CloseableUtils.closeQuietly(this.logManager, e);
        e = CloseableUtils.closeQuietly(this.connector, e);
        if (this.schedulers != null) {
            for (Scheduler scheduler : this.schedulers) {
                e = CloseableUtils.closeQuietly(scheduler, e);
            }
        }
        e = CloseableUtils.closeQuietly(this.executor, e);
        e = CloseableUtils.closeQuietly(this.session, e);
        if (this.metricsManager != null) {
            this.metricsManager.reportFinalMetrics();
        }
        LOGGER.debug("{} closed.", this);
        if (e != null) {
            throw e;
        }
    }

    /** Returns {@code "Operation"}, followed by the execution id once it is known. */
    @Override
    public String toString() {
        if (this.executionId == null) {
            return "Operation";
        }
        return "Operation " + this.executionId;
    }
}

