package co.cask.cdap.etl.realtime;

import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.dataset.table.Put;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.worker.AbstractWorker;
import co.cask.cdap.api.worker.WorkerContext;
import co.cask.cdap.etl.api.InvalidEntry;
import co.cask.cdap.etl.api.Transform;
import co.cask.cdap.etl.api.TransformContext;
import co.cask.cdap.etl.api.realtime.RealtimeContext;
import co.cask.cdap.etl.api.realtime.RealtimeSink;
import co.cask.cdap.etl.api.realtime.RealtimeSource;
import co.cask.cdap.etl.api.realtime.SourceState;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.DefaultEmitter;
import co.cask.cdap.etl.common.Destroyables;
import co.cask.cdap.etl.common.Pipeline;
import co.cask.cdap.etl.common.PipelineRegisterer;
import co.cask.cdap.etl.common.PluginID;
import co.cask.cdap.etl.common.SinkInfo;
import co.cask.cdap.etl.common.StageMetrics;
import co.cask.cdap.etl.common.StructuredRecordStringConverter;
import co.cask.cdap.etl.common.TransformDetail;
import co.cask.cdap.etl.common.TransformExecutor;
import co.cask.cdap.etl.common.TransformInfo;
import co.cask.cdap.etl.common.TransformResponse;
import co.cask.cdap.etl.realtime.config.ETLRealtimeConfig;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/etl/realtime/ETLWorker.class */
public class ETLWorker extends AbstractWorker {
    /** Delimiter placed between the parts of composite keys (e.g. application name, unique id, instance id). */
    private static final String SEPARATOR = ":";
    /** Pipeline configuration handed to the constructor; read in {@link #configure()}. */
    private final ETLRealtimeConfig config;
    /** Source plugin instance; created in initializeSource and polled in {@link #run()}. */
    private RealtimeSource source;
    /** Sink plugin instances (wrapped in TrackedRealtimeSink); created in initializeSinks. */
    private List<RealtimeSink> sinks;
    /** Not read or written anywhere else in this class. */
    private List<Metrics> transformMetrics;
    /** Runs each source record through the transform chain; built at the end of initialize. */
    private TransformExecutor transformExecutor;
    /** Collects the records emitted by the source during one poll; reset after every poll. */
    private DefaultEmitter sourceEmitter;
    /** Key of this worker instance's row in the state table (string form). */
    private String stateStoreKey;
    /** Byte form of {@link #stateStoreKey}; used for reads and for comparing against scanned keys. */
    private byte[] stateStoreKeyBytes;
    /** Name of the application; used as a prefix of error-dataset row keys. */
    private String appName;
    /** Never assigned in this class; presumably injected by the runtime before initialize - TODO confirm. */
    private Metrics metrics;
    /** Transform stages in execution order, as returned by initializeTransforms. */
    private List<TransformDetail> transformationDetailList;
    /** Maps a transform id to its error dataset name; only transforms with an error dataset configured appear. */
    private Map<String, String> tranformIdToDatasetName;
    /** Set to true by {@link #stop()}; checked by the polling loop in {@link #run()}. */
    private volatile boolean stopped;
    /** Worker name registered in {@link #configure()}. */
    public static final String NAME = ETLWorker.class.getSimpleName();
    private static final Logger LOG = LoggerFactory.getLogger(ETLWorker.class);
    /** Gson target type for the serialized transform list stored in the worker properties. */
    private static final Type TRANSFORMDETAILS_LIST_TYPE = new TypeToken<List<TransformInfo>>() { // from class: co.cask.cdap.etl.realtime.ETLWorker.1
    }.getType();
    /** Gson target type for the serialized sink list stored in the worker properties. */
    private static final Type SINK_INFO_TYPE = new TypeToken<List<SinkInfo>>() { // from class: co.cask.cdap.etl.realtime.ETLWorker.2
    }.getType();
    private static final Gson GSON = new Gson();
    /**
     * Schema passed as the "schema" dataset property when plugins are registered in {@link #configure()}:
     * an int error code, a nullable string error message and a string rendering of the invalid entry.
     */
    private static final Schema ERROR_SCHEMA = Schema.recordOf("error", new Schema.Field[]{Schema.Field.of(Constants.ErrorDataset.ERRCODE, Schema.of(Schema.Type.INT)), Schema.Field.of(Constants.ErrorDataset.ERRMSG, Schema.unionOf(new Schema[]{Schema.of(Schema.Type.STRING), Schema.of(Schema.Type.NULL)})), Schema.Field.of(Constants.ErrorDataset.INVALIDENTRY, Schema.of(Schema.Type.STRING))});

