package co.cask.cdap.etl.realtime;

import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.dataset.table.Put;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.worker.AbstractWorker;
import co.cask.cdap.api.worker.WorkerContext;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.InvalidEntry;
import co.cask.cdap.etl.api.Transform;
import co.cask.cdap.etl.api.TransformContext;
import co.cask.cdap.etl.api.Transformation;
import co.cask.cdap.etl.api.realtime.RealtimeContext;
import co.cask.cdap.etl.api.realtime.RealtimeSink;
import co.cask.cdap.etl.api.realtime.RealtimeSource;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.DefaultEmitter;
import co.cask.cdap.etl.common.DefaultStageMetrics;
import co.cask.cdap.etl.common.Destroyables;
import co.cask.cdap.etl.common.LoggedTransform;
import co.cask.cdap.etl.common.NoopMetrics;
import co.cask.cdap.etl.common.Pipeline;
import co.cask.cdap.etl.common.PipelineRegisterer;
import co.cask.cdap.etl.common.SinkInfo;
import co.cask.cdap.etl.common.TransformDetail;
import co.cask.cdap.etl.common.TransformExecutor;
import co.cask.cdap.etl.common.TransformInfo;
import co.cask.cdap.etl.common.TxLookupProvider;
import co.cask.cdap.etl.log.LogStageInjector;
import co.cask.cdap.etl.realtime.config.ETLRealtimeConfig;
import co.cask.cdap.format.StructuredRecordStringConverter;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.gson.Gson;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/etl/realtime/ETLWorker.class */
/**
 * Worker that sets up a realtime ETL pipeline made of one {@link RealtimeSource}, a set of
 * {@link Transform}s and a set of {@link RealtimeSink}s.
 *
 * <p>{@link #configure()} registers the pipeline plugins and stores the serialized {@link Pipeline}
 * in the worker properties; {@link #initialize(WorkerContext)} reads it back, instantiates every
 * stage and builds the {@link TransformExecutor}.
 *
 * <p>NOTE: this file was produced by a decompiler and the body of {@link #run()} could not be
 * recovered; see the comment on that method.
 */
public class ETLWorker extends AbstractWorker {
    public static final String NAME = ETLWorker.class.getSimpleName();

    private static final Logger LOG = LoggerFactory.getLogger(ETLWorker.class);
    private static final Gson GSON = new Gson();

    /** Separator between the components of a state-store key. */
    private static final String SEPARATOR = ":";

    /** Schema advertised for the error dataset registered in {@link #configure()}. */
    private static final Schema ERROR_SCHEMA = Schema.recordOf(
        "error",
        Schema.Field.of(Constants.ErrorDataset.ERRCODE, Schema.of(Schema.Type.INT)),
        Schema.Field.of(Constants.ErrorDataset.ERRMSG,
                        Schema.unionOf(Schema.of(Schema.Type.STRING), Schema.of(Schema.Type.NULL))),
        Schema.Field.of(Constants.ErrorDataset.INVALIDENTRY, Schema.of(Schema.Type.STRING)));

    private final ETLRealtimeConfig config;

    // Never assigned in this class; presumably injected by the CDAP runtime - TODO confirm.
    private Metrics metrics;

    private RealtimeSource<Object> source;
    private String sourceStageName;
    private Map<String, RealtimeSink> sinks;
    private TransformExecutor transformExecutor;
    private DefaultEmitter sourceEmitter;

    /** Key of this instance's entry in the state table: appName:uniqueId:instanceId. */
    private String stateStoreKey;
    private byte[] stateStoreKeyBytes;
    private String appName;

    /** Maps a transform id to its error dataset name, for transforms that declare one. */
    private Map<String, String> tranformIdToDatasetName;

    /** Set by {@link #stop()}; volatile because it is written and read without locking. */
    private volatile boolean stopped;

    public ETLWorker(ETLRealtimeConfig eTLRealtimeConfig) {
        this.config = eTLRealtimeConfig;
    }

