package org.apache.apex.malhar.lib.state.managed;

import com.datatorrent.api.Context;
import com.datatorrent.lib.util.KryoCloneUtils;
import com.datatorrent.lib.util.TestUtils;
import com.datatorrent.netlet.util.Slice;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.apex.malhar.lib.state.managed.Bucket;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.class */
public class ManagedStateImplTest {

    @Rule
    public TestMeta testMeta = new TestMeta();
    private static final Logger LOG = LoggerFactory.getLogger(ManagedStateImplTest.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest$TestMeta.class */
    public class TestMeta extends TestWatcher {
        ManagedStateImpl managedState;
        Context.OperatorContext operatorContext;
        String applicationPath;

        TestMeta() {
        }

        protected void starting(Description description) {
            TestUtils.deleteTargetTestClassFolder(description);
            this.managedState = new ManagedStateImpl();
            this.applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
            this.managedState.getFileAccess().setBasePath(this.applicationPath + "/bucket_data");
            this.operatorContext = ManagedStateTestUtils.getOperatorContext(1, this.applicationPath);
        }

        protected void finished(Description description) {
            TestUtils.deleteTargetTestClassFolder(description);
        }
    }

    @Test
    public void testSerde() throws IOException {
        Assert.assertEquals("num buckets", ((ManagedStateImpl) KryoCloneUtils.cloneObject(this.testMeta.managedState)).getNumBuckets(), this.testMeta.managedState.getNumBuckets());
    }

    @Test
    public void testSimplePutGet() {
        Slice sliceFor = ManagedStateTestUtils.getSliceFor("1");
        this.testMeta.managedState.setup(this.testMeta.operatorContext);
        this.testMeta.managedState.beginWindow(System.currentTimeMillis());
        this.testMeta.managedState.put(0L, sliceFor, sliceFor);
        Slice sync = this.testMeta.managedState.getSync(0L, sliceFor);
        this.testMeta.managedState.endWindow();
        Assert.assertEquals("value of one", sliceFor, sync);
        this.testMeta.managedState.teardown();
    }

    @Test
    public void testAsyncGetFromFlash() throws ExecutionException, InterruptedException {
        Slice sliceFor = ManagedStateTestUtils.getSliceFor("1");
        this.testMeta.managedState.setup(this.testMeta.operatorContext);
        this.testMeta.managedState.beginWindow(System.currentTimeMillis());
        this.testMeta.managedState.put(0L, sliceFor, sliceFor);
        Assert.assertEquals("value of one", sliceFor, (Slice) this.testMeta.managedState.getAsync(0L, sliceFor).get());
        this.testMeta.managedState.teardown();
    }

    @Test
    public void testIncrementalCheckpoint() {
        Slice sliceFor = ManagedStateTestUtils.getSliceFor("1");
        this.testMeta.managedState.setup(this.testMeta.operatorContext);
        long currentTimeMillis = System.currentTimeMillis();
        this.testMeta.managedState.beginWindow(currentTimeMillis);
        this.testMeta.managedState.put(0L, sliceFor, sliceFor);
        this.testMeta.managedState.endWindow();
        this.testMeta.managedState.beforeCheckpoint(currentTimeMillis);
        Bucket.DefaultBucket bucket = this.testMeta.managedState.getBucket(0L);
        Assert.assertEquals("value of one", sliceFor, ((Bucket.BucketedValue) ((Map) bucket.getCheckpointedData().get(Long.valueOf(currentTimeMillis))).get(sliceFor)).getValue());
        Slice sliceFor2 = ManagedStateTestUtils.getSliceFor("2");
        this.testMeta.managedState.beginWindow(currentTimeMillis + 1);
        this.testMeta.managedState.put(0L, sliceFor2, sliceFor2);
        this.testMeta.managedState.endWindow();
        this.testMeta.managedState.beforeCheckpoint(currentTimeMillis + 1);
        Assert.assertEquals("value of two", sliceFor2, ((Bucket.BucketedValue) ((Map) bucket.getCheckpointedData().get(Long.valueOf(currentTimeMillis + 1))).get(sliceFor2)).getValue());
        this.testMeta.managedState.teardown();
    }

    @Test
    public void testAsyncGetFromCheckpoint() throws ExecutionException, InterruptedException {
        Slice sliceFor = ManagedStateTestUtils.getSliceFor("1");
        this.testMeta.managedState.setup(this.testMeta.operatorContext);
        long currentTimeMillis = System.currentTimeMillis();
        this.testMeta.managedState.beginWindow(currentTimeMillis);
        this.testMeta.managedState.put(0L, sliceFor, sliceFor);
        this.testMeta.managedState.endWindow();
        this.testMeta.managedState.beforeCheckpoint(currentTimeMillis);
        Assert.assertEquals("value of one", sliceFor, this.testMeta.managedState.getAsync(0L, sliceFor).get());
        this.testMeta.managedState.teardown();
    }

    @Test
    public void testCommitted() {
        Slice sliceFor = ManagedStateTestUtils.getSliceFor("1");
        Slice sliceFor2 = ManagedStateTestUtils.getSliceFor("2");
        commitHelper(sliceFor, sliceFor2);
        Bucket.DefaultBucket bucket = this.testMeta.managedState.getBucket(0L);
        Assert.assertEquals("value of one", sliceFor, ((Bucket.BucketedValue) ((Map) bucket.getCommittedData().firstEntry().getValue()).get(sliceFor)).getValue());
        Assert.assertNull("value of two", ((Map) bucket.getCommittedData().firstEntry().getValue()).get(sliceFor2));
        this.testMeta.managedState.teardown();
    }

    @Test
    public void testAsyncGetFromCommitted() throws ExecutionException, InterruptedException {
        Slice sliceFor = ManagedStateTestUtils.getSliceFor("1");
        commitHelper(sliceFor, ManagedStateTestUtils.getSliceFor("2"));
        Assert.assertEquals("value of one", sliceFor, this.testMeta.managedState.getAsync(0L, sliceFor).get());
    }

    @Test
    public void testFreeWindowTransferRaceCondition() throws Exception {
        this.testMeta.managedState.setMaxMemorySize(1L);
        this.testMeta.managedState.setCheckStateSizeInterval(Duration.millis(1L));
        this.testMeta.managedState.setup(this.testMeta.operatorContext);
        long j = 300;
        long j2 = 0;
        while (true) {
            long j3 = j2;
            if (j3 >= j) {
                break;
            }
            this.testMeta.managedState.beginWindow(j3);
            Slice sliceFor = ManagedStateTestUtils.getSliceFor(Long.toString(j3));
            this.testMeta.managedState.put(0L, sliceFor, sliceFor);
            this.testMeta.managedState.endWindow();
            this.testMeta.managedState.beforeCheckpoint(j3);
            this.testMeta.managedState.checkpointed(j3);
            Thread.sleep(1L);
            j2 = j3 + 1;
        }
        this.testMeta.managedState.committed(j - 2);
        this.testMeta.managedState.beginWindow(j + 1);
        for (int i = 300 - 1; i > 0; i--) {
            Assert.assertNotNull("null value for key " + i, this.testMeta.managedState.getSync(0L, ManagedStateTestUtils.getSliceFor(Integer.toString(i))));
        }
        this.testMeta.managedState.endWindow();
        this.testMeta.managedState.teardown();
    }

    private void commitHelper(Slice slice, Slice slice2) {
        this.testMeta.managedState.setup(this.testMeta.operatorContext);
        long currentTimeMillis = System.currentTimeMillis();
        this.testMeta.managedState.beginWindow(currentTimeMillis);
        this.testMeta.managedState.put(0L, slice, slice);
        this.testMeta.managedState.endWindow();
        this.testMeta.managedState.beforeCheckpoint(currentTimeMillis);
        this.testMeta.managedState.beginWindow(currentTimeMillis + 1);
        this.testMeta.managedState.put(0L, slice2, slice2);
        this.testMeta.managedState.endWindow();
        this.testMeta.managedState.beforeCheckpoint(currentTimeMillis + 1);
        this.testMeta.managedState.committed(currentTimeMillis);
    }
}