    /**
     * Creates a worker for the given realtime pipeline configuration.
     *
     * @param eTLRealtimeConfig configuration consulted later by {@link #configure()}
     */
    public ETLWorker(ETLRealtimeConfig eTLRealtimeConfig) {
        config = eTLRealtimeConfig;
    }

    /**
     * Declares the worker: name, description, instance count, optional resources,
     * and the plugin ids of the registered pipeline stored as worker properties.
     *
     * @throws IllegalArgumentException if the configured instance count is below 1
     */
    public void configure() {
        setName(NAME);
        setDescription("Worker Driver for Realtime ETL Adapters");

        // Default to a single instance when the config leaves the count unset.
        int instanceCount = 1;
        if (this.config.getInstances() != null) {
            instanceCount = this.config.getInstances().intValue();
        }
        Preconditions.checkArgument(instanceCount >= 1, "instances must be greater than 0.");
        setInstances(instanceCount);

        if (this.config.getResources() != null) {
            setResources(this.config.getResources());
        }

        PipelineRegisterer registerer = new PipelineRegisterer(getConfigurer(), "realtime");
        DatasetProperties errorDatasetProperties = DatasetProperties.builder()
            .add("schema", ERROR_SCHEMA.toString())
            .build();
        Pipeline pipeline = registerer.registerPlugins(this.config, Table.class, errorDatasetProperties, false);

        // The plugin ids are read back from the specification in initialize().
        Map<String, String> workerProperties = new HashMap<String, String>();
        workerProperties.put(Constants.Source.PLUGINID, pipeline.getSource());
        workerProperties.put(Constants.Sink.PLUGINIDS, GSON.toJson(pipeline.getSinks()));
        workerProperties.put(Constants.Transform.PLUGINIDS, GSON.toJson(pipeline.getTransforms()));
        // The configure-time timestamp becomes part of the state-store key.
        workerProperties.put(Constants.Realtime.UNIQUE_ID, String.valueOf(System.currentTimeMillis()));
        setProperties(workerProperties);
    }

    /**
     * Prepares the worker for running: validates the properties written by {@link #configure()},
     * derives this instance's state-store key, deletes state rows of this application that belong
     * to other keys, and instantiates the source, transforms and sinks.
     *
     * @param workerContext runtime context of this worker instance
     * @throws IllegalArgumentException if a required worker property is missing
     * @throws Exception if the state clean-up or a plugin initialization fails
     */
    public void initialize(WorkerContext workerContext) throws Exception {
        super.initialize(workerContext);
        Map<String, String> properties = workerContext.getSpecification().getProperties();
        final String name = workerContext.getApplicationSpecification().getName();
        this.appName = name;
        Preconditions.checkArgument(properties.containsKey(Constants.Source.PLUGINID));
        Preconditions.checkArgument(properties.containsKey(Constants.Sink.PLUGINIDS));
        Preconditions.checkArgument(properties.containsKey(Constants.Transform.PLUGINIDS));
        Preconditions.checkArgument(properties.containsKey(Constants.Realtime.UNIQUE_ID));

        // Key layout: <application name>:<unique id>:<instance id>
        String uniqueId = properties.get(Constants.Realtime.UNIQUE_ID);
        this.stateStoreKey = String.format("%s%s%s%s%s", name, SEPARATOR, uniqueId, SEPARATOR,
                                           workerContext.getInstanceId());
        this.stateStoreKeyBytes = Bytes.toBytes(this.stateStoreKey);

        getContext().execute(new TxRunnable() {
            public void run(DatasetContext datasetContext) throws Exception {
                KeyValueTable stateTable = datasetContext.getDataset(ETLRealtimeApplication.STATE_TABLE);
                byte[] appPrefix = Bytes.toBytes(String.format("%s%s", name, SEPARATOR));
                CloseableIterator<KeyValue<byte[], byte[]>> rows =
                    stateTable.scan(appPrefix, Bytes.stopKeyForPrefix(appPrefix));
                // The iterator must stay open for the whole scan and be closed exactly once,
                // including when the scan is empty or a delete fails.
                try {
                    while (rows.hasNext()) {
                        KeyValue<byte[], byte[]> row = rows.next();
                        // Drop every row of this application except the one for the current key.
                        if (Bytes.compareTo(ETLWorker.this.stateStoreKeyBytes, row.getKey()) != 0) {
                            stateTable.delete(row.getKey());
                        }
                    }
                } finally {
                    rows.close();
                }
            }
        });

        initializeSource(workerContext);
        this.transformationDetailList = initializeTransforms(workerContext);
        initializeSinks(workerContext);
        this.transformExecutor = new TransformExecutor(this.transformationDetailList);
    }