    /**
     * Declares the worker (name, description, instances, resources), registers the pipeline
     * plugins and records the pipeline, a unique id and the stage-logging flag as properties.
     *
     * @throws IllegalArgumentException if the configured number of instances is less than 1
     */
    public void configure() {
        setName(NAME);
        setDescription("Worker Driver for Realtime ETL Adapters");

        Integer configuredInstances = this.config.getInstances();
        int instances = configuredInstances != null ? configuredInstances.intValue() : 1;
        if (instances < 1) {
            throw new IllegalArgumentException("instances must be greater than 0.");
        }
        setInstances(instances);

        if (this.config.getResources() != null) {
            setResources(this.config.getResources());
        }

        DatasetProperties errorDatasetProperties =
            DatasetProperties.builder().add("schema", ERROR_SCHEMA.toString()).build();
        Pipeline pipeline = new PipelineRegisterer(getConfigurer(), "realtime")
            .registerPlugins(this.config, Table.class, errorDatasetProperties, false);

        Map<String, String> properties = new HashMap<String, String>();
        properties.put(Constants.PIPELINEID, GSON.toJson(pipeline));
        properties.put(Constants.Realtime.UNIQUE_ID, String.valueOf(System.currentTimeMillis()));
        properties.put(Constants.STAGE_LOGGING_ENABLED, String.valueOf(this.config.isStageLoggingEnabled()));
        setProperties(properties);
    }

    /**
     * Prepares the worker for running: computes this instance's state-store key, deletes state
     * rows of the same application stored under any other key, then instantiates the source,
     * transforms and sinks and wires them into a {@link TransformExecutor}.
     *
     * @param workerContext runtime context supplying the specification written by {@link #configure()}
     * @throws IllegalArgumentException if the pipeline or unique-id property is missing, or if the
     *         pipeline has no connections leaving the source stage
     * @throws Exception if any stage fails to instantiate or initialize
     */
    public void initialize(WorkerContext workerContext) throws Exception {
        if (Boolean.parseBoolean(workerContext.getSpecification().getProperty(Constants.STAGE_LOGGING_ENABLED))) {
            LogStageInjector.start();
        }
        super.initialize(workerContext);

        Map<String, String> properties = workerContext.getSpecification().getProperties();
        this.appName = workerContext.getApplicationSpecification().getName();
        Preconditions.checkArgument(properties.containsKey(Constants.PIPELINEID));
        Preconditions.checkArgument(properties.containsKey(Constants.Realtime.UNIQUE_ID));

        String uniqueId = properties.get(Constants.Realtime.UNIQUE_ID);
        this.stateStoreKey = String.format("%s%s%s%s%s",
                                           this.appName, SEPARATOR, uniqueId, SEPARATOR,
                                           Integer.valueOf(workerContext.getInstanceId()));
        this.stateStoreKeyBytes = Bytes.toBytes(this.stateStoreKey);
        purgeStaleState(this.appName, this.stateStoreKeyBytes);

        Pipeline pipeline = GSON.fromJson(properties.get(Constants.PIPELINEID), Pipeline.class);
        Map<String, List<String>> connections = pipeline.getConnections();
        Map<String, TransformDetail> transformations = new HashMap<String, TransformDetail>();

        initializeSource(workerContext);
        initializeTransforms(workerContext, transformations, connections);
        initializeSinks(workerContext, transformations);

        // Fail with a descriptive message instead of an NPE when the source has no outgoing edges.
        List<String> sourceOutputs = connections.get(this.sourceStageName);
        Preconditions.checkArgument(sourceOutputs != null,
                                    "No connections found for source stage '%s'", this.sourceStageName);
        this.transformExecutor = new TransformExecutor(transformations, new ArrayList<String>(sourceOutputs));
    }

