package co.cask.cdap.data2.datafabric.dataset.service.executor;

import co.cask.cdap.api.dataset.DatasetDefinition;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.table.Get;
import co.cask.cdap.api.dataset.table.Put;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.discovery.RandomEndpointStrategy;
import co.cask.cdap.common.discovery.TimeLimitEndpointStrategy;
import co.cask.cdap.common.guice.ConfigModule;
import co.cask.cdap.common.guice.DiscoveryRuntimeModule;
import co.cask.cdap.common.guice.IOModule;
import co.cask.cdap.common.guice.KafkaClientModule;
import co.cask.cdap.common.guice.LocationRuntimeModule;
import co.cask.cdap.common.guice.ZKClientModule;
import co.cask.cdap.common.utils.Networks;
import co.cask.cdap.data.runtime.DataFabricModules;
import co.cask.cdap.data.runtime.DataSetServiceModules;
import co.cask.cdap.data.runtime.DataSetsModules;
import co.cask.cdap.data.runtime.TransactionMetricsModule;
import co.cask.cdap.data2.datafabric.dataset.service.DatasetService;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.explore.guice.ExploreClientModule;
import co.cask.cdap.gateway.auth.AuthModule;
import co.cask.common.http.HttpRequest;
import co.cask.common.http.HttpRequests;
import co.cask.common.http.HttpResponse;
import co.cask.tephra.DefaultTransactionExecutor;
import co.cask.tephra.TransactionAware;
import co.cask.tephra.TransactionExecutor;
import co.cask.tephra.TransactionManager;
import co.cask.tephra.inmemory.InMemoryTxSystemClient;
import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:co/cask/cdap/data2/datafabric/dataset/service/executor/DatasetOpExecutorServiceTest.class */
/**
 * Exercises the dataset admin REST operations ({@code exists}, {@code truncate},
 * {@code upgrade}) by POSTing to an endpoint discovered under the
 * {@code "dataset.service"} name and checking both the HTTP status code and the
 * decoded {@link DatasetAdminOpResponse} result.
 *
 * <p>Each test starts its own {@link TransactionManager} and {@link DatasetService}
 * in {@link #setUp()} and stops both in {@link #tearDown()}.
 */
public class DatasetOpExecutorServiceTest {
    private static final Gson GSON = new Gson();

    @ClassRule
    public static TemporaryFolder tmpFolder = new TemporaryFolder();

    private DatasetService managerService;
    private DatasetFramework dsFramework;
    private TimeLimitEndpointStrategy endpointStrategy;
    private TransactionManager txManager;

    /**
     * Builds the injector, then starts the transaction manager and the dataset
     * service, and prepares the endpoint strategy used to locate the service.
     *
     * @throws IOException if the temporary output directory cannot be created
     */
    @Before
    public void setUp() throws IOException {
        Configuration hConf = new Configuration();
        CConfiguration cConf = CConfiguration.create();

        File datasetDir = new File(tmpFolder.newFolder(), "datasetUser");
        // mkdirs() returns false both on failure and when the directory already
        // exists, so only treat it as an error when no directory is present.
        if (!datasetDir.mkdirs() && !datasetDir.isDirectory()) {
            throw new IOException("Could not create directory " + datasetDir.getAbsolutePath());
        }

        cConf.set("dataset.service.output.dir", datasetDir.getAbsolutePath());
        cConf.set("dataset.service.bind.address", "localhost");
        cConf.set("dataset.executor.bind.address", "localhost");
        cConf.setInt("dataset.executor.bind.port", Networks.getRandomPort());

        Injector injector = Guice.createInjector(
            new ConfigModule(cConf, hConf),
            new IOModule(),
            new ZKClientModule(),
            new KafkaClientModule(),
            new DiscoveryRuntimeModule().getInMemoryModules(),
            new LocationRuntimeModule().getInMemoryModules(),
            new DataFabricModules().getInMemoryModules(),
            new DataSetsModules().getLocalModule(),
            new DataSetServiceModules().getInMemoryModule(),
            new AuthModule(),
            new TransactionMetricsModule(),
            new ExploreClientModule());

        this.txManager = injector.getInstance(TransactionManager.class);
        this.txManager.startAndWait();

        this.managerService = injector.getInstance(DatasetService.class);
        this.managerService.startAndWait();

        this.dsFramework = injector.getInstance(DatasetFramework.class);

        DiscoveryServiceClient discoveryClient = injector.getInstance(DiscoveryServiceClient.class);
        this.endpointStrategy = new TimeLimitEndpointStrategy(
            new RandomEndpointStrategy(discoveryClient.discover("dataset.service")), 1L, TimeUnit.SECONDS);
    }