    /**
     * Instantiates and initializes the source plugin named in the worker specification
     * and creates the emitter that collects its output.
     *
     * @param workerContext runtime context used to create the plugin instance
     * @throws Exception if the plugin cannot be created or its initialization fails
     */
    private void initializeSource(WorkerContext workerContext) throws Exception {
        final String sourcePluginId = workerContext.getSpecification().getProperty(Constants.Source.PLUGINID);
        RealtimeSource realtimeSource = (RealtimeSource) workerContext.newPluginInstance(sourcePluginId);
        this.source = realtimeSource;

        WorkerRealtimeContext sourceContext =
            new WorkerRealtimeContext(workerContext, this.metrics, sourcePluginId);
        LOG.debug("Source Class : {}", realtimeSource.getClass().getName());
        realtimeSource.initialize((RealtimeContext) sourceContext);

        StageMetrics sourceStageMetrics = new StageMetrics(this.metrics, PluginID.from(sourcePluginId));
        this.sourceEmitter = new DefaultEmitter(sourceStageMetrics);
    }

    /**
     * Instantiates and initializes every sink plugin listed in the worker specification,
     * wrapping each one in a {@code TrackedRealtimeSink} before storing it.
     *
     * @param workerContext runtime context used to create the plugin instances
     * @throws Exception if a plugin cannot be created or its initialization fails
     */
    private void initializeSinks(WorkerContext workerContext) throws Exception {
        String serializedSinks = workerContext.getSpecification().getProperty(Constants.Sink.PLUGINIDS);
        List<SinkInfo> sinkInfos = GSON.fromJson(serializedSinks, SINK_INFO_TYPE);
        this.sinks = Lists.newArrayListWithCapacity(sinkInfos.size());

        for (SinkInfo info : sinkInfos) {
            RealtimeSink sink = (RealtimeSink) workerContext.newPluginInstance(info.getSinkId());
            WorkerRealtimeContext sinkContext =
                new WorkerRealtimeContext(workerContext, this.metrics, info.getSinkId());
            LOG.debug("Sink Class : {}", sink.getClass().getName());
            sink.initialize((RealtimeContext) sinkContext);

            PluginID sinkPluginId = PluginID.from(info.getSinkId());
            this.sinks.add(new TrackedRealtimeSink(sink, this.metrics, sinkPluginId));
        }
    }

    /**
     * Instantiates and initializes every transform plugin listed in the worker specification
     * and records which transforms have an error dataset configured.
     *
     * @param workerContext runtime context used to create the plugin instances
     * @return the initialized transforms, in the order they appear in the specification
     * @throws IllegalArgumentException if the serialized transform list deserializes to null
     * @throws Exception if a transform's initialization fails
     */
    private List<TransformDetail> initializeTransforms(WorkerContext workerContext) throws Exception {
        String serializedTransforms = workerContext.getSpecification().getProperty(Constants.Transform.PLUGINIDS);
        List<TransformInfo> transformInfos = GSON.fromJson(serializedTransforms, TRANSFORMDETAILS_LIST_TYPE);
        Preconditions.checkArgument(transformInfos != null);

        List<TransformDetail> details = new ArrayList<TransformDetail>(transformInfos.size());
        this.tranformIdToDatasetName = new HashMap<String, String>(transformInfos.size());

        for (TransformInfo info : transformInfos) {
            String transformId = info.getTransformId();
            try {
                Transform transform = (Transform) workerContext.newPluginInstance(transformId);
                RealtimeTransformContext transformContext =
                    new RealtimeTransformContext(workerContext, this.metrics, transformId);
                LOG.debug("Transform Class : {}", transform.getClass().getName());
                transform.initialize((TransformContext) transformContext);

                StageMetrics stageMetrics = new StageMetrics(this.metrics, PluginID.from(transformId));
                details.add(new TransformDetail(transformId, transform, stageMetrics));

                // Only transforms with an error dataset are tracked for error persistence.
                String errorDatasetName = info.getErrorDatasetName();
                if (errorDatasetName != null) {
                    this.tranformIdToDatasetName.put(transformId, errorDatasetName);
                }
            } catch (InstantiationException e) {
                LOG.error("Unable to instantiate Transform", e);
                throw Throwables.propagate(e);
            }
        }
        return details;
    }

