package org.apache.hop.pipeline.transforms.eventhubs.listen;

import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
import java.util.LinkedList;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.function.Function;
import org.apache.commons.lang.StringUtils;
import org.apache.hop.core.Const;
import org.apache.hop.core.exception.HopException;
import org.apache.hop.core.exception.HopTransformException;
import org.apache.hop.core.row.IRowMeta;
import org.apache.hop.core.row.RowMeta;
import org.apache.hop.pipeline.Pipeline;
import org.apache.hop.pipeline.PipelineMeta;
import org.apache.hop.pipeline.SingleThreadedPipelineExecutor;
import org.apache.hop.pipeline.engines.local.LocalPipelineEngine;
import org.apache.hop.pipeline.transform.BaseTransform;
import org.apache.hop.pipeline.transform.ITransform;
import org.apache.hop.pipeline.transform.RowAdapter;
import org.apache.hop.pipeline.transform.TransformMeta;

/* loaded from: input_file:org/apache/hop/pipeline/transforms/eventhubs/listen/AzureListener.class */
public class AzureListener extends BaseTransform<AzureListenerMeta, AzureListenerData> {
    public AzureListener(TransformMeta transformMeta, AzureListenerMeta azureListenerMeta, AzureListenerData azureListenerData, int i, PipelineMeta pipelineMeta, Pipeline pipeline) {
        super(transformMeta, azureListenerMeta, azureListenerData, i, pipelineMeta, pipeline);
    }

    public boolean init() {
        ((AzureListenerData) this.data).batchSize = Const.toInt(resolve(this.meta.getBatchSize()), 100);
        ((AzureListenerData) this.data).prefetchSize = Const.toInt(resolve(this.meta.getPrefetchSize()), -1);
        ((AzureListenerData) this.data).list = new LinkedList<>();
        return super.init();
    }

    public void dispose() {
        ((AzureListenerData) this.data).executorService.shutdown();
        super.dispose();
    }