    /**
     * Deletes, in one transaction, every row of the state table whose key starts with
     * {@code applicationName + ":"} and differs from {@code currentKey}.
     */
    private void purgeStaleState(final String applicationName, final byte[] currentKey) throws Exception {
        getContext().execute(new TxRunnable() {
            public void run(DatasetContext datasetContext) throws Exception {
                KeyValueTable stateTable = datasetContext.getDataset(ETLRealtimeApplication.STATE_TABLE);
                byte[] prefix = Bytes.toBytes(String.format("%s%s", applicationName, SEPARATOR));
                CloseableIterator<KeyValue<byte[], byte[]>> rows =
                    stateTable.scan(prefix, Bytes.stopKeyForPrefix(prefix));
                // Close once, after the whole scan; closing inside the loop would end it after one row.
                try {
                    while (rows.hasNext()) {
                        byte[] rowKey = rows.next().getKey();
                        if (Bytes.compareTo(currentKey, rowKey) != 0) {
                            stateTable.delete(rowKey);
                        }
                    }
                } finally {
                    rows.close();
                }
            }
        });
    }

    /** Deserializes the pipeline stored under {@link Constants#PIPELINEID} in the worker specification. */
    private static Pipeline readPipeline(WorkerContext workerContext) {
        return GSON.fromJson(workerContext.getSpecification().getProperty(Constants.PIPELINEID), Pipeline.class);
    }

    /** Instantiates and initializes the source plugin and creates the emitter that collects its output. */
    private void initializeSource(WorkerContext workerContext) throws Exception {
        String sourceId = readPipeline(workerContext).getSource();
        this.source = (RealtimeSource) workerContext.newPluginInstance(sourceId);
        this.source = new LoggedRealtimeSource(sourceId, this.source);
        this.sourceStageName = sourceId;

        WorkerRealtimeContext sourceContext =
            new WorkerRealtimeContext(workerContext, this.metrics, new TxLookupProvider(workerContext), sourceId);
        LOG.debug("Source Class : {}", this.source.getClass().getName());
        this.source.initialize((RealtimeContext) sourceContext);
        this.sourceEmitter = new DefaultEmitter(sourceContext.getMetrics());
    }

    /**
     * Instantiates and initializes every sink plugin. Each sink is stored in {@link #sinks} and is
     * also added to {@code map} as a pass-through transformation with no outputs.
     */
    private void initializeSinks(WorkerContext workerContext, Map<String, TransformDetail> map) throws Exception {
        List<SinkInfo> sinkInfos = readPipeline(workerContext).getSinks();
        this.sinks = new HashMap<String, RealtimeSink>(sinkInfos.size());
        for (SinkInfo sinkInfo : sinkInfos) {
            String sinkId = sinkInfo.getSinkId();
            LoggedRealtimeSink loggedSink =
                new LoggedRealtimeSink(sinkId, (RealtimeSink) workerContext.newPluginInstance(sinkId));
            WorkerRealtimeContext sinkContext =
                new WorkerRealtimeContext(workerContext, this.metrics, new TxLookupProvider(workerContext), sinkId);
            LOG.debug("Sink Class : {}", loggedSink.getClass().getName());
            loggedSink.initialize((RealtimeContext) sinkContext);
            TrackedRealtimeSink trackedSink = new TrackedRealtimeSink(loggedSink, sinkContext.getMetrics());

            Transformation passThrough = new Transformation() {
                @Override // co.cask.cdap.etl.api.Transformation
                public void transform(Object obj, Emitter emitter) throws Exception {
                    emitter.emit(obj);
                }
            };
            map.put(sinkId, new TransformDetail(passThrough, new NoopMetrics(), new ArrayList<String>()));
            this.sinks.put(sinkId, trackedSink);
        }
    }

