package co.cask.cdap.service;

import co.cask.cdap.api.Transactional;
import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.dataset.lib.PartitionKey;
import co.cask.cdap.api.dataset.lib.PartitionOutput;
import co.cask.cdap.api.dataset.lib.PartitionedFileSet;
import co.cask.cdap.api.dataset.lib.PartitionedFileSetProperties;
import co.cask.cdap.api.dataset.lib.Partitioning;
import co.cask.cdap.api.service.http.AbstractHttpServiceHandler;
import co.cask.cdap.api.service.http.HttpContentConsumer;
import co.cask.cdap.api.service.http.HttpServiceHandler;
import co.cask.cdap.api.service.http.HttpServiceRequest;
import co.cask.cdap.api.service.http.HttpServiceResponder;
import com.google.common.base.Throwables;
import com.google.common.io.BaseEncoding;
import com.google.common.io.Closeables;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.security.MessageDigest;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.twill.filesystem.Location;

/* loaded from: input_file:co/cask/cdap/service/FileUploadApp.class */
public class FileUploadApp extends AbstractApplication {
    public static final String PFS_NAME = "files";
    public static final String KV_TABLE_NAME = "tracking";
    public static final String SERVICE_NAME = "pfs";

    /* loaded from: input_file:co/cask/cdap/service/FileUploadApp$FileHandler.class */
    public static final class FileHandler extends AbstractHttpServiceHandler {
        @POST
        @Path("/upload/{dataset}/{partition}")
        public HttpContentConsumer upload(HttpServiceRequest httpServiceRequest, HttpServiceResponder httpServiceResponder, @PathParam("dataset") String str, @PathParam("partition") long j) throws Exception {
            final String header = httpServiceRequest.getHeader("Content-MD5");
            if (header == null) {
                httpServiceResponder.sendError(400, "Missing header \"Content-MD5\"");
                return null;
            }
            PartitionKey build = PartitionKey.builder().addLongField("time", j).build();
            final PartitionOutput partitionOutput = getContext().getDataset(str).getPartitionOutput(build);
            final Location location = partitionOutput.getLocation();
            if (!location.mkdirs()) {
                httpServiceResponder.sendError(409, String.format("Partition for key '%s' already exists for dataset '%s'", build, str));
                return null;
            }
            final MessageDigest messageDigest = MessageDigest.getInstance("MD5");
            final WritableByteChannel newChannel = Channels.newChannel(location.append("upload-" + System.currentTimeMillis()).getOutputStream());
            return new HttpContentConsumer() { // from class: co.cask.cdap.service.FileUploadApp.FileHandler.1
                public void onReceived(final ByteBuffer byteBuffer, Transactional transactional) throws Exception {
                    transactional.execute(new TxRunnable() { // from class: co.cask.cdap.service.FileUploadApp.FileHandler.1.1
                        public void run(DatasetContext datasetContext) throws Exception {
                            datasetContext.getDataset(FileUploadApp.KV_TABLE_NAME).increment(Bytes.toBytes(byteBuffer.remaining()), 1L);
                        }
                    });
                    byteBuffer.mark();
                    messageDigest.update(byteBuffer);
                    byteBuffer.reset();
                    newChannel.write(byteBuffer);
                }

                public void onFinish(HttpServiceResponder httpServiceResponder2) throws Exception {
                    newChannel.close();
                    String encode = BaseEncoding.base64().encode(messageDigest.digest());
                    if (!header.equals(encode)) {
                        throw new IllegalArgumentException("MD5 not match. Expected '" + header + "', received '" + encode + "'");
                    }
                    partitionOutput.addPartition();
                    httpServiceResponder2.sendStatus(200);
                }

                public void onError(HttpServiceResponder httpServiceResponder2, Throwable th) {
                    try {
                        try {
                            Closeables.closeQuietly(newChannel);
                            location.delete(true);
                            if (Throwables.getRootCause(th) instanceof IllegalArgumentException) {
                                httpServiceResponder2.sendStatus(400);
                            } else {
                                httpServiceResponder2.sendStatus(500);
                            }
                        } catch (IOException e) {
                            throw Throwables.propagate(e);
                        }
                    } catch (Throwable th2) {
                        if (Throwables.getRootCause(th) instanceof IllegalArgumentException) {
                            httpServiceResponder2.sendStatus(400);
                        } else {
                            httpServiceResponder2.sendStatus(500);
                        }
                        throw th2;
                    }
                }
            };
        }
    }

    public void configure() {
        createDataset(PFS_NAME, PartitionedFileSet.class, PartitionedFileSetProperties.builder().setPartitioning(Partitioning.builder().addLongField("time").build()).setInputFormat(TextInputFormat.class).build());
        createDataset(KV_TABLE_NAME, KeyValueTable.class);
        addService(SERVICE_NAME, new FileHandler(), new HttpServiceHandler[0]);
    }
}