    public boolean processRow() throws HopException {
        ((AzureListenerData) this.data).outputRowMeta = new RowMeta();
        this.meta.getRegularRowMeta(((AzureListenerData) this.data).outputRowMeta, this);
        ((AzureListenerData) this.data).outputField = resolve(this.meta.getOutputField());
        ((AzureListenerData) this.data).partitionIdField = resolve(this.meta.getPartitionIdField());
        ((AzureListenerData) this.data).offsetField = resolve(this.meta.getOffsetField());
        ((AzureListenerData) this.data).sequenceNumberField = resolve(this.meta.getSequenceNumberField());
        ((AzureListenerData) this.data).hostField = resolve(this.meta.getHostField());
        ((AzureListenerData) this.data).enqueuedTimeField = resolve(this.meta.getEnqueuedTimeField());
        String resolve = resolve(this.meta.getNamespace());
        String resolve2 = resolve(this.meta.getEventHubName());
        String resolve3 = resolve(this.meta.getSasKeyName());
        String resolve4 = resolve(this.meta.getSasKey());
        String resolve5 = resolve(this.meta.getConsumerGroupName());
        String resolve6 = resolve(this.meta.getStorageContainerName());
        String resolve7 = resolve(this.meta.getStorageConnectionString());
        String resolve8 = resolve(this.meta.getBatchPipeline());
        String resolve9 = resolve(this.meta.getBatchInputTransform());
        String resolve10 = resolve(this.meta.getBatchOutputTransform());
        if (StringUtils.isNotEmpty(resolve8) && StringUtils.isNotEmpty(resolve9)) {
            logBasic("Passing rows to a batching transformation running single threaded : " + resolve8);
            ((AzureListenerData) this.data).stt = true;
            ((AzureListenerData) this.data).sttMaxWaitTime = Const.toLong(resolve(this.meta.getBatchMaxWaitTime()), -1L);
            ((AzureListenerData) this.data).sttPipelineMeta = AzureListenerMeta.loadBatchPipelineMeta(this.meta, this.metadataProvider, this);
            ((AzureListenerData) this.data).sttPipelineMeta.setPipelineType(PipelineMeta.PipelineType.SingleThreaded);
            ((AzureListenerData) this.data).sttPipeline = new LocalPipelineEngine(((AzureListenerData) this.data).sttPipelineMeta, this, this);
            ((AzureListenerData) this.data).sttPipeline.setParent(getPipeline());
            ((AzureListenerData) this.data).sttPipeline.setParentPipeline(getPipeline());
            getPipeline().addActiveSubPipeline(getTransformName(), ((AzureListenerData) this.data).sttPipeline);
            ((AzureListenerData) this.data).sttPipeline.prepareExecution();
            ((AzureListenerData) this.data).sttRowProducer = ((AzureListenerData) this.data).sttPipeline.addRowProducer(resolve9, 0);
            if (StringUtils.isNotEmpty(resolve10)) {
                ITransform findRunThread = ((AzureListenerData) this.data).sttPipeline.findRunThread(resolve10);
                if (findRunThread == null) {
                    throw new HopTransformException("Unable to find output transform '" + resolve10 + "'in batch pipeline");
                }
                findRunThread.addRowListener(new RowAdapter() { // from class: org.apache.hop.pipeline.transforms.eventhubs.listen.AzureListener.1
                    public void rowWrittenEvent(IRowMeta iRowMeta, Object[] objArr) throws HopTransformException {
                        AzureListener.this.putRow(iRowMeta, objArr);
                    }
                });
            }
            ((AzureListenerData) this.data).sttPipeline.startThreads();
            ((AzureListenerData) this.data).sttExecutor = new SingleThreadedPipelineExecutor(((AzureListenerData) this.data).sttPipeline);
            if (!((AzureListenerData) this.data).sttExecutor.init()) {
                logError("Initializing batch transformation failed");
                stopAll();
                setErrors(1L);
                return false;
            }
        } else {
            ((AzureListenerData) this.data).stt = false;
        }
        this.log.logDetailed("Creating connection string builder");
        ((AzureListenerData) this.data).connectionStringBuilder = new ConnectionStringBuilder().setNamespaceName(resolve).setEventHubName(resolve2).setSasKeyName(resolve3).setSasKey(resolve4);
        this.log.logDetailed("Opening new executor service");
        ((AzureListenerData) this.data).executorService = Executors.newSingleThreadScheduledExecutor();
        this.log.logDetailed("Creating event hub client");
        try {
            ((AzureListenerData) this.data).eventHubClient = EventHubClient.createFromConnectionStringSync(((AzureListenerData) this.data).connectionStringBuilder.toString(), ((AzureListenerData) this.data).executorService);
            try {
                EventProcessorHost build = EventProcessorHost.EventProcessorHostBuilder.newBuilder(EventProcessorHost.createHostName("HopHost"), resolve5).useAzureStorageCheckpointLeaseManager(resolve7, resolve6, "hop").useEventHubConnectionString(((AzureListenerData) this.data).connectionStringBuilder.toString()).build();
                this.log.logDetailed("Set up events host named " + build.getHostName());
                EventProcessorOptions eventProcessorOptions = new EventProcessorOptions();
                eventProcessorOptions.setExceptionNotification(new AzureListenerErrorNotificationHandler(this));
                if (!StringUtils.isNotEmpty(this.meta.getBatchSize())) {
                    eventProcessorOptions.setMaxBatchSize(Const.toInt(resolve(this.meta.getBatchSize()), 100));
                }
                if (!StringUtils.isNotEmpty(this.meta.getPrefetchSize())) {
                    eventProcessorOptions.setPrefetchCount(Const.toInt(resolve(this.meta.getPrefetchSize()), 100));
                }
                ((AzureListenerData) this.data).executorService = Executors.newSingleThreadScheduledExecutor();
                try {
                    ((AzureListenerData) this.data).eventHubClient = EventHubClient.createFromConnectionStringSync(((AzureListenerData) this.data).connectionStringBuilder.toString(), ((AzureListenerData) this.data).executorService);
                    final AzureListenerEventProcessor azureListenerEventProcessor = new AzureListenerEventProcessor(this, (AzureListenerData) this.data, ((AzureListenerData) this.data).batchSize);
                    if (((AzureListenerData) this.data).stt && ((AzureListenerData) this.data).sttMaxWaitTime > 0) {
                        logBasic("Checking for stalled rows every 100ms to see if we exceed the maximum wait time: " + ((AzureListenerData) this.data).sttMaxWaitTime);
                        try {
                            new Timer().schedule(new TimerTask() { // from class: org.apache.hop.pipeline.transforms.eventhubs.listen.AzureListener.2
                                @Override // java.util.TimerTask, java.lang.Runnable
                                public void run() {
                                    if (azureListenerEventProcessor.getLastIterationTime() <= 0 || azureListenerEventProcessor.getPassedRowsCount() <= 0) {
                                        return;
                                    }
                                    long currentTimeMillis = System.currentTimeMillis() - azureListenerEventProcessor.getLastIterationTime();
                                    if (currentTimeMillis > ((AzureListenerData) AzureListener.this.data).sttMaxWaitTime) {
                                        AzureListener.this.logDetailed("Stalled rows detected with wait time of " + (currentTimeMillis / 1000.0d));
                                        try {
                                            try {
                                                azureListenerEventProcessor.startWait();
                                                azureListenerEventProcessor.doOneIteration();
                                                azureListenerEventProcessor.endWait();
                                                AzureListener.this.logDetailed("Done processing after max wait time.");
                                            } catch (Exception e) {
                                                throw new RuntimeException("Error in batch iteration when max wait time was exceeded", e);
                                            }
                                        } catch (Throwable th) {
                                            azureListenerEventProcessor.endWait();
                                            throw th;
                                        }
                                    }
                                }
                            }, 100L, 100L);
                        } catch (RuntimeException e) {
                            throw new HopTransformException("Error in batch iteration when max wait time was exceeded", e);
                        }
                    }
                    try {
                        build.registerEventProcessorFactory(partitionContext -> {
                            return azureListenerEventProcessor;
                        }).whenComplete((r5, th) -> {
                            if (th != null) {
                                logError("Failure while registering: " + th.toString());
                                if (th.getCause() != null) {
                                    logError("Inner exception: " + th.getCause().toString());
                                }
                                setErrors(1L);
                                stopAll();
                                setOutputDone();
                            }
                        }).thenAccept(r52 -> {
                            while (!isStopped() && !outputIsDone()) {
                                try {
                                    Thread.sleep(0L, 100);
                                } catch (InterruptedException e2) {
                                }
                            }
                        }).thenCompose(r3 -> {
                            return build.unregisterEventProcessor();
                        }).exceptionally((Function<Throwable, ? extends U>) th2 -> {
                            logError("Failure while unregistering: " + th2.toString());
                            if (th2.getCause() == null) {
                                return null;
                            }
                            logError("Inner exception: " + th2.getCause().toString());
                            return null;
                        }).get();
                        setOutputDone();
                        return false;
                    } catch (Exception e2) {
                        throw new HopException("Error in event processor", e2);
                    }
                } catch (Exception e3) {
                    throw new HopTransformException("Unable to create event hub client", e3);
                }
            } catch (Exception e4) {
                throw new HopException("Unable to set up events host processor", e4);
            }
        } catch (Exception e5) {
            throw new HopTransformException("Unable to create event hub client", e5);
        }
    }
}
