/*
 * Decompiled with CFR 0.152.
 */
package de.xab.porter.core;

import de.xab.porter.api.dataconnection.DataConnection;
import de.xab.porter.api.dataconnection.SrcConnection;
import de.xab.porter.api.task.Context;
import de.xab.porter.api.task.Properties;
import de.xab.porter.common.spi.ExtensionLoader;
import de.xab.porter.common.util.Loggers;
import de.xab.porter.transfer.channel.Channel;
import de.xab.porter.transfer.exception.ConnectionException;
import de.xab.porter.transfer.reader.Reader;
import de.xab.porter.transfer.reporter.Reporter;
import de.xab.porter.transfer.writer.Writer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/**
 * Wires the readers of one source connection to the writers of every sink
 * connection described by a {@link Context}, then runs the readers in parallel.
 *
 * <p>Typical usage is {@link #init()} followed by {@link #start()}; both
 * {@code readers} and {@code writers} are only populated by {@code init()}.
 */
public class Task {
    private final Logger logger = Loggers.getLogger(this.getClass());
    private final Context context;
    /** Each reader mapped to the SQL statement it is going to execute. */
    private Map<? extends Reader<?>, String> readers;
    /** Each writer paired with the channel that feeds it. */
    private List<Map.Entry<? extends Writer<?>, Channel>> writers;

    /**
     * @param context task description providing the source connection, the sink
     *                connections and the task properties
     */
    public Task(Context context) {
        this.context = context;
    }

    /**
     * Creates the readers and then registers the channels via
     * {@link #registerChannel()}.
     *
     * <p>When the source properties request splitting, a temporary reader is
     * connected and asked to {@code split()}; one fresh reader is created per
     * returned statement. If that connection fails, the task falls back to the
     * single original SQL statement. Without splitting, one reader is bound to
     * the original SQL statement.
     */
    public void init() {
        SrcConnection srcConnection = this.context.getSrcConnection();
        Reader<?> splitReader = this.loadReader(srcConnection);
        if (srcConnection.getProperties().isSplit()) {
            List<String> sequels;
            try {
                splitReader.connect((DataConnection) srcConnection);
                sequels = splitReader.split();
            } catch (ConnectionException e) {
                // Best effort: cannot split without a connection, so run the statement unsplit.
                sequels = List.of(srcConnection.getSql());
                this.logger.log(Level.WARNING, "reader connection failed " + e.getMessage(), e);
            } finally {
                // The split reader is only used for splitting; every statement gets its own reader.
                splitReader.close();
            }
            this.readers = sequels.stream()
                    .collect(Collectors.toMap(sql -> this.loadReader(srcConnection), sql -> sql));
        } else {
            this.readers = Map.of(splitReader, srcConnection.getSql());
        }
        this.readers.forEach((reader, sql) -> reader.setChannels(new ArrayList<>()));
        this.registerChannel();
    }

    /**
     * Creates one writer and one channel per sink connection, attaches every
     * channel to every reader, and stores the writer/channel pairs.
     *
     * <p>Each channel's read listener forwards the data to its writer and then
     * to the configured reporter. Requires {@code readers} to be populated,
     * which {@link #init()} does before calling this method.
     */
    public void registerChannel() {
        Properties properties = this.context.getProperties();
        Reporter reporter = (Reporter) ExtensionLoader.getExtensionLoader(Reporter.class)
                .loadExtension(null, properties.getReporter());
        this.writers = this.context.getSinkConnections().stream().map(sink -> {
            Writer<?> writer = (Writer<?>) ExtensionLoader.getExtensionLoader(Writer.class)
                    .loadExtension(sink.getConnectorType(), sink.getType());
            try {
                writer.connect((DataConnection) sink);
            } catch (ConnectionException e) {
                this.logger.log(Level.SEVERE, "writer connection failed " + e.getMessage(), e);
                writer.close();
                // NOTE(review): the closed writer is still wired to a channel below and will be
                // closed again in start(); confirm whether a failed sink should be skipped instead.
            }
            Channel channel = (Channel) ExtensionLoader.getExtensionLoader(Channel.class)
                    .loadExtension(null, properties.getChannel());
            channel.setOnReadListener(data -> {
                writer.write(data);
                reporter.report(data);
            });
            this.readers.forEach((reader, sql) -> reader.getChannels().add(channel));
            return Map.entry(writer, channel);
        }).collect(Collectors.toList());
    }

    /**
     * Runs every reader on its own pool thread, waits for all of them and logs
     * the total number of rows read. All writers are closed and the pool is
     * shut down afterwards, whether or not the wait was interrupted.
     *
     * <p>If there are no readers, nothing is executed and the writers are
     * simply closed. If the calling thread is interrupted while waiting, its
     * interrupt flag is restored before returning.
     */
    public void start() {
        if (this.readers.isEmpty()) {
            // Executors.newFixedThreadPool(0) would throw and leave the writers open.
            this.logger.warning("no reader to execute, task skipped");
            this.closeWriters();
            return;
        }
        int parallelism = this.readers.size();
        CountDownLatch countDownLatch = new CountDownLatch(parallelism);
        ExecutorService executorService = Executors.newFixedThreadPool(parallelism);
        List<Future<Long>> futures = new ArrayList<>(parallelism);
        try {
            this.readers.forEach((reader, sql) -> {
                Future<Long> future = executorService.submit(() -> {
                    long rows = 0L;
                    try {
                        reader.connect((DataConnection) this.context.getSrcConnection());
                        rows = reader.read(sql);
                    } catch (ConnectionException e) {
                        this.logger.log(Level.SEVERE, "reader connection failed " + e.getMessage(), e);
                    } finally {
                        try {
                            reader.close();
                        } finally {
                            // Always release the latch, even if close() throws, so await() cannot hang.
                            countDownLatch.countDown();
                        }
                    }
                    return rows;
                });
                futures.add(future);
            });
            countDownLatch.await();
            long totalRows = futures.stream().mapToLong(this::rowsOf).sum();
            this.logger.info("task execution over, total read " + totalRows + " rows.");
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            this.logger.warning("reader interrupted");
        } finally {
            this.closeWriters();
            executorService.shutdown();
        }
    }

    /** Loads a new reader extension matching the source connection's connector type and type. */
    private Reader<?> loadReader(SrcConnection srcConnection) {
        return (Reader<?>) ExtensionLoader.getExtensionLoader(Reader.class)
                .loadExtension(srcConnection.getConnectorType(), srcConnection.getType());
    }

    /** Returns the row count produced by a reader job, or 0 if the job failed or was interrupted. */
    private long rowsOf(Future<Long> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.logger.severe("future is interrupted, " + e.getMessage());
        } catch (ExecutionException e) {
            // Pass the exception itself so the underlying cause is not lost.
            this.logger.log(Level.SEVERE, "future execution failed, " + e.getMessage(), e);
        }
        return 0L;
    }

    /** Closes every registered writer. */
    private void closeWriters() {
        this.writers.forEach(entry -> entry.getKey().close());
    }
}

