package org.apache.apex.malhar.lib.state.managed;

import com.datatorrent.lib.util.KryoCloneUtils;
import com.datatorrent.lib.util.TestUtils;
import com.datatorrent.netlet.util.Slice;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.state.managed.Bucket;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.class */
/**
 * JUnit tests for {@link IncrementalCheckpointManager}.
 *
 * <p>The tests cover Kryo cloning of the manager, saving and retrieving data, transferring saved
 * data into bucket files (with and without an expired time bucket set), the {@code committed}
 * callback, and deletion of time buckets through the buckets file system.
 *
 * <p>Every test works under {@code target/<test class>/<test method>}, which the {@link TestMeta}
 * rule prepares before the test and cleans up afterwards.
 */
public class IncrementalCheckpointManagerTest {

    private static final Logger LOG = LoggerFactory.getLogger(IncrementalCheckpointManagerTest.class);

    /** Per-test fixture. JUnit only honours {@code @Rule} fields that are public. */
    @Rule
    public TestMeta testMeta = new TestMeta();

    /**
     * {@link BucketsFileSystem} that, after delegating a bucket write to the superclass, counts down a
     * latch when the first argument of the write equals 10. Every test in this class passes 10 to
     * {@code save} and {@code committed}, so a test can block on the latch until the expected number
     * of writes has happened.
     */
    static class MockBucketsFileSystem extends BucketsFileSystem {
        // NOTE(review): 'transient' presumably keeps the latch out of serialized state - confirm.
        private final transient CountDownLatch latch;

        /**
         * @param countDownLatch latch to count down on matching writes; must not be null
         */
        public MockBucketsFileSystem(@NotNull CountDownLatch countDownLatch) {
            latch = Preconditions.checkNotNull(countDownLatch);
        }

        /**
         * Delegates to the superclass implementation and then counts the latch down if {@code j} is 10.
         * The parameter names come from decompiled output; {@code j} is presumably the window id and
         * the remaining arguments are passed through unchanged - TODO confirm against
         * {@code BucketsFileSystem}.
         *
         * @throws IOException if the superclass write fails
         */
        protected void writeBucketData(long j, long j2, Map<Slice, Bucket.BucketedValue> map, long j3) throws IOException {
            super.writeBucketData(j, j2, map, j3);
            if (j == 10) {
                latch.countDown();
            }
        }
    }

    /**
     * Test fixture rule built on JUnit's {@link TestWatcher}. It owns the checkpoint manager under
     * test and the mock managed-state context that the manager is set up with.
     *
     * <p>NOTE(review): the decompiler reported that this class's access modifier was changed from
     * package-private.
     */
    public class TestMeta extends TestWatcher {
        IncrementalCheckpointManager checkpointManager;
        String applicationPath;
        int operatorId = 1;
        MockManagedStateContext managedStateContext;

        TestMeta() {
        }

        /**
         * Runs before each test: clears the test folder, derives the application path from the test
         * class and method names, creates the mock context with its file access rooted at
         * {@code <applicationPath>/bucket_data}, and creates a fresh checkpoint manager. The manager
         * itself is NOT set up here; each test calls {@code setup} when it needs to.
         */
        protected void starting(Description description) {
            TestUtils.deleteTargetTestClassFolder(description);
            applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();

            managedStateContext = new MockManagedStateContext(ManagedStateTestUtils.getOperatorContext(operatorId, applicationPath));
            managedStateContext.getFileAccess().setBasePath(applicationPath + "/bucket_data");
            managedStateContext.getFileAccess().init();

            checkpointManager = new IncrementalCheckpointManager();

            // NOTE(review): the "m104" prefix looks like a decompiler-generated rename of
            // getTimeBucketAssigner - confirm against MockManagedStateContext before changing it.
            managedStateContext.m104getTimeBucketAssigner().setup(managedStateContext);
            managedStateContext.getBucketsFileSystem().setup(managedStateContext);
        }

        /**
         * Runs after each test: tears down the context's time bucket assigner and buckets file system,
         * then clears the test folder again.
         */
        protected void finished(Description description) {
            managedStateContext.m104getTimeBucketAssigner().teardown();
            managedStateContext.getBucketsFileSystem().teardown();
            TestUtils.deleteTargetTestClassFolder(description);
        }
    }

    /**
     * Returns a new map holding only those entries of {@code bucket} whose value has a time bucket
     * strictly greater than {@code timeBucket}. The input map is not modified.
     */
    private static Map<Slice, Bucket.BucketedValue> retainNewerThan(Map<Slice, Bucket.BucketedValue> bucket, long timeBucket) {
        Map<Slice, Bucket.BucketedValue> retained = Maps.newHashMap();
        for (Map.Entry<Slice, Bucket.BucketedValue> entry : bucket.entrySet()) {
            if (entry.getValue().getTimeBucket() > timeBucket) {
                retained.put(entry.getKey(), entry.getValue());
            }
        }
        return retained;
    }

    /** A freshly constructed (not yet set up) checkpoint manager can be cloned with Kryo. */
    @Test
    public void testSerde() throws IOException {
        Assert.assertNotNull("state window data manager", KryoCloneUtils.cloneObject(testMeta.checkpointManager));
    }