    /**
     * Main loop of the worker. Restores the last persisted source state, then repeats until
     * {@link #stop()} is called:
     * <ol>
     *   <li>poll the source, passing it a copy of the current state;</li>
     *   <li>run every polled record through the transform chain, collecting the emitted
     *       records and, for transforms with an error dataset, the invalid entries;</li>
     *   <li>in a single transaction, write the records to all sinks, write the invalid entries
     *       to their error datasets and persist the new source state.</li>
     * </ol>
     * Failures while polling, transforming or persisting are logged and the loop continues;
     * the buffered records and error entries of the iteration are always discarded afterwards.
     */
    public void run() {
        final SourceState currentState = new SourceState();
        final SourceState nextState = new SourceState();
        final List<Object> dataToSink = Lists.newArrayList();
        final Map<String, List<InvalidEntry>> errorsByTransform = intializeTransformIdToErrorsList();
        // Transforms that emitted errors without an error dataset and were already warned about.
        HashSet<String> warnedTransforms = Sets.newHashSet();

        // Resume from the state persisted by a previous run, if there is one.
        getContext().execute(new TxRunnable() {
            public void run(DatasetContext datasetContext) throws Exception {
                KeyValueTable stateTable = datasetContext.getDataset(ETLRealtimeApplication.STATE_TABLE);
                byte[] persisted = stateTable.read(ETLWorker.this.stateStoreKeyBytes);
                if (persisted != null) {
                    currentState.setState(ETLWorker.GSON.fromJson(Bytes.toString(persisted), SourceState.class));
                }
            }
        });

        while (!this.stopped) {
            try {
                // The source gets a copy so it cannot modify the committed state directly.
                SourceState polledState = this.source.poll(this.sourceEmitter, new SourceState(currentState));
                if (polledState != null) {
                    nextState.setState(polledState);
                }
                Iterator<?> polledRecords = this.sourceEmitter.iterator();
                while (polledRecords.hasNext()) {
                    Object record = polledRecords.next();
                    try {
                        TransformResponse response = this.transformExecutor.runOneIteration(record);
                        while (response.getEmittedRecords().hasNext()) {
                            dataToSink.add(response.getEmittedRecords().next());
                        }
                        // Iterated as Object because the response is used as a raw type here.
                        for (Object rawEntry : response.getMapTransformIdToErrorEmitter().entrySet()) {
                            Map.Entry errorEntry = (Map.Entry) rawEntry;
                            String transformId = (String) errorEntry.getKey();
                            if (this.tranformIdToDatasetName.containsKey(transformId)) {
                                errorsByTransform.get(transformId).addAll((Collection) errorEntry.getValue());
                            } else if (!warnedTransforms.contains(transformId)) {
                                LOG.warn("Error records were emitted in transform {}, but error dataset is not configured for this transform", transformId);
                                // Remember the transform so the warning is logged once, not per record.
                                warnedTransforms.add(transformId);
                            }
                        }
                    } catch (Exception e) {
                        // A single bad record must not stop the rest of the batch.
                        LOG.warn("Exception thrown while processing data {}", record, e);
                    }
                }
                this.sourceEmitter.reset();
            } catch (Exception e) {
                LOG.warn("Exception thrown during polling of Source for data", e);
                this.sourceEmitter.reset();
            }

            try {
                // NOTE(review): error entries collected while there are no sink records and no
                // state change are discarded below without being written - confirm this is intended.
                if (!dataToSink.isEmpty() || !nextState.equals(currentState)) {
                    getContext().execute(new TxRunnable() {
                        public void run(DatasetContext datasetContext) throws Exception {
                            if (!dataToSink.isEmpty()) {
                                DefaultDataWriter dataWriter =
                                    new DefaultDataWriter(ETLWorker.this.getContext(), datasetContext);
                                for (RealtimeSink sink : ETLWorker.this.sinks) {
                                    sink.write(dataToSink, dataWriter);
                                }
                            }

                            for (Map.Entry<String, List<InvalidEntry>> errors : errorsByTransform.entrySet()) {
                                String transformId = errors.getKey();
                                Table errorTable =
                                    datasetContext.getDataset(ETLWorker.this.tranformIdToDatasetName.get(transformId));
                                long now = System.currentTimeMillis();
                                byte[] timestampBytes = Bytes.toBytes(now);
                                String keyPrefix = ETLWorker.this.appName + SEPARATOR + transformId;
                                for (InvalidEntry invalidEntry : errors.getValue()) {
                                    // Row key: <timestamp><app>:<transform id><random uuid>
                                    byte[] rowKey = Bytes.concat(new byte[][]{
                                        timestampBytes, Bytes.toBytes(keyPrefix), Bytes.toBytes(UUID.randomUUID())});
                                    errorTable.write(rowKey, ETLWorker.this.constructErrorPut(rowKey, invalidEntry, now));
                                }
                            }

                            if (!nextState.equals(currentState)) {
                                KeyValueTable stateTable = datasetContext.getDataset(ETLRealtimeApplication.STATE_TABLE);
                                stateTable.write(ETLWorker.this.stateStoreKey, ETLWorker.GSON.toJson(nextState));
                            }
                            ETLWorker.this.transformExecutor.resetEmitters();
                        }
                    });
                    // Only advance the in-memory state once the transaction has gone through.
                    currentState.setState(nextState);
                }
            } catch (Exception e) {
                LOG.warn("Exception thrown during persisting of data", e);
            } finally {
                // Buffers are per-iteration: drop them on success, on failure and on errors.
                dataToSink.clear();
                for (List<InvalidEntry> pendingErrors : errorsByTransform.values()) {
                    pendingErrors.clear();
                }
            }
        }
    }

