package co.cask.cdap.template.etl.realtime;

import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.worker.AbstractWorker;
import co.cask.cdap.api.worker.WorkerContext;
import co.cask.cdap.template.etl.api.Transform;
import co.cask.cdap.template.etl.api.TransformContext;
import co.cask.cdap.template.etl.api.Transformation;
import co.cask.cdap.template.etl.api.realtime.RealtimeContext;
import co.cask.cdap.template.etl.api.realtime.RealtimeSink;
import co.cask.cdap.template.etl.api.realtime.RealtimeSource;
import co.cask.cdap.template.etl.api.realtime.SourceState;
import co.cask.cdap.template.etl.common.Constants;
import co.cask.cdap.template.etl.common.DefaultEmitter;
import co.cask.cdap.template.etl.common.Destroyables;
import co.cask.cdap.template.etl.common.ETLStage;
import co.cask.cdap.template.etl.common.StageMetrics;
import co.cask.cdap.template.etl.common.TransformExecutor;
import co.cask.cdap.template.etl.realtime.config.ETLRealtimeConfig;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/template/etl/realtime/ETLWorker.class */
public class ETLWorker extends AbstractWorker {
    private static final Logger LOG = LoggerFactory.getLogger(ETLWorker.class);
    private static final Type STRING_LIST_TYPE = new TypeToken<List<String>>() { // from class: co.cask.cdap.template.etl.realtime.ETLWorker.1
    }.getType();
    private static final Gson GSON = new Gson();
    private static final String SEPARATOR = ":";
    private String adapterName;
    private RealtimeSource source;
    private RealtimeSink sink;
    private List<Metrics> transformMetrics;
    private TransformExecutor transformExecutor;
    private DefaultEmitter sourceEmitter;
    private String stateStoreKey;
    private byte[] stateStoreKeyBytes;
    private Metrics metrics;
    private volatile boolean running;

    public void configure() {
        setName(ETLWorker.class.getSimpleName());
        setDescription("Worker Driver for Realtime ETL Adapters");
    }

    public void initialize(WorkerContext workerContext) throws Exception {
        super.initialize(workerContext);
        Map runtimeArguments = workerContext.getRuntimeArguments();
        Preconditions.checkArgument(runtimeArguments.containsKey(Constants.ADAPTER_NAME));
        Preconditions.checkArgument(runtimeArguments.containsKey(Constants.CONFIG_KEY));
        Preconditions.checkArgument(runtimeArguments.containsKey(Constants.Source.PLUGINID));
        Preconditions.checkArgument(runtimeArguments.containsKey(Constants.Sink.PLUGINID));
        Preconditions.checkArgument(runtimeArguments.containsKey(Constants.Transform.PLUGINIDS));
        Preconditions.checkArgument(runtimeArguments.containsKey(Constants.Realtime.UNIQUE_ID));
        this.adapterName = (String) runtimeArguments.get(Constants.ADAPTER_NAME);
        this.stateStoreKey = String.format("%s%s%s%s%s", this.adapterName, ":", runtimeArguments.get(Constants.Realtime.UNIQUE_ID), ":", Integer.valueOf(workerContext.getInstanceId()));
        this.stateStoreKeyBytes = Bytes.toBytes(this.stateStoreKey);
        ETLRealtimeConfig eTLRealtimeConfig = (ETLRealtimeConfig) GSON.fromJson((String) runtimeArguments.get(Constants.CONFIG_KEY), ETLRealtimeConfig.class);
        getContext().execute(new TxRunnable() { // from class: co.cask.cdap.template.etl.realtime.ETLWorker.2
            public void run(DatasetContext datasetContext) throws Exception {
                KeyValueTable dataset = datasetContext.getDataset(ETLRealtimeTemplate.STATE_TABLE);
                byte[] bytes = Bytes.toBytes(String.format("%s%s", ETLWorker.this.adapterName, ":"));
                CloseableIterator scan = dataset.scan(bytes, Bytes.stopKeyForPrefix(bytes));
                while (scan.hasNext()) {
                    try {
                        KeyValue keyValue = (KeyValue) scan.next();
                        if (Bytes.compareTo(ETLWorker.this.stateStoreKeyBytes, (byte[]) keyValue.getKey()) != 0) {
                            dataset.delete((byte[]) keyValue.getKey());
                        }
                    } finally {
                        scan.close();
                    }
                }
            }
        });
        initializeSource(workerContext, eTLRealtimeConfig.getSource());
        List<Transformation> initializeTransforms = initializeTransforms(workerContext, eTLRealtimeConfig.getTransforms());
        initializeSink(workerContext, eTLRealtimeConfig.getSink());
        this.transformExecutor = new TransformExecutor(initializeTransforms, this.transformMetrics);
    }

    private void initializeSource(WorkerContext workerContext, ETLStage eTLStage) throws Exception {
        String str = (String) workerContext.getRuntimeArguments().get(Constants.Source.PLUGINID);
        this.source = (RealtimeSource) workerContext.newPluginInstance(str);
        WorkerRealtimeContext workerRealtimeContext = new WorkerRealtimeContext(workerContext, this.metrics, str);
        LOG.info("Source Stage : {}", eTLStage.getName());
        LOG.info("Source Class : {}", this.source.getClass().getName());
        this.source.initialize((RealtimeContext) workerRealtimeContext);
        this.sourceEmitter = new DefaultEmitter(new StageMetrics(this.metrics, StageMetrics.Type.SOURCE, eTLStage.getName()));
    }