    /**
     * Data saved by one manager instance can be retrieved, unchanged, by a Kryo clone of that manager
     * that is set up against the same context.
     */
    @Test
    public void testSave() throws IOException {
        IncrementalCheckpointManager original = testMeta.checkpointManager;
        original.setup(testMeta.managedStateContext);
        Map<Long, Map<Slice, Bucket.BucketedValue>> testData = ManagedStateTestUtils.getTestData(0, 5, 0);
        original.save(testData, 10L, false);
        original.teardown();

        // Swap the fixture's manager for a clone so retrieval is done by a different instance.
        testMeta.checkpointManager = (IncrementalCheckpointManager) KryoCloneUtils.createCloneUtils(original).getClone();
        testMeta.checkpointManager.setup(testMeta.managedStateContext);
        Assert.assertEquals("saved", testData, testMeta.checkpointManager.retrieve(10L));
        testMeta.checkpointManager.teardown();
    }

    /**
     * After {@code save}, {@code committed} and an explicit {@code transferWindowFiles}, each of the
     * five saved buckets validates against the file system.
     */
    @Test
    public void testTransferWindowFiles() throws IOException, InterruptedException {
        IncrementalCheckpointManager manager = testMeta.checkpointManager;
        manager.setup(testMeta.managedStateContext);
        Map<Long, Map<Slice, Bucket.BucketedValue>> testData = ManagedStateTestUtils.getTestData(0, 5, 0);
        manager.save(testData, 10L, false);
        manager.teardown();
        // NOTE(review): presumably lets background work settle after teardown - confirm.
        Thread.sleep(500L);

        manager.committed(10L);
        manager.transferWindowFiles();

        for (int bucket = 0; bucket < 5; bucket++) {
            ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), bucket, testData.get(Long.valueOf(bucket)), 1);
        }
    }

    /**
     * Same flow as {@link #testTransferWindowFiles()}, but with the latest expired time bucket set
     * to 102 before saving. The buckets on the file system are expected to contain only the values
     * whose time bucket is greater than 102.
     */
    @Test
    public void testTransferWindowFilesExcludeExpiredBuckets() throws IOException, InterruptedException {
        final int firstBucketId = 200;
        final long latestExpiredTimeBucket = 102L;

        IncrementalCheckpointManager manager = testMeta.checkpointManager;
        manager.setup(testMeta.managedStateContext);
        Map<Long, Map<Slice, Bucket.BucketedValue>> testData = ManagedStateTestUtils.getTestData(firstBucketId, firstBucketId + 10, 0);
        manager.setLatestExpiredTimeBucket(latestExpiredTimeBucket);
        manager.save(testData, 10L, false);
        manager.teardown();
        // NOTE(review): presumably lets background work settle after teardown - confirm.
        Thread.sleep(500L);

        manager.committed(10L);
        manager.transferWindowFiles();

        // NOTE(review): test data is requested for the range 200..210, but only buckets 200..204
        // are validated below - confirm whether checking just the first five is intentional.
        Map<Long, Map<Slice, Bucket.BucketedValue>> expectedByBucket = new HashMap<>();
        for (int i = 0; i < 5; i++) {
            Long bucketKey = Long.valueOf(firstBucketId + i);
            expectedByBucket.put(bucketKey, retainNewerThan(testData.get(bucketKey), latestExpiredTimeBucket));
        }

        for (int i = 0; i < 5; i++) {
            int bucket = firstBucketId + i;
            ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), bucket, expectedByBucket.get(Long.valueOf(bucket)), 1);
        }
    }

    /**
     * With a latch-counting buckets file system installed, {@code committed} alone (without an
     * explicit {@code transferWindowFiles} call) leads to five bucket writes, after which each saved
     * bucket validates against the file system.
     */
    @Test
    public void testCommitted() throws IOException, InterruptedException {
        CountDownLatch writesDone = new CountDownLatch(5);
        MockBucketsFileSystem bucketsFileSystem = new MockBucketsFileSystem(writesDone);
        testMeta.managedStateContext.setBucketsFileSystem(bucketsFileSystem);
        bucketsFileSystem.setup(testMeta.managedStateContext);

        IncrementalCheckpointManager manager = testMeta.checkpointManager;
        manager.setup(testMeta.managedStateContext);
        Map<Long, Map<Slice, Bucket.BucketedValue>> testData = ManagedStateTestUtils.getTestData(0, 5, 0);
        manager.save(testData, 10L, false);
        manager.committed(10L);

        // Block until the mock has seen five writes for value 10.
        writesDone.await();
        manager.teardown();
        // NOTE(review): presumably lets background work settle after teardown - confirm.
        Thread.sleep(500L);

        for (int bucket = 0; bucket < 5; bucket++) {
            ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), bucket, testData.get(Long.valueOf(bucket)), 1);
        }
    }

    /**
     * After the transfer flow has populated {@code <applicationPath>/bucket_data}, deleting time
     * buckets less than or equal to 200 leaves that directory empty.
     *
     * <p>The Hadoop {@link FileSystem} obtained through {@code newInstance} is owned by this test and
     * is therefore closed when the test finishes, whether it passes or fails.
     */
    @Test
    public void testPurge() throws IOException, InterruptedException {
        try (FileSystem fileSystem = FileSystem.newInstance(new Configuration())) {
            testTransferWindowFiles();

            Path bucketData = new Path(testMeta.applicationPath + "/bucket_data");
            Assert.assertTrue(fileSystem.listLocatedStatus(bucketData).hasNext());

            testMeta.managedStateContext.getBucketsFileSystem().deleteTimeBucketsLessThanEqualTo(200L);

            Assert.assertFalse("All buckets should be deleted", fileSystem.listLocatedStatus(bucketData).hasNext());
        }
    }
}