    /**
     * Builds the per-iteration error buffers: one empty, mutable list for every transform id
     * that has an error dataset configured.
     *
     * @return a new map from transform id to an empty list of invalid entries
     */
    private Map<String, List<InvalidEntry>> intializeTransformIdToErrorsList() {
        Map<String, List<InvalidEntry>> errorBuffers = new HashMap<String, List<InvalidEntry>>();
        for (String transformId : this.tranformIdToDatasetName.keySet()) {
            errorBuffers.put(transformId, new ArrayList<InvalidEntry>());
        }
        return errorBuffers;
    }

    /**
     * Builds the row written to an error dataset for one invalid entry. The row holds the error
     * code, the given timestamp and a string form of the rejected record: its JSON rendering when
     * it is a {@link StructuredRecord}, otherwise a message naming the unsupported record type.
     *
     * @param bArr row key of the put
     * @param invalidEntry the entry rejected by a transform
     * @param j timestamp stored with the entry
     * @return the populated put
     * @throws IOException if the structured record cannot be converted to JSON
     */
    public Put constructErrorPut(byte[] bArr, InvalidEntry invalidEntry, long j) throws IOException {
        Put errorPut = new Put(bArr);
        errorPut.add(Constants.ErrorDataset.ERRCODE, invalidEntry.getErrorCode());
        errorPut.add(Constants.ErrorDataset.TIMESTAMP, j);

        Object rejected = invalidEntry.getInvalidRecord();
        String renderedEntry;
        if (rejected instanceof StructuredRecord) {
            renderedEntry = StructuredRecordStringConverter.toJsonString((StructuredRecord) rejected);
        } else {
            renderedEntry = String.format("Error Entry is of type %s, only records of type co.cask.cdap.api.data.format.StructuredRecord is supported currently", rejected.getClass().getName());
        }
        errorPut.add(Constants.ErrorDataset.INVALIDENTRY, renderedEntry);
        return errorPut;
    }

    /**
     * Requests termination of the polling loop in {@link #run()} by raising the stop flag;
     * the loop checks the flag at the start of each iteration.
     */
    public void stop() {
        stopped = true;
    }

    /**
     * Releases the source, the transform executor and every sink, each via
     * {@code Destroyables.destroyQuietly}.
     *
     * <p>The sink list is only assigned in initializeSinks, so it is still {@code null} when
     * initialization failed earlier; that case is skipped instead of throwing.
     */
    public void destroy() {
        Destroyables.destroyQuietly(this.source);
        Destroyables.destroyQuietly(this.transformExecutor);
        if (this.sinks != null) {
            for (RealtimeSink sink : this.sinks) {
                Destroyables.destroyQuietly(sink);
            }
        }
    }
}