    /**
     * Stops everything {@link #setUp()} started. Each service is null-checked
     * because JUnit runs {@code @After} methods even when {@code @Before} failed
     * part-way, and a NullPointerException here would hide the original failure.
     */
    @After
    public void tearDown() {
        this.dsFramework = null;
        this.endpointStrategy = null;
        try {
            if (this.managerService != null) {
                this.managerService.stopAndWait();
            }
        } finally {
            this.managerService = null;
            // Stop the transaction manager even if stopping the dataset service
            // threw; otherwise it would keep running across tests.
            if (this.txManager != null) {
                this.txManager.stopAndWait();
                this.txManager = null;
            }
        }
    }

    /**
     * Walks a table instance through create, write, truncate, upgrade and delete,
     * checking the admin endpoint's answer at each step.
     */
    @Test
    public void testRest() throws Exception {
        // Nothing exists before the instance is added.
        testAdminOp("bob", "exists", 404, null);

        this.dsFramework.addInstance("table", "bob", DatasetProperties.EMPTY);
        testAdminOp("bob", "exists", 200, true);
        testAdminOp("joe", "exists", 404, null);

        final Table table =
            (Table) this.dsFramework.getDataset("bob", DatasetDefinition.NO_ARGUMENTS, (ClassLoader) null);
        // The executor needs the dataset as a TransactionAware; the table calls
        // below need it as a Table, hence the cast only at this point.
        TransactionExecutor txExecutor = new DefaultTransactionExecutor(
            new InMemoryTxSystemClient(this.txManager), ImmutableList.of((TransactionAware) table));

        txExecutor.execute(new TransactionExecutor.Subroutine() {
            @Override
            public void apply() throws Exception {
                table.put(new Put("key1", "col1", "val1"));
            }
        });

        // Confirm the write is visible before truncating.
        txExecutor.execute(new TransactionExecutor.Subroutine() {
            @Override
            public void apply() throws Exception {
                Assert.assertEquals("val1", table.get(new Get("key1", new String[]{"col1"})).getString("col1"));
            }
        });

        testAdminOp("bob", "truncate", 200, null);

        // After truncate the previously written row must be gone.
        txExecutor.execute(new TransactionExecutor.Subroutine() {
            @Override
            public void apply() throws Exception {
                Assert.assertTrue(table.get(new Get("key1", new String[]{"col1"})).isEmpty());
            }
        });

        testAdminOp("bob", "upgrade", 200, null);

        this.dsFramework.deleteInstance("bob");
        testAdminOp("bob", "exists", 404, null);
    }

    /**
     * Checks that the {@code upgrade} admin operation still succeeds after the
     * instance's properties have been updated.
     */
    @Test
    public void testUpdate() throws Exception {
        testAdminOp("bob", "exists", 404, null);

        this.dsFramework.addInstance("table", "bob", DatasetProperties.EMPTY);
        testAdminOp("bob", "exists", 200, true);

        this.dsFramework.updateInstance("bob", DatasetProperties.builder().add("dataset.table.ttl", "10000").build());
        testAdminOp("bob", "upgrade", 200, null);

        this.dsFramework.deleteInstance("bob");
        testAdminOp("bob", "exists", 404, null);
    }

    /**
     * POSTs to {@code /data/datasets/<str>/admin/<str2>} and asserts on the reply.
     *
     * @param str  dataset instance name placed in the URL
     * @param str2 admin operation name placed in the URL
     * @param i    expected HTTP status code
     * @param obj  expected {@code result} of the decoded response body
     */
    private void testAdminOp(String str, String str2, int i, Object obj) throws URISyntaxException, IOException {
        URL target = resolve(String.format("/data/datasets/%s/admin/%s", str, str2));
        // Keep the raw HTTP response: the status code is read from it, while the
        // result is read from the decoded body.
        HttpResponse httpResponse = HttpRequests.execute(HttpRequest.post(target).build());
        DatasetAdminOpResponse body = getResponse(httpResponse.getResponseBody());
        Assert.assertEquals(i, httpResponse.getResponseCode());
        Assert.assertEquals(obj, body.getResult());
    }

    /**
     * Builds an absolute {@code http://host:port/v2<str>} URL against an endpoint
     * picked by the endpoint strategy.
     *
     * @param str path to append after the {@code /v2} prefix
     */
    private URL resolve(String str) throws URISyntaxException, MalformedURLException {
        InetSocketAddress address = this.endpointStrategy.pick().getSocketAddress();
        return new URL(String.format("http://%s:%d%s%s", address.getHostName(), address.getPort(), "/v2", str));
    }

    /**
     * Decodes a response body as JSON into a {@link DatasetAdminOpResponse}.
     * If Gson yields {@code null}, a response with a {@code null} result and
     * {@code null} message is returned instead.
     *
     * @param bArr raw response body bytes
     */
    private DatasetAdminOpResponse getResponse(byte[] bArr) {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        String json = new String(bArr, Charsets.UTF_8);
        return Objects.firstNonNull(
            GSON.fromJson(json, DatasetAdminOpResponse.class),
            new DatasetAdminOpResponse((Object) null, (String) null));
    }
}
