package co.cask.cdap.etl.realtime;

import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.dataset.table.Put;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.api.dataset.table.TableProperties;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.worker.AbstractWorker;
import co.cask.cdap.api.worker.WorkerContext;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.InvalidEntry;
import co.cask.cdap.etl.api.Transform;
import co.cask.cdap.etl.api.TransformContext;
import co.cask.cdap.etl.api.Transformation;
import co.cask.cdap.etl.api.realtime.RealtimeContext;
import co.cask.cdap.etl.api.realtime.RealtimeSink;
import co.cask.cdap.etl.api.realtime.RealtimeSource;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.DefaultStageMetrics;
import co.cask.cdap.etl.common.Destroyables;
import co.cask.cdap.etl.common.LoggedTransform;
import co.cask.cdap.etl.common.PipelinePhase;
import co.cask.cdap.etl.common.SetMultimapCodec;
import co.cask.cdap.etl.common.TrackedTransform;
import co.cask.cdap.etl.common.TransformDetail;
import co.cask.cdap.etl.common.TransformExecutor;
import co.cask.cdap.etl.common.TxLookupProvider;
import co.cask.cdap.etl.log.LogStageInjector;
import co.cask.cdap.etl.planner.PipelinePlan;
import co.cask.cdap.etl.planner.PipelinePlanner;
import co.cask.cdap.etl.planner.StageInfo;
import co.cask.cdap.etl.proto.v2.ETLRealtimeConfig;
import co.cask.cdap.etl.spec.PipelineSpec;
import co.cask.cdap.etl.spec.StageSpec;
import co.cask.cdap.format.StructuredRecordStringConverter;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.SetMultimap;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/etl/realtime/ETLWorker.class */
public class ETLWorker extends AbstractWorker {
    private static final String SEPARATOR = ":";
    private static final String UNIQUE_ID = "uniqueid";
    private final ETLRealtimeConfig config;
    private Metrics metrics;
    private RealtimeSource<Object> source;
    private String sourceStageName;
    private Map<String, RealtimeSink> sinks;
    private TransformExecutor transformExecutor;
    private String stateStoreKey;
    private byte[] stateStoreKeyBytes;
    private String appName;
    private Map<String, String> tranformIdToDatasetName;
    private volatile boolean stopped;
    public static final String NAME = ETLWorker.class.getSimpleName();
    private static final Logger LOG = LoggerFactory.getLogger(ETLWorker.class);
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Schema.class, new SchemaTypeAdapter()).registerTypeAdapter(SetMultimap.class, new SetMultimapCodec()).create();
    private static final Schema ERROR_SCHEMA = Schema.recordOf("error", Schema.Field.of(Constants.ErrorDataset.ERRCODE, Schema.of(Schema.Type.INT)), Schema.Field.of(Constants.ErrorDataset.ERRMSG, Schema.unionOf(Schema.of(Schema.Type.STRING), Schema.of(Schema.Type.NULL))), Schema.Field.of(Constants.ErrorDataset.INVALIDENTRY, Schema.of(Schema.Type.STRING)));
    private static final Set<String> SUPPORTED_PLUGIN_TYPES = ImmutableSet.of(RealtimeSource.PLUGIN_TYPE, RealtimeSink.PLUGIN_TYPE, Transform.PLUGIN_TYPE);

    public ETLWorker(ETLRealtimeConfig eTLRealtimeConfig) {
        this.config = eTLRealtimeConfig;
    }

    public void configure() {
        setName(NAME);
        setDescription("Worker Driver for Realtime ETL Pipelines");
        int instances = this.config.getInstances();
        if (instances < 1) {
            throw new IllegalArgumentException("instances must be greater than 0.");
        }
        setInstances(instances);
        if (this.config.getResources() != null) {
            setResources(this.config.getResources());
        }
        PipelineSpec generateSpec = new RealtimePipelineSpecGenerator(getConfigurer(), ImmutableSet.of(RealtimeSource.PLUGIN_TYPE), ImmutableSet.of(RealtimeSink.PLUGIN_TYPE), Table.class, TableProperties.builder().setSchema(ERROR_SCHEMA).build()).generateSpec((RealtimePipelineSpecGenerator) this.config);
        int i = 0;
        Iterator<StageSpec> it = generateSpec.getStages().iterator();
        while (it.hasNext()) {
            if (RealtimeSource.PLUGIN_TYPE.equals(it.next().getPlugin().getType())) {
                i++;
            }
        }
        if (i != 1) {
            throw new IllegalArgumentException("Invalid pipeline. There must only be one source.");
        }
        PipelinePlan plan = new PipelinePlanner(SUPPORTED_PLUGIN_TYPES, ImmutableSet.of(), ImmutableSet.of(), ImmutableSet.of()).plan(generateSpec);
        if (plan.getPhases().size() != 1) {
            throw new IllegalArgumentException("There was an error planning the pipeline. There should only be one phase.");
        }
        PipelinePhase next = plan.getPhases().values().iterator().next();
        HashMap hashMap = new HashMap();
        hashMap.put(Constants.PIPELINE_SPEC_KEY, GSON.toJson(generateSpec));
        hashMap.put(Constants.PIPELINEID, GSON.toJson(next));
        hashMap.put(UNIQUE_ID, String.valueOf(System.currentTimeMillis()));
        hashMap.put(Constants.STAGE_LOGGING_ENABLED, String.valueOf(this.config.isStageLoggingEnabled()));
        setProperties(hashMap);
    }

    public void initialize(WorkerContext workerContext) throws Exception {
        if (Boolean.valueOf(workerContext.getSpecification().getProperty(Constants.STAGE_LOGGING_ENABLED)).booleanValue()) {
            LogStageInjector.start();
        }
        super.initialize(workerContext);
        Map properties = workerContext.getSpecification().getProperties();
        this.appName = workerContext.getApplicationSpecification().getName();
        Preconditions.checkArgument(properties.containsKey(Constants.PIPELINEID));
        Preconditions.checkArgument(properties.containsKey(UNIQUE_ID));
        String str = (String) properties.get(UNIQUE_ID);
        final String name = workerContext.getApplicationSpecification().getName();
        this.stateStoreKey = String.format("%s%s%s%s%s", name, ":", str, ":", Integer.valueOf(workerContext.getInstanceId()));
        this.stateStoreKeyBytes = Bytes.toBytes(this.stateStoreKey);
        getContext().execute(new TxRunnable() { // from class: co.cask.cdap.etl.realtime.ETLWorker.1
            public void run(DatasetContext datasetContext) throws Exception {
                KeyValueTable dataset = datasetContext.getDataset(ETLRealtimeApplication.STATE_TABLE);
                byte[] bytes = Bytes.toBytes(String.format("%s%s", name, ":"));
                CloseableIterator scan = dataset.scan(bytes, Bytes.stopKeyForPrefix(bytes));
                Throwable th = null;
                while (scan.hasNext()) {
                    try {
                        try {
                            KeyValue keyValue = (KeyValue) scan.next();
                            if (Bytes.compareTo(ETLWorker.this.stateStoreKeyBytes, (byte[]) keyValue.getKey()) != 0) {
                                dataset.delete((byte[]) keyValue.getKey());
                            }
                        } catch (Throwable th2) {
                            if (scan != null) {
                                if (th != null) {
                                    try {
                                        scan.close();
                                    } catch (Throwable th3) {
                                        th.addSuppressed(th3);
                                    }
                                } else {
                                    scan.close();
                                }
                            }
                            throw th2;
                        }
                    } catch (Throwable th4) {
                        th = th4;
                        throw th4;
                    }
                }
                if (scan != null) {
                    if (0 == 0) {
                        scan.close();
                        return;
                    }
                    try {
                        scan.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                }
            }
        });
        PipelinePhase pipelinePhase = (PipelinePhase) GSON.fromJson((String) properties.get(Constants.PIPELINEID), PipelinePhase.class);
        HashMap hashMap = new HashMap();
        initializeSource(workerContext, pipelinePhase);
        initializeTransforms(workerContext, hashMap, pipelinePhase);
        initializeSinks(workerContext, hashMap, pipelinePhase);
        HashSet hashSet = new HashSet();
        hashSet.addAll(pipelinePhase.getStageOutputs(this.sourceStageName));
        this.transformExecutor = new TransformExecutor(hashMap, hashSet);
    }

    private void initializeSource(WorkerContext workerContext, PipelinePhase pipelinePhase) throws Exception {
        String name = pipelinePhase.getStagesOfType(RealtimeSource.PLUGIN_TYPE).iterator().next().getName();
        this.source = (RealtimeSource) workerContext.newPluginInstance(name);
        this.source = new LoggedRealtimeSource(name, this.source);
        WorkerRealtimeContext workerRealtimeContext = new WorkerRealtimeContext(workerContext, this.metrics, new TxLookupProvider(workerContext), pipelinePhase.getStage(name));
        this.sourceStageName = name;
        LOG.debug("Source Class : {}", this.source.getClass().getName());
        this.source.initialize((RealtimeContext) workerRealtimeContext);
    }

    private void initializeSinks(WorkerContext workerContext, Map<String, TransformDetail> map, PipelinePhase pipelinePhase) throws Exception {
        Set<StageInfo> stagesOfType = pipelinePhase.getStagesOfType(RealtimeSink.PLUGIN_TYPE);
        this.sinks = new HashMap(stagesOfType.size());
        for (StageInfo stageInfo : stagesOfType) {
            String name = stageInfo.getName();
            LoggedRealtimeSink loggedRealtimeSink = new LoggedRealtimeSink(name, (RealtimeSink) workerContext.newPluginInstance(name));
            WorkerRealtimeContext workerRealtimeContext = new WorkerRealtimeContext(workerContext, this.metrics, new TxLookupProvider(workerContext), stageInfo);
            LOG.debug("Sink Class : {}", loggedRealtimeSink.getClass().getName());
            loggedRealtimeSink.initialize((RealtimeContext) workerRealtimeContext);
            TrackedRealtimeSink trackedRealtimeSink = new TrackedRealtimeSink(loggedRealtimeSink, new DefaultStageMetrics(this.metrics, name));
            map.put(stageInfo.getName(), new TransformDetail(new TrackedTransform(new Transformation() { // from class: co.cask.cdap.etl.realtime.ETLWorker.2
                @Override // co.cask.cdap.etl.api.Transformation
                public void transform(Object obj, Emitter emitter) throws Exception {
                    emitter.emit(obj);
                }
            }, new DefaultStageMetrics(this.metrics, name), TrackedTransform.RECORDS_IN, null, workerContext.getDataTracer(name)), new HashSet()));
            this.sinks.put(stageInfo.getName(), trackedRealtimeSink);
        }
    }

    private void initializeTransforms(WorkerContext workerContext, Map<String, TransformDetail> map, PipelinePhase pipelinePhase) throws Exception {
        Set<StageInfo> stagesOfType = pipelinePhase.getStagesOfType(Transform.PLUGIN_TYPE);
        Preconditions.checkArgument(stagesOfType != null);
        this.tranformIdToDatasetName = new HashMap(stagesOfType.size());
        for (StageInfo stageInfo : stagesOfType) {
            String name = stageInfo.getName();
            try {
                LoggedTransform loggedTransform = new LoggedTransform(name, (Transform) workerContext.newPluginInstance(name));
                WorkerRealtimeContext workerRealtimeContext = new WorkerRealtimeContext(workerContext, this.metrics, new TxLookupProvider(workerContext), stageInfo);
                LOG.debug("Transform Class : {}", loggedTransform.getClass().getName());
                loggedTransform.initialize((TransformContext) workerRealtimeContext);
                map.put(name, new TransformDetail(new TrackedTransform(loggedTransform, new DefaultStageMetrics(this.metrics, name), workerContext.getDataTracer(name)), pipelinePhase.getStageOutputs(name)));
                if (stageInfo.getErrorDatasetName() != null) {
                    this.tranformIdToDatasetName.put(name, stageInfo.getErrorDatasetName());
                }
            } catch (InstantiationException e) {
                LOG.error("Unable to instantiate Transform", (Throwable) e);
                Throwables.propagate(e);
            }
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:75:0x0266, code lost:
    
        if (r0.equals(r0) == false) goto L61;
     */
    /* JADX WARN: Finally extract failed */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void run() {
        /*
            Method dump skipped, instructions count: 838
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: co.cask.cdap.etl.realtime.ETLWorker.run():void");
    }

    private Map<String, List<InvalidEntry>> intializeTransformIdToErrorsList() {
        HashMap hashMap = new HashMap();
        Iterator<String> it = this.tranformIdToDatasetName.keySet().iterator();
        while (it.hasNext()) {
            hashMap.put(it.next(), new ArrayList());
        }
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Put constructErrorPut(byte[] bArr, InvalidEntry invalidEntry, long j) throws IOException {
        Put put = new Put(bArr);
        put.add(Constants.ErrorDataset.ERRCODE, invalidEntry.getErrorCode());
        put.add(Constants.ErrorDataset.TIMESTAMP, j);
        if (invalidEntry.getInvalidRecord() instanceof StructuredRecord) {
            put.add(Constants.ErrorDataset.INVALIDENTRY, StructuredRecordStringConverter.toJsonString((StructuredRecord) invalidEntry.getInvalidRecord()));
        } else {
            put.add(Constants.ErrorDataset.INVALIDENTRY, String.format("Error Entry is of type %s, only records of type co.cask.cdap.api.data.format.StructuredRecord is supported currently", invalidEntry.getInvalidRecord().getClass().getName()));
        }
        return put;
    }

    public void stop() {
        this.stopped = true;
    }

    public void destroy() {
        Destroyables.destroyQuietly(this.source);
        Destroyables.destroyQuietly(this.transformExecutor);
        Iterator<RealtimeSink> it = this.sinks.values().iterator();
        while (it.hasNext()) {
            Destroyables.destroyQuietly(it.next());
        }
    }
}
