package co.cask.cdap.internal.app.runtime.schedule.store;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.table.Row;
import co.cask.cdap.api.dataset.table.Scanner;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.api.schedule.SchedulableProgramType;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.transaction.TransactionExecutorFactory;
import co.cask.cdap.internal.AppFabricTestHelper;
import co.cask.cdap.internal.app.runtime.schedule.StreamSizeScheduleState;
import co.cask.cdap.internal.app.runtime.schedule.store.DatasetBasedStreamSizeScheduleStore;
import co.cask.cdap.internal.schedule.StreamSizeSchedule;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.id.ApplicationId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramId;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Injector;
import java.util.Map;
import org.apache.tephra.TransactionAware;
import org.apache.tephra.TransactionExecutor;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/schedule/store/DatasetBasedStreamSizeScheduleStoreTest.class */
/**
 * Tests for {@link DatasetBasedStreamSizeScheduleStore}.
 *
 * <p>Two scenarios are covered:
 * <ul>
 *   <li>{@link #testOldDataFormatCompatibility()} - rows written under a row key that has had its
 *       application version stripped (via {@code removeAppVersion}) coexist with rows written by
 *       {@code persist}, and {@code delete} behaves as asserted for both a default-version and an
 *       explicitly versioned program;</li>
 *   <li>{@link #testStreamSizeSchedule()} - the persist / suspend / resume / update / delete life cycle
 *       as observed through {@code list()}.</li>
 * </ul>
 *
 * <p>Both tests share one store instance and expect the underlying table to hold no stream size
 * schedule rows when they start, so each test removes everything it created before returning.
 */
public class DatasetBasedStreamSizeScheduleStoreTest {
    public static DatasetBasedStreamSizeScheduleStore scheduleStore;
    public static TransactionExecutorFactory txExecutorFactory;
    public static DatasetFramework datasetFramework;

    // Version string that programId.getVersion() is compared against to detect an application
    // created without an explicit version.
    // NOTE(review): presumably mirrors a default-version constant declared elsewhere in the project - confirm.
    private static final String DEFAULT_APP_VERSION = "-SNAPSHOT";
    // Name of the system dataset that is scanned directly by the old-format compatibility test.
    private static final String SCHEDULE_STORE_DATASET_NAME = "schedulestore";
    // Row key prefix used to scan for stream size schedule rows.
    // NOTE(review): presumably matches the key prefix the store itself writes with - confirm.
    private static final String STREAM_SIZE_SCHEDULE_KEY_PREFIX = "streamSizeSchedule";

    private static final ApplicationId APP_ID = NamespaceId.DEFAULT.app("AppWithStreamSizeSchedule");
    private static final ProgramId PROGRAM_ID = APP_ID.program(ProgramType.WORKFLOW, "SampleWorkflow");
    private static final ApplicationId APP_ID_V1 = NamespaceId.DEFAULT.app("AppWithStreamSizeSchedule", "v1");
    private static final ProgramId PROGRAM_ID_V1 = APP_ID_V1.program(ProgramType.WORKFLOW, "SampleWorkflow");
    private static final Id.Stream STREAM_ID = Id.Stream.from(Id.Namespace.DEFAULT, "stream");
    private static final String SCHEDULE_NAME_1 = "Schedule1";
    private static final StreamSizeSchedule STREAM_SCHEDULE_1 = new StreamSizeSchedule(SCHEDULE_NAME_1, "Every 1M", STREAM_ID.getId(), 1);
    private static final String SCHEDULE_NAME_2 = "Schedule2";
    private static final StreamSizeSchedule STREAM_SCHEDULE_2 = new StreamSizeSchedule(SCHEDULE_NAME_2, "Every 10M", STREAM_ID.getId(), 10);
    private static final Map<String, String> MAP_1 = ImmutableMap.of("key1", "value1", "key2", "value2");
    private static final Map<String, String> MAP_2 = ImmutableMap.of("key3", "value3", "key4", "value4");
    private static final SchedulableProgramType PROGRAM_TYPE = SchedulableProgramType.WORKFLOW;

    /**
     * Resolves the store, the transaction executor factory and the dataset framework from the shared
     * test injector once for the whole class.
     *
     * @throws Exception if the injector or any of the instances cannot be obtained
     */
    @BeforeClass
    public static void set() throws Exception {
        Injector injector = AppFabricTestHelper.getInjector();
        scheduleStore = injector.getInstance(DatasetBasedStreamSizeScheduleStore.class);
        txExecutorFactory = injector.getInstance(TransactionExecutorFactory.class);
        datasetFramework = injector.getInstance(DatasetFramework.class);
    }

