package com.datatorrent.contrib.kinesis;

import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.contrib.kinesis.AbstractKinesisOutputOperator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/kinesis/KinesisOutputOperatorTest.class */
public abstract class KinesisOutputOperatorTest<O extends AbstractKinesisOutputOperator, G extends Operator> extends KinesisOperatorTestBase {
    private static final Logger logger = LoggerFactory.getLogger(KinesisOutputOperatorTest.class);
    protected static final int maxTuple = 20;
    protected CountDownLatch doneLatch;
    private boolean enableConsumer = true;
    private Thread listenerThread;

    @Override // com.datatorrent.contrib.kinesis.KinesisOperatorTestBase
    @Before
    public void beforeTest() {
        this.shardCount = 1;
        super.beforeTest();
    }

    @Test
    public void testKinesisOutputOperator() throws Exception {
        KinesisTestConsumer kinesisTestConsumer = null;
        if (this.enableConsumer) {
            kinesisTestConsumer = createConsumerListener(this.streamName);
            if (kinesisTestConsumer != null) {
                this.doneLatch = new CountDownLatch(20);
                kinesisTestConsumer.setDoneLatch(this.doneLatch);
                this.listenerThread = new Thread(kinesisTestConsumer);
                this.listenerThread.start();
            }
        }
        LocalMode newInstance = LocalMode.newInstance();
        StreamingApplication streamingApplication = new StreamingApplication() { // from class: com.datatorrent.contrib.kinesis.KinesisOutputOperatorTest.1
            public void populateDAG(DAG dag, Configuration configuration) {
            }
        };
        DAG dag = newInstance.getDAG();
        G addGenerateOperator = addGenerateOperator(dag);
        O addTestingOperator = addTestingOperator(dag);
        configureTestingOperator(addTestingOperator);
        dag.addStream("Kinesis message", getOutputPortOfGenerator(addGenerateOperator), ((AbstractKinesisOutputOperator) addTestingOperator).inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
        newInstance.prepareDAG(streamingApplication, new Configuration(false));
        LocalMode.Controller controller = newInstance.getController();
        controller.runAsync();
        if (this.doneLatch != null) {
            this.doneLatch.await(300000, TimeUnit.MILLISECONDS);
        } else {
            try {
                Thread.sleep(300000);
            } catch (Exception e) {
            }
        }
        if (kinesisTestConsumer != null) {
            kinesisTestConsumer.setIsAlive(false);
        }
        if (this.listenerThread != null) {
            this.listenerThread.join(1000L);
        }
        controller.shutdown();
        if (kinesisTestConsumer != null) {
            Assert.assertEquals("Number of emitted tuples", 20L, kinesisTestConsumer.holdingBuffer.size());
            logger.debug(String.format("Number of emitted tuples: %d", Integer.valueOf(kinesisTestConsumer.holdingBuffer.size())));
        }
        if (kinesisTestConsumer != null) {
            kinesisTestConsumer.close();
        }
    }

    protected KinesisTestConsumer createConsumerListener(String str) {
        return new KinesisTestConsumer(str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void configureTestingOperator(O o) {
        o.setAccessKey(this.credentials.getCredentials().getAWSAccessKeyId());
        o.setSecretKey(this.credentials.getCredentials().getAWSSecretKey());
        o.setBatchSize(500);
        o.setStreamName(this.streamName);
    }

    protected abstract G addGenerateOperator(DAG dag);

    protected abstract DefaultOutputPort getOutputPortOfGenerator(G g);

    protected abstract O addTestingOperator(DAG dag);
}
