package com.datatorrent.contrib.kinesis;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import java.util.concurrent.ArrayBlockingQueue;

/* loaded from: input_file:com/datatorrent/contrib/kinesis/KinesisStringOutputOperatorTest.class */
public class KinesisStringOutputOperatorTest extends KinesisOutputOperatorTest<KinesisStringOutputOperator, StringGeneratorInputOperator> {

    /* loaded from: input_file:com/datatorrent/contrib/kinesis/KinesisStringOutputOperatorTest$StringGeneratorInputOperator.class */
    /**
     * Test input operator that feeds a short, fixed series of strings into the DAG.
     *
     * <p>On activation a background thread places the strings {@code "testString 1"} through
     * {@code "testString 20"} into a bounded queue. Each call to {@link #emitTuples()} drains
     * whatever is in the queue at that moment and emits it on {@link #outputPort}.
     *
     * <p>The generator thread stops when it has produced all strings, when
     * {@link #deactivate()} clears {@code dataGeneratorThread}, or when it is interrupted.
     */
    public static class StringGeneratorInputOperator implements InputOperator, Operator.ActivationListener<Context.OperatorContext> {
        /** Number of strings the generator thread produces per activation. */
        private static final int GENERATED_TUPLE_COUNT = 20;

        /** Capacity of the hand-off queue between the generator thread and {@link #emitTuples()}. */
        private static final int BUFFER_CAPACITY = 1024;

        /** Port on which the generated strings are emitted. */
        public final transient DefaultOutputPort<String> outputPort = new DefaultOutputPort<>();

        /** Hand-off queue: filled by the generator thread, drained by {@link #emitTuples()}. */
        private final transient ArrayBlockingQueue<String> stringBuffer = new ArrayBlockingQueue<>(BUFFER_CAPACITY);

        /** The running generator thread; {@code null} doubles as the stop signal for its loop. */
        private volatile Thread dataGeneratorThread;

        /** No per-window work is needed. */
        public void beginWindow(long j) {
        }

        /** No per-window work is needed. */
        public void endWindow() {
        }

        /** Nothing to initialise; the generator thread is created in {@link #activate}. */
        public void setup(Context.OperatorContext operatorContext) {
        }

        /** Nothing to release; the generator thread is stopped in {@link #deactivate()}. */
        public void teardown() {
        }

        /**
         * Creates and starts the generator thread.
         *
         * @param operatorContext operator context supplied by the caller; not used here
         */
        public void activate(Context.OperatorContext operatorContext) {
            final Thread generator = new Thread("String Generator") {
                @Override
                public void run() {
                    int produced = 0;
                    while (StringGeneratorInputOperator.this.dataGeneratorThread != null && produced < GENERATED_TUPLE_COUNT) {
                        produced++;
                        try {
                            StringGeneratorInputOperator.this.stringBuffer.put("testString " + produced);
                        } catch (InterruptedException e) {
                            // Interrupted while waiting for queue space: keep the interrupt
                            // status visible and stop producing instead of crashing the thread.
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                }
            };
            // Publish the thread before starting it so the loop condition above sees a
            // non-null field, and start via the local so a concurrent deactivate() that
            // nulls the field cannot cause a NullPointerException here.
            this.dataGeneratorThread = generator;
            generator.start();
        }

        /**
         * Signals the generator thread to stop: clears the field its loop polls and interrupts
         * it so that a {@code put} blocked on a full queue is released as well.
         */
        public void deactivate() {
            final Thread generator = this.dataGeneratorThread;
            this.dataGeneratorThread = null;
            if (generator != null) {
                generator.interrupt();
            }
        }

        /**
         * Emits every string that was buffered at the moment this method was called. Strings
         * added by the generator thread while draining are left for the next call.
         */
        public void emitTuples() {
            for (int pending = this.stringBuffer.size(); pending > 0; pending--) {
                this.outputPort.emit(this.stringBuffer.poll());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Adds the string generator operator to the given DAG under the name
     * {@code "TestStringGenerator"}.
     *
     * @param dag the DAG the operator is added to
     * @return the operator instance returned by {@code dag.addOperator}
     */
    @Override // com.datatorrent.contrib.kinesis.KinesisOutputOperatorTest
    public StringGeneratorInputOperator addGenerateOperator(DAG dag) {
        final StringGeneratorInputOperator generator =
                dag.addOperator("TestStringGenerator", StringGeneratorInputOperator.class);
        return generator;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the output port of the given generator operator.
     *
     * @param stringGeneratorInputOperator the generator whose port is requested
     * @return the generator's {@code outputPort} field
     */
    @Override // com.datatorrent.contrib.kinesis.KinesisOutputOperatorTest
    public DefaultOutputPort getOutputPortOfGenerator(StringGeneratorInputOperator stringGeneratorInputOperator) {
        final DefaultOutputPort<String> generatorPort = stringGeneratorInputOperator.outputPort;
        return generatorPort;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Adds the Kinesis string output operator to the given DAG under the name
     * {@code "KinesisMessageProducer"}.
     *
     * @param dag the DAG the operator is added to
     * @return the operator instance returned by {@code dag.addOperator}
     */
    @Override // com.datatorrent.contrib.kinesis.KinesisOutputOperatorTest
    public KinesisStringOutputOperator addTestingOperator(DAG dag) {
        final KinesisStringOutputOperator producer =
                dag.addOperator("KinesisMessageProducer", KinesisStringOutputOperator.class);
        return producer;
    }
}
