package com.cloudera.dim.kafka.connect.s3;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.cloudera.dim.kafka.connect.s3.aws.AmazonS3ClientFactory;
import com.cloudera.dim.kafka.connect.storage.OffsetCommittable;
import com.cloudera.dim.kafka.connect.storage.PartitionStorage;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:com/cloudera/dim/kafka/connect/s3/S3PartitionStorage.class */
public class S3PartitionStorage extends OutputStream implements PartitionStorage, KafkaOffsetListener {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) S3PartitionStorage.class);
    private int maxBufferSize;
    private long filePosition;
    private int partNumber;
    private String bucketName;
    private String keyName;
    private InitiateMultipartUploadResult initResponse;
    private AmazonS3 s3Client;
    private TopicPartition topicPartition;
    private String fileExtension;
    private ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
    private List<PartETag> partETags = new ArrayList();

    public void configure(Map<String, ?> map) {
        S3SinkConnectorConfig s3SinkConnectorConfig = new S3SinkConnectorConfig(map);
        this.maxBufferSize = s3SinkConnectorConfig.getInt("aws.s3.upload_buffer_size").intValue();
        if (this.maxBufferSize <= 0 || this.maxBufferSize > S3SinkConnectorConfig.FIVE_MEG) {
            throw new IllegalStateException("aws.s3.upload_buffer_size must be between 1 and " + S3SinkConnectorConfig.FIVE_MEG + ", not " + this.maxBufferSize);
        }
        this.s3Client = ((AmazonS3ClientFactory) s3SinkConnectorConfig.getConfiguredInstance("aws.s3.client.factory", AmazonS3ClientFactory.class)).getClient();
        this.bucketName = s3SinkConnectorConfig.getString("aws.s3.bucket");
        if (bucketExists(this.s3Client, this.bucketName)) {
            return;
        }
        createBucket(this.s3Client, this.bucketName);
    }

    @Override // com.cloudera.dim.kafka.connect.s3.KafkaOffsetListener
    public void notifyKafkaOffset(long j) {
        if (this.initResponse == null) {
            this.partETags.clear();
            this.filePosition = 0L;
            this.partNumber = 0;
            this.keyName = String.format("%s_%d_%d.%s", this.topicPartition.topic(), Integer.valueOf(this.topicPartition.partition()), Long.valueOf(j), this.fileExtension);
            this.initResponse = initiateMultipartUpload(this.s3Client, new InitiateMultipartUploadRequest(this.bucketName, this.keyName));
            LOGGER.debug("Started upload: uploadId {} S3 bucket {}, key {}", this.initResponse.getUploadId(), this.bucketName, this.keyName);
        }
    }

    @Override // com.cloudera.dim.kafka.connect.storage.PartitionStorage
    public void open(TopicPartition topicPartition, String str) {
        Objects.requireNonNull(topicPartition, "topicPartition is null");
        Objects.requireNonNull(str, "fileExtension is null");
        LOGGER.debug("Opening topic {}, partition {}, fileExtension {}", topicPartition.topic(), Integer.valueOf(topicPartition.partition()), str);
        this.topicPartition = topicPartition;
        this.fileExtension = str;
    }

    @Override // com.cloudera.dim.kafka.connect.storage.PartitionStorage
    public OutputStream getOutputStream() {
        return this;
    }

    @Override // java.io.OutputStream
    public void write(int i) {
        this.byteArrayOutputStream.write(i);
        if (this.byteArrayOutputStream.size() == this.maxBufferSize) {
            uploadPart();
        }
    }

    private void uploadPart() {
        UploadPartRequest withUploadId = new UploadPartRequest().withBucketName(this.bucketName).withKey(this.keyName).withUploadId(this.initResponse.getUploadId());
        int i = this.partNumber + 1;
        this.partNumber = i;
        UploadPartRequest withPartSize = withUploadId.withPartNumber(i).withFileOffset(this.filePosition).withInputStream(new ByteArrayInputStream(this.byteArrayOutputStream.toByteArray())).withPartSize(this.byteArrayOutputStream.size());
        LOGGER.debug("Uploading part: uploadId {} S3 bucket {}, key {}, part length {}", this.initResponse.getUploadId(), this.bucketName, this.keyName, Integer.valueOf(this.byteArrayOutputStream.size()));
        this.partETags.add(uploadPart(this.s3Client, withPartSize).getPartETag());
        this.filePosition += this.byteArrayOutputStream.size();
        this.byteArrayOutputStream.reset();
    }

    private UploadPartResult uploadPart(AmazonS3 amazonS3, UploadPartRequest uploadPartRequest) {
        return amazonS3.uploadPart(uploadPartRequest);
    }

    @Override // com.cloudera.dim.kafka.connect.storage.OffsetCommittable
    public void commit(long j, OffsetCommittable.CommitCallback commitCallback) throws Exception {
        Objects.requireNonNull(commitCallback, "commitCallback is null");
        if (this.byteArrayOutputStream.size() > 0) {
            uploadPart();
        }
        LOGGER.debug("Committing: uploadId {} S3 bucket {}, key {}, total message length {} ", this.initResponse.getUploadId(), this.bucketName, this.keyName, Long.valueOf(this.filePosition));
        completeMultipartUpload(this.s3Client, new CompleteMultipartUploadRequest(this.bucketName, this.keyName, this.initResponse.getUploadId(), this.partETags));
        commitCallback.commitCompleted();
        this.initResponse = null;
    }

    @Override // com.cloudera.dim.kafka.connect.storage.OffsetCommittable
    public Long lastCommittedOffset() {
        throw new UnsupportedOperationException();
    }

    private void createBucket(AmazonS3 amazonS3, String str) {
        LOGGER.info("Creating bucket {}", str);
        amazonS3.createBucket(str);
    }

    private boolean bucketExists(AmazonS3 amazonS3, String str) {
        return amazonS3.doesBucketExistV2(str);
    }

    private InitiateMultipartUploadResult initiateMultipartUpload(AmazonS3 amazonS3, InitiateMultipartUploadRequest initiateMultipartUploadRequest) {
        return amazonS3.initiateMultipartUpload(initiateMultipartUploadRequest);
    }

    private void completeMultipartUpload(AmazonS3 amazonS3, CompleteMultipartUploadRequest completeMultipartUploadRequest) {
        amazonS3.completeMultipartUpload(completeMultipartUploadRequest);
    }
}