    /**
     * Runs the deletion scenario for a program whose application has no explicit version and for one
     * whose application version is {@code "v1"}.
     *
     * @throws Exception if any store or transaction operation fails
     */
    @Test
    public void testOldDataFormatCompatibility() throws Exception {
        testDeletion(PROGRAM_ID);
        testDeletion(PROGRAM_ID_V1);
    }

    /**
     * Persists a schedule through the store, copies its columns to a second row whose key has the
     * application version removed, and then checks what {@code delete} leaves behind.
     *
     * <p>Expected outcome, as asserted below: for a default-version program no rows remain after the
     * delete; for an explicitly versioned program exactly one row remains (the version-less copy),
     * which this method then removes itself so the table is empty again.
     *
     * @param programId the program the schedule is attached to
     * @throws Exception if any store or transaction operation fails
     */
    private void testDeletion(final ProgramId programId) throws Exception {
        final boolean defaultVersion = DEFAULT_APP_VERSION.equals(programId.getVersion());
        // Keep the handle typed as Table so that scan/get/put/delete are available; the same object
        // is handed to the executor as a TransactionAware so those calls run inside a transaction.
        final Table table = (Table) datasetFramework.getDataset(
            NamespaceId.SYSTEM.dataset(SCHEDULE_STORE_DATASET_NAME), ImmutableMap.<String, String>of(), (ClassLoader) null);
        Assert.assertNotNull(table);
        TransactionExecutor txExecutor = txExecutorFactory.createExecutor(ImmutableList.of((TransactionAware) table));
        final byte[] startKey = Bytes.toBytes(STREAM_SIZE_SCHEDULE_KEY_PREFIX);
        final byte[] stopKey = Bytes.stopKeyForPrefix(startKey);

        // Precondition: no stream size schedule rows exist yet.
        txExecutor.execute(new TransactionExecutor.Subroutine() {
            @Override
            public void apply() throws Exception {
                Assert.assertEquals(0, countRows(table, startKey, stopKey));
            }
        });

        // Row written by the store itself.
        scheduleStore.persist(programId, PROGRAM_TYPE, STREAM_SCHEDULE_1, MAP_1, 0L, 0L, 0L, 0L, true);

        // Duplicate that row under the version-less key to simulate data in the old layout.
        txExecutor.execute(new TransactionExecutor.Subroutine() {
            @Override
            public void apply() throws Exception {
                byte[] oldRowKey = oldFormatRowKey(programId);
                String persistedRowKey = scheduleStore.getRowKey(programId, PROGRAM_TYPE, STREAM_SCHEDULE_1.getName());
                Row persistedRow = table.get(Bytes.toBytes(persistedRowKey));
                Assert.assertFalse(persistedRow.isEmpty());
                for (Map.Entry<byte[], byte[]> column : persistedRow.getColumns().entrySet()) {
                    table.put(oldRowKey, column.getKey(), column.getValue());
                }
            }
        });

        // Both the store-written row and the copy must now be present.
        txExecutor.execute(new TransactionExecutor.Subroutine() {
            @Override
            public void apply() throws Exception {
                Assert.assertEquals(2, countRows(table, startKey, stopKey));
            }
        });

        scheduleStore.delete(programId, PROGRAM_TYPE, STREAM_SCHEDULE_1.getName());

        // Default version: nothing may be left. Explicit version: exactly one row is left.
        txExecutor.execute(new TransactionExecutor.Subroutine() {
            @Override
            public void apply() throws Exception {
                Assert.assertEquals(defaultVersion ? 0 : 1, countRows(table, startKey, stopKey));
            }
        });

        if (defaultVersion) {
            return;
        }

        // The surviving row must be the version-less copy; remove it so later tests start clean.
        txExecutor.execute(new TransactionExecutor.Subroutine() {
            @Override
            public void apply() throws Exception {
                byte[] oldRowKey = oldFormatRowKey(programId);
                Assert.assertFalse(table.get(oldRowKey).isEmpty());
                table.delete(oldRowKey);
            }
        });
    }

    /**
     * Builds the version-less row key for {@link #STREAM_SCHEDULE_1} of the given program: the program
     * id is rebuilt from its namespace, application, type and program name only, the store computes a
     * row key for it, and {@code removeAppVersion} is applied to that key.
     */
    private static byte[] oldFormatRowKey(ProgramId programId) throws Exception {
        ProgramId unversionedProgramId = new ProgramId(
            programId.getNamespace(), programId.getApplication(), programId.getType(), programId.getProgram());
        String rowKey = scheduleStore.getRowKey(unversionedProgramId, PROGRAM_TYPE, STREAM_SCHEDULE_1.getName());
        return Bytes.toBytes(scheduleStore.removeAppVersion(rowKey));
    }