    private void initializeSink(WorkerContext workerContext, ETLStage eTLStage) throws Exception {
        String str = (String) workerContext.getRuntimeArguments().get(Constants.Sink.PLUGINID);
        this.sink = (RealtimeSink) workerContext.newPluginInstance(str);
        WorkerRealtimeContext workerRealtimeContext = new WorkerRealtimeContext(workerContext, this.metrics, str);
        LOG.info("Sink Stage : {}", eTLStage.getName());
        LOG.info("Sink Class : {}", this.sink.getClass().getName());
        this.sink.initialize((RealtimeContext) workerRealtimeContext);
        this.sink = new TrackedRealtimeSink(this.sink, this.metrics, eTLStage.getName());
    }

    private List<Transformation> initializeTransforms(WorkerContext workerContext, List<ETLStage> list) throws Exception {
        List list2 = (List) GSON.fromJson((String) workerContext.getRuntimeArguments().get(Constants.Transform.PLUGINIDS), STRING_LIST_TYPE);
        ArrayList newArrayList = Lists.newArrayList();
        Preconditions.checkArgument(list2 != null);
        Preconditions.checkArgument(list.size() == list2.size());
        this.transformMetrics = Lists.newArrayListWithCapacity(list.size());
        for (int i = 0; i < list.size(); i++) {
            ETLStage eTLStage = list.get(i);
            String str = (String) list2.get(i);
            try {
                Transform transform = (Transform) workerContext.newPluginInstance(str);
                RealtimeTransformContext realtimeTransformContext = new RealtimeTransformContext(workerContext, this.metrics, str);
                LOG.info("Transform Stage : {}", eTLStage.getName());
                LOG.info("Transform Class : {}", transform.getClass().getName());
                transform.initialize((TransformContext) realtimeTransformContext);
                newArrayList.add(transform);
                this.transformMetrics.add(new StageMetrics(this.metrics, StageMetrics.Type.TRANSFORM, eTLStage.getName()));
            } catch (InstantiationException e) {
                LOG.error("Unable to instantiate Transform : {}", eTLStage.getName(), e);
                Throwables.propagate(e);
            }
        }
        return newArrayList;
    }

    public void run() {
        final SourceState sourceState = new SourceState();
        final SourceState sourceState2 = new SourceState();
        final ArrayList newArrayList = Lists.newArrayList();
        this.running = true;
        getContext().execute(new TxRunnable() { // from class: co.cask.cdap.template.etl.realtime.ETLWorker.3
            public void run(DatasetContext datasetContext) throws Exception {
                byte[] read = datasetContext.getDataset(ETLRealtimeTemplate.STATE_TABLE).read(ETLWorker.this.stateStoreKeyBytes);
                if (read != null) {
                    sourceState.setState((SourceState) ETLWorker.GSON.fromJson(Bytes.toString(read), SourceState.class));
                }
            }
        });
        while (this.running) {
            try {
                SourceState poll = this.source.poll(this.sourceEmitter, new SourceState(sourceState));
                if (poll != null) {
                    sourceState2.setState(poll);
                }
                Iterator it = this.sourceEmitter.iterator();
                while (it.hasNext()) {
                    Object next = it.next();
                    try {
                        Iterator it2 = this.transformExecutor.runOneIteration(next).iterator();
                        while (it2.hasNext()) {
                            newArrayList.add(it2.next());
                        }
                    } catch (Exception e) {
                        LOG.warn("Adapter {} : Exception thrown while processing data {}", new Object[]{this.adapterName, next, e});
                    }
                }
                this.sourceEmitter.reset();
                try {
                    try {
                        if (!newArrayList.isEmpty() || !sourceState2.equals(sourceState)) {
                            getContext().execute(new TxRunnable() { // from class: co.cask.cdap.template.etl.realtime.ETLWorker.4
                                public void run(DatasetContext datasetContext) throws Exception {
                                    if (!newArrayList.isEmpty()) {
                                        ETLWorker.this.sink.write(newArrayList, new DefaultDataWriter(ETLWorker.this.getContext(), datasetContext));
                                    }
                                    if (sourceState2.equals(sourceState)) {
                                        return;
                                    }
                                    datasetContext.getDataset(ETLRealtimeTemplate.STATE_TABLE).write(ETLWorker.this.stateStoreKey, ETLWorker.GSON.toJson(sourceState2));
                                }
                            });
                            sourceState.setState(sourceState2);
                        }
                        newArrayList.clear();
                    } catch (Exception e2) {
                        LOG.warn("Adapter {} : Exception thrown during persisting of data", this.adapterName, e2);
                        newArrayList.clear();
                    }
                } catch (Throwable th) {
                    newArrayList.clear();
                    throw th;
                }
            } catch (Exception e3) {
                LOG.warn("Adapter {} : Exception thrown during polling of Source for data", this.adapterName, e3);
                this.sourceEmitter.reset();
            }
        }
    }

    public void stop() {
        this.running = false;
    }

    public void destroy() {
        Destroyables.destroyQuietly(this.source);
        Destroyables.destroyQuietly(this.transformExecutor);
        Destroyables.destroyQuietly(this.sink);
    }
}