    /**
     * Instantiates and initializes every transform plugin, registers it in {@code map} together
     * with its outgoing connections from {@code map2}, and records its error dataset name if any.
     *
     * @throws RuntimeException wrapping the {@link InstantiationException} of a plugin that could
     *         not be created
     */
    private void initializeTransforms(WorkerContext workerContext, Map<String, TransformDetail> map, Map<String, List<String>> map2) throws Exception {
        List<TransformInfo> transforms = readPipeline(workerContext).getTransforms();
        Preconditions.checkArgument(transforms != null);
        this.tranformIdToDatasetName = new HashMap<String, String>(transforms.size());
        for (TransformInfo transformInfo : transforms) {
            String transformId = transformInfo.getTransformId();
            try {
                LoggedTransform loggedTransform =
                    new LoggedTransform(transformId, (Transform) workerContext.newPluginInstance(transformId));
                WorkerRealtimeContext transformContext =
                    new WorkerRealtimeContext(workerContext, this.metrics, new TxLookupProvider(workerContext), transformId);
                LOG.debug("Transform Class : {}", loggedTransform.getClass().getName());
                loggedTransform.initialize((TransformContext) transformContext);
                map.put(transformId,
                        new TransformDetail(loggedTransform, new DefaultStageMetrics(this.metrics, transformId), map2.get(transformId)));
                String errorDatasetName = transformInfo.getErrorDatasetName();
                if (errorDatasetName != null) {
                    this.tranformIdToDatasetName.put(transformId, errorDatasetName);
                }
            } catch (InstantiationException e) {
                LOG.error("Unable to instantiate Transform", e);
                // propagate() always throws; the explicit throw makes that visible to readers and the compiler.
                throw Throwables.propagate(e);
            }
        }
    }

    /*
     * NOTE: the decompiler (JADX) could not reconstruct this method ("Code restructure failed:
     * missing block", "Finally extract failed"; 783 bytecode instructions were skipped).
     * The stub below is what the decompiler emitted. The real body must be recovered from the
     * original sources or the bytecode dump before this class can be used.
     */
    public void run() {
        throw new UnsupportedOperationException("Method not decompiled: co.cask.cdap.etl.realtime.ETLWorker.run():void");
    }

    /** Creates a map holding an empty error list for every transform that has an error dataset. */
    private Map<String, List<InvalidEntry>> intializeTransformIdToErrorsList() {
        Map<String, List<InvalidEntry>> errorsByTransform = new HashMap<String, List<InvalidEntry>>();
        for (String transformId : this.tranformIdToDatasetName.keySet()) {
            errorsByTransform.put(transformId, new ArrayList<InvalidEntry>());
        }
        return errorsByTransform;
    }

    /**
     * Builds the error-dataset row for one invalid entry.
     *
     * <p>Kept public because the decompiler widened it from private, which suggests the lost
     * inner classes of {@link #run()} call it - TODO confirm.
     *
     * @param bArr row key of the put
     * @param invalidEntry entry supplying the error code and the offending record
     * @param j timestamp written to the timestamp column
     * @return a put holding the error code, the timestamp and either the record as JSON (when it
     *         is a {@link StructuredRecord}) or a message naming the unsupported record type
     * @throws IOException if the record cannot be converted to JSON
     */
    public Put constructErrorPut(byte[] bArr, InvalidEntry invalidEntry, long j) throws IOException {
        Put put = new Put(bArr);
        put.add(Constants.ErrorDataset.ERRCODE, invalidEntry.getErrorCode());
        put.add(Constants.ErrorDataset.TIMESTAMP, j);

        Object invalidRecord = invalidEntry.getInvalidRecord();
        if (invalidRecord instanceof StructuredRecord) {
            put.add(Constants.ErrorDataset.INVALIDENTRY,
                    StructuredRecordStringConverter.toJsonString((StructuredRecord) invalidRecord));
        } else {
            // A null record must not abort error reporting with an NPE; report its type as "null".
            String recordType = invalidRecord == null ? "null" : invalidRecord.getClass().getName();
            put.add(Constants.ErrorDataset.INVALIDENTRY, String.format("Error Entry is of type %s, only records of type co.cask.cdap.api.data.format.StructuredRecord is supported currently", recordType));
        }
        return put;
    }

    /** Sets the {@code stopped} flag. */
    public void stop() {
        this.stopped = true;
    }

    /** Destroys the source, the transform executor and every sink, ignoring failures. */
    public void destroy() {
        Destroyables.destroyQuietly(this.source);
        Destroyables.destroyQuietly(this.transformExecutor);
        // sinks is still null when initialize() failed before initializeSinks() ran.
        if (this.sinks != null) {
            for (RealtimeSink sink : this.sinks.values()) {
                Destroyables.destroyQuietly(sink);
            }
        }
    }
}