    /**
     * Counts the rows returned by scanning {@code table} from {@code startKey} to {@code stopKey}; the
     * scanner is closed even when the scan throws.
     */
    private static int countRows(Table table, byte[] startKey, byte[] stopKey) throws Exception {
        Scanner scanner = table.scan(startKey, stopKey);
        try {
            int rows = 0;
            while (scanner.next() != null) {
                rows++;
            }
            return rows;
        } finally {
            scanner.close();
        }
    }

    /**
     * Shorthand for the expected state of a schedule attached to {@link #PROGRAM_ID}; the arguments are
     * passed to the {@link StreamSizeScheduleState} constructor in the order given.
     */
    private static StreamSizeScheduleState expectedState(StreamSizeSchedule schedule, Map<String, String> properties,
                                                         long baseRunSize, long baseRunTs,
                                                         long lastRunSize, long lastRunTs, boolean active) {
        return new StreamSizeScheduleState(
            PROGRAM_ID, PROGRAM_TYPE, schedule, properties, baseRunSize, baseRunTs, lastRunSize, lastRunTs, active);
    }

    /**
     * Walks two schedules through persist, suspend, resume, base-run update, last-run update, schedule
     * replacement and delete, checking the full {@code list()} result after every step.
     *
     * @throws Exception if any store operation fails
     */
    @Test
    public void testStreamSizeSchedule() throws Exception {
        scheduleStore.persist(PROGRAM_ID, PROGRAM_TYPE, STREAM_SCHEDULE_1, MAP_1, 0L, 0L, 0L, 0L, true);
        scheduleStore.persist(PROGRAM_ID, PROGRAM_TYPE, STREAM_SCHEDULE_2, MAP_2, 1000L, 10L, 1000L, 10L, false);
        Assert.assertEquals(
            ImmutableList.of(
                expectedState(STREAM_SCHEDULE_1, MAP_1, 0L, 0L, 0L, 0L, true),
                expectedState(STREAM_SCHEDULE_2, MAP_2, 1000L, 10L, 1000L, 10L, false)),
            scheduleStore.list());

        // Suspending schedule 1 flips only its active flag.
        scheduleStore.suspend(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_1);
        Assert.assertEquals(
            ImmutableList.of(
                expectedState(STREAM_SCHEDULE_1, MAP_1, 0L, 0L, 0L, 0L, false),
                expectedState(STREAM_SCHEDULE_2, MAP_2, 1000L, 10L, 1000L, 10L, false)),
            scheduleStore.list());

        // Resuming schedule 2 flips only its active flag.
        scheduleStore.resume(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_2);
        Assert.assertEquals(
            ImmutableList.of(
                expectedState(STREAM_SCHEDULE_1, MAP_1, 0L, 0L, 0L, 0L, false),
                expectedState(STREAM_SCHEDULE_2, MAP_2, 1000L, 10L, 1000L, 10L, true)),
            scheduleStore.list());

        // Base-run update changes the first pair of counters of schedule 2.
        scheduleStore.updateBaseRun(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_2, 10000L, 100L);
        Assert.assertEquals(
            ImmutableList.of(
                expectedState(STREAM_SCHEDULE_1, MAP_1, 0L, 0L, 0L, 0L, false),
                expectedState(STREAM_SCHEDULE_2, MAP_2, 10000L, 100L, 1000L, 10L, true)),
            scheduleStore.list());

        // Last-run update changes the second pair of counters of schedule 1.
        scheduleStore.updateLastRun(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_1, 100L, 10000L,
                                    (DatasetBasedStreamSizeScheduleStore.TransactionMethod) null);
        Assert.assertEquals(
            ImmutableList.of(
                expectedState(STREAM_SCHEDULE_1, MAP_1, 0L, 0L, 100L, 10000L, false),
                expectedState(STREAM_SCHEDULE_2, MAP_2, 10000L, 100L, 1000L, 10L, true)),
            scheduleStore.list());

        // Replacing the schedule stored under name 1 keeps its properties, counters and flag.
        scheduleStore.updateSchedule(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_1, STREAM_SCHEDULE_2);
        Assert.assertEquals(
            ImmutableList.of(
                expectedState(STREAM_SCHEDULE_2, MAP_1, 0L, 0L, 100L, 10000L, false),
                expectedState(STREAM_SCHEDULE_2, MAP_2, 10000L, 100L, 1000L, 10L, true)),
            scheduleStore.list());

        scheduleStore.delete(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_1);
        Assert.assertEquals(
            ImmutableList.of(expectedState(STREAM_SCHEDULE_2, MAP_2, 10000L, 100L, 1000L, 10L, true)),
            scheduleStore.list());

        // Removing the last schedule leaves the store empty for the other test in this class.
        scheduleStore.delete(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_2);
        Assert.assertEquals(ImmutableList.of(), scheduleStore.list());
    }
}
