package org.apache.apex.malhar.lib.state.managed;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.netlet.util.Slice;
import java.util.Random;
import org.apache.apex.malhar.lib.join.POJOPartitionJoinOperatorTest;
import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/apex/malhar/lib/state/managed/SliceBloomFilterTest.class */
public class SliceBloomFilterTest {
    // Number of iterations each test loop runs; also passed as the first SliceBloomFilter
    // constructor argument in testBloomFilterForInt(int).
    private int loop = 100000;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/apex/malhar/lib/state/managed/SliceBloomFilterTest$FilterOperator.class */
    /**
     * Operator used by the application test: every string received on {@link #input} is
     * serialized into the shared buffer and looked up in a {@link SliceBloomFilter}.
     * The lookup result is discarded; the operator only exercises the filter inside a DAG.
     */
    public static class FilterOperator extends BaseOperator {
        // Constructed with (10000, 0.99); presumably expected insertions and false-positive
        // probability -- confirm against SliceBloomFilter.
        private SliceBloomFilter bloomFilter = new SliceBloomFilter(10000, 0.99d);
        private SerializationBuffer buffer = SerializationBuffer.READ_BUFFER;

        /** Input port that forwards each tuple to {@link #processTuple(String)}. */
        public final transient DefaultInputPort<String> input = new DefaultInputPort<String>() {
            public void process(String tuple) {
                FilterOperator.this.processTuple(tuple);
            }
        };

        // Never read or updated anywhere in this class.
        private int count = 0;

        private FilterOperator() {
        }

        /**
         * Intentionally empty.
         *
         * @param operatorContext operator context; unused
         */
        public void setup(Context.OperatorContext operatorContext) {
        }

        /**
         * Writes the tuple into the buffer, queries the bloom filter with the resulting
         * slice (ignoring the answer) and then resets the buffer.
         *
         * @param str tuple to look up
         */
        public void processTuple(String str) {
            buffer.writeString(str);
            bloomFilter.mightContain(buffer.toSlice());
            buffer.reset();
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/lib/state/managed/SliceBloomFilterTest$TestInputOperator.class */
    /**
     * Input operator that emits the decimal string form of an increasing counter,
     * one value per {@link #emitTuples()} call, starting at 0.
     */
    private static class TestInputOperator extends BaseOperator implements InputOperator {
        /** Output port on which the counter values are emitted. */
        public final transient DefaultOutputPort<String> data = new DefaultOutputPort<>();

        // Next value to emit.
        private int current = 0;

        private TestInputOperator() {
        }

        /** Emits the current counter value as a string and advances the counter by one. */
        public void emitTuples() {
            final DefaultOutputPort<String> port = this.data;
            final String tuple = Integer.toString(this.current);
            // The counter is advanced before emitting, matching the post-increment order.
            this.current++;
            port.emit(tuple);
        }
    }

    /**
     * Puts {@code loop} slices of one random byte array into a filter and asserts that
     * the filter reports every one of them as possibly contained.
     */
    @Test
    public void testBloomFilterForBytes() {
        // Slice lengths cycle through 1..maxSliceLength, so the backing array carries that
        // much slack beyond the largest start offset (loop - 1).
        final int maxSliceLength = POJOPartitionJoinOperatorTest.TOTAL_TUPLES_PROCESS;
        final Random random = new Random();
        final byte[] bytes = new byte[this.loop + maxSliceLength];
        random.nextBytes(bytes);

        final SliceBloomFilter bloomFilter = new SliceBloomFilter(100000, 0.99d);
        for (int offset = 0; offset < this.loop; offset++) {
            bloomFilter.put(new Slice(bytes, offset, (offset % maxSliceLength) + 1));
        }

        // Rebuild the same slices and check each one is found.
        for (int offset = 0; offset < this.loop; offset++) {
            final Slice slice = new Slice(bytes, offset, (offset % maxSliceLength) + 1);
            Assert.assertTrue(bloomFilter.mightContain(slice));
        }
    }

    /** Runs the integer bloom-filter check for the spans 2, 3, 5 and 7, in that order. */
    @Test
    public void testBloomFilterForInt() {
        final int[] spans = {2, 3, 5, 7};
        for (int span : spans) {
            testBloomFilterForInt(span);
        }
    }

    /**
     * Inserts every value in {@code [0, loop)} that is divisible by {@code i} and then checks
     * three things: the filter never denies an inserted value, the share of absent values
     * reported as present (relative to {@code loop}) does not exceed the probability the
     * filter was constructed with, and every inserted value is reported as possibly contained.
     *
     * @param i spacing between inserted values; must be non-zero because it is used as a divisor
     */
    public void testBloomFilterForInt(int i) {
        final int span = i;
        final double expectedFalseProbability = 0.3d;
        final SerializationBuffer buffer = SerializationBuffer.READ_BUFFER;
        final SliceBloomFilter bloomFilter = new SliceBloomFilter(this.loop, expectedFalseProbability);

        // Populate the filter with the multiples of span.
        for (int value = 0; value < this.loop; value++) {
            if (value % span == 0) {
                buffer.writeInt(value);
                bloomFilter.put(buffer.toSlice());
            }
        }
        buffer.getWindowedBlockStream().releaseAllFreeMemory();

        // Probe the whole range and count values reported present that were never inserted.
        int falsePositives = 0;
        for (int value = 0; value < this.loop; value++) {
            final boolean inserted = value % span == 0;
            buffer.writeInt(value);
            if (!bloomFilter.mightContain(buffer.toSlice())) {
                // A negative answer is only acceptable for a value that was never inserted.
                Assert.assertTrue(!inserted);
            } else if (!inserted) {
                // Membership is decided by span, not by a fixed divisor of 2.
                falsePositives++;
            }
        }
        buffer.getWindowedBlockStream().releaseAllFreeMemory();

        final double falsePositiveRate = ((double) falsePositives) / ((double) this.loop);
        Assert.assertTrue(falsePositiveRate <= expectedFalseProbability);

        // Every inserted value must still be reported as possibly contained.
        for (int value = 0; value < this.loop; value++) {
            if (value % span == 0) {
                buffer.writeInt(value);
                Assert.assertTrue(bloomFilter.mightContain(buffer.toSlice()));
            }
        }
        buffer.getWindowedBlockStream().releaseAllFreeMemory();
    }

    /**
     * Wires a {@link TestInputOperator} to a {@link FilterOperator} in a local-mode DAG,
     * runs it, and shuts it down. The test makes no assertions; it passes when nothing throws.
     *
     * @throws Exception if preparing or running the local application fails
     */
    @Test
    public void testBloomFilterForApplication() throws Exception {
        final Configuration conf = new Configuration(false);
        final LocalMode localMode = LocalMode.newInstance();
        final DAG dag = localMode.getDAG();

        final TestInputOperator generator = new TestInputOperator();
        dag.addOperator("Generator", generator);

        final FilterOperator filter = new FilterOperator();
        dag.addOperator("filterOperator", filter);

        dag.addStream("Data", generator.data, filter.input).setLocality(DAG.Locality.CONTAINER_LOCAL);

        // The DAG is already populated above, so the application callback has nothing to add.
        final StreamingApplication app = new StreamingApplication() {
            public void populateDAG(DAG dag2, Configuration configuration2) {
            }
        };
        localMode.prepareDAG(app, conf);

        final LocalMode.Controller controller = localMode.getController();
        // Presumably a run duration in milliseconds -- confirm against LocalMode.Controller.
        controller.run(3000L);
        controller.shutdown();
    }
}
