package com.datatorrent.contrib.kinesis;

import com.amazonaws.services.kinesis.model.Record;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.Pair;
import com.datatorrent.contrib.util.FieldValueSerializableGenerator;
import com.datatorrent.contrib.util.POJOTupleGenerateOperator;
import com.datatorrent.contrib.util.TestPOJO;
import com.datatorrent.contrib.util.TupleGenerator;
import com.datatorrent.lib.util.FieldInfo;
import java.nio.ByteBuffer;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/kinesis/KinesisByteArrayOutputOperatorTest.class */
public class KinesisByteArrayOutputOperatorTest extends KinesisOutputOperatorTest<KinesisByteArrayOutputOperator, POJOTupleGenerateOperator> {
    private FieldValueSerializableGenerator fieldValueGenerator;

    /* loaded from: input_file:com/datatorrent/contrib/kinesis/KinesisByteArrayOutputOperatorTest$KinesisEmployeeConsumer.class */
    public static class KinesisEmployeeConsumer extends KinesisTestConsumer {
        private static final Logger logger = LoggerFactory.getLogger(KinesisEmployeeConsumer.class);
        protected FieldValueSerializableGenerator<FieldInfo> fieldValueGenerator;

        public KinesisEmployeeConsumer(String str) {
            super(str);
            this.fieldValueGenerator = FieldValueSerializableGenerator.getFieldValueGenerator(TestPOJO.class, null);
        }

        @Override // com.datatorrent.contrib.kinesis.KinesisTestConsumer
        protected void processRecord(Record record) {
            String partitionKey = record.getPartitionKey();
            ByteBuffer data = record.getData();
            logger.info("partitionKey={} ", partitionKey);
            byte[] bArr = new byte[data.remaining()];
            data.get(bArr, 0, bArr.length);
            TestPOJO testPOJO = new TestPOJO(Long.valueOf(partitionKey).longValue());
            TestPOJO testPOJO2 = (TestPOJO) this.fieldValueGenerator.deserializeObject(bArr);
            if (testPOJO2.outputFieldsEquals(testPOJO)) {
                logger.info("read is same as expected. read={}, expected={}", testPOJO2, testPOJO);
            } else {
                logger.error("read is not same as expected. read={}, expected={}", testPOJO2, testPOJO);
                Assert.assertTrue(false);
            }
        }
    }

    /* loaded from: input_file:com/datatorrent/contrib/kinesis/KinesisByteArrayOutputOperatorTest$TestPOJOTupleGenerateOperator.class */
    public static class TestPOJOTupleGenerateOperator extends POJOTupleGenerateOperator<TestPOJO> {
        public TestPOJOTupleGenerateOperator() {
            super(TestPOJO.class);
            setTupleNum(20);
        }
    }

    @Test
    public void testKinesisOutputOperatorInternal() throws Exception {
        KinesisByteArrayOutputOperator kinesisByteArrayOutputOperator = new KinesisByteArrayOutputOperator();
        configureTestingOperator(kinesisByteArrayOutputOperator);
        kinesisByteArrayOutputOperator.setBatchProcessing(false);
        kinesisByteArrayOutputOperator.setup((Context.OperatorContext) null);
        TupleGenerator<TestPOJO> tupleGenerator = new TupleGenerator<>(TestPOJO.class);
        KinesisTestConsumer createConsumerListener = createConsumerListener(this.streamName);
        String prepareIterator = createConsumerListener.prepareIterator();
        for (int i = 0; i < 20; i++) {
            if (i % 2 == 0) {
                prepareIterator = createConsumerListener.processNextIterator(prepareIterator);
            }
            kinesisByteArrayOutputOperator.processTuple(getNextTuple(tupleGenerator));
        }
        createConsumerListener.processNextIterator(prepareIterator);
    }

    protected Pair<String, byte[]> getNextTuple(TupleGenerator<TestPOJO> tupleGenerator) {
        TestPOJO nextTuple = tupleGenerator.getNextTuple();
        if (this.fieldValueGenerator == null) {
            this.fieldValueGenerator = FieldValueSerializableGenerator.getFieldValueGenerator(TestPOJO.class, null);
        }
        return new Pair<>(nextTuple.getRow(), this.fieldValueGenerator.serializeObject(nextTuple));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.datatorrent.contrib.kinesis.KinesisOutputOperatorTest
    public POJOTupleGenerateOperator addGenerateOperator(DAG dag) {
        return dag.addOperator("TestPojoGenerator", TestPOJOTupleGenerateOperator.class);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.datatorrent.contrib.kinesis.KinesisOutputOperatorTest
    public DefaultOutputPort getOutputPortOfGenerator(POJOTupleGenerateOperator pOJOTupleGenerateOperator) {
        return pOJOTupleGenerateOperator.outputPort;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.datatorrent.contrib.kinesis.KinesisOutputOperatorTest
    public KinesisByteArrayOutputOperator addTestingOperator(DAG dag) {
        KinesisByteArrayOutputOperator addOperator = dag.addOperator("Test-KinesisByteArrayOutputOperator", KinesisByteArrayOutputOperator.class);
        addOperator.setBatchProcessing(true);
        return addOperator;
    }

    @Override // com.datatorrent.contrib.kinesis.KinesisOutputOperatorTest
    protected KinesisTestConsumer createConsumerListener(String str) {
        return new KinesisEmployeeConsumer(str);
    }
}
