package com.datatorrent.contrib.nifi;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.contrib.nifi.mock.MockSiteToSiteClient;
import com.datatorrent.contrib.nifi.mock.MockTransaction;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.util.file.FileUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.class */
public class NiFiSinglePortOutputOperatorTest {
    private Context.OperatorContext context;
    private WindowDataManager windowDataManager;
    private MockSiteToSiteClient.Builder stsBuilder;
    private NiFiDataPacketBuilder<String> dpBuilder;
    private NiFiSinglePortOutputOperator<String> operator;

    /* loaded from: input_file:com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest$StringNiFiDataPacketBuilder.class */
    public static class StringNiFiDataPacketBuilder implements NiFiDataPacketBuilder<String> {
        public NiFiDataPacket createNiFiDataPacket(String str) {
            return new StandardNiFiDataPacket(str.getBytes(StandardCharsets.UTF_8), new HashMap());
        }
    }

    @Before
    public void setup() throws IOException {
        String str = "target/" + getClass().getSimpleName();
        File file = new File(str);
        if (file.exists()) {
            FileUtils.deleteFile(file, true);
        }
        Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        defaultAttributeMap.put(DAG.APPLICATION_PATH, str);
        this.context = new OperatorContextTestHelper.TestIdOperatorContext(12345, defaultAttributeMap);
        this.windowDataManager = new FSWindowDataManager();
        this.stsBuilder = new MockSiteToSiteClient.Builder();
        this.dpBuilder = new StringNiFiDataPacketBuilder();
        this.operator = new NiFiSinglePortOutputOperator<>(this.stsBuilder, this.dpBuilder, this.windowDataManager, 1);
    }

    @Test
    public void testTransactionPerTuple() throws IOException {
        this.operator.setup(this.context);
        MockSiteToSiteClient mockSiteToSiteClient = (MockSiteToSiteClient) this.operator.client;
        this.operator.beginWindow(1L);
        this.operator.inputPort.process("tuple1");
        Assert.assertEquals(1L, mockSiteToSiteClient.getMockTransactions().size());
        this.operator.inputPort.process("tuple2");
        Assert.assertEquals(2L, mockSiteToSiteClient.getMockTransactions().size());
        this.operator.inputPort.process("tuple3");
        Assert.assertEquals(3L, mockSiteToSiteClient.getMockTransactions().size());
        this.operator.endNewWindow();
        Assert.assertEquals(3L, mockSiteToSiteClient.getMockTransactions().size());
        verifyTransactions(Arrays.asList("tuple1", "tuple2", "tuple3"), mockSiteToSiteClient.getMockTransactions());
    }

    @Test
    public void testBatchSize() throws IOException {
        this.operator = new NiFiSinglePortOutputOperator<>(this.stsBuilder, this.dpBuilder, this.windowDataManager, 3);
        this.operator.setup(this.context);
        MockSiteToSiteClient mockSiteToSiteClient = (MockSiteToSiteClient) this.operator.client;
        this.operator.beginWindow(1L);
        this.operator.inputPort.process("tuple1");
        Assert.assertEquals(0L, mockSiteToSiteClient.getMockTransactions().size());
        this.operator.inputPort.process("tuple2");
        Assert.assertEquals(0L, mockSiteToSiteClient.getMockTransactions().size());
        this.operator.inputPort.process("tuple3");
        Assert.assertEquals(1L, mockSiteToSiteClient.getMockTransactions().size());
        this.operator.inputPort.process("tuple4");
        Assert.assertEquals(1L, mockSiteToSiteClient.getMockTransactions().size());
        this.operator.inputPort.process("tuple5");
        Assert.assertEquals(1L, mockSiteToSiteClient.getMockTransactions().size());
        this.operator.endNewWindow();
        Assert.assertEquals(2L, mockSiteToSiteClient.getMockTransactions().size());
        verifyTransactions(Arrays.asList("tuple1", "tuple2", "tuple3", "tuple4", "tuple5"), mockSiteToSiteClient.getMockTransactions());
    }

    @Test
    public void testReplay() throws IOException {
        this.operator.setup(this.context);
        this.operator.beginWindow(1L);
        this.operator.inputPort.process("tuple1");
        this.operator.inputPort.process("tuple2");
        this.operator.inputPort.process("tuple3");
        this.operator.endWindow();
        Assert.assertEquals(3L, ((MockSiteToSiteClient) this.operator.client).getMockTransactions().size());
        this.operator.setup(this.context);
        this.operator.beginWindow(1L);
        this.operator.inputPort.process("tuple1");
        this.operator.inputPort.process("tuple2");
        this.operator.inputPort.process("tuple3");
        this.operator.endWindow();
        Assert.assertEquals(0L, ((MockSiteToSiteClient) this.operator.client).getMockTransactions().size());
    }

    private void verifyTransactions(List<String> list, List<MockTransaction> list2) throws IOException {
        ArrayList arrayList = new ArrayList();
        Iterator<MockTransaction> it = list2.iterator();
        while (it.hasNext()) {
            List<DataPacket> sentDataPackets = it.next().getSentDataPackets();
            Assert.assertTrue(sentDataPackets.size() > 0);
            Iterator<DataPacket> it2 = sentDataPackets.iterator();
            while (it2.hasNext()) {
                arrayList.add(IOUtils.toString(it2.next().getData()));
            }
        }
        for (String str : list) {
            boolean z = false;
            Iterator it3 = arrayList.iterator();
            while (true) {
                if (it3.hasNext()) {
                    if (((String) it3.next()).equals(str)) {
                        z = true;
                        break;
                    }
                } else {
                    break;
                }
            }
            Assert.assertTrue(z);
        }
    }
}
