package com.cloudera.dim.kafka.connect.partition.writers;

import com.cloudera.dim.kafka.connect.storage.PartitionStorage;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/dim/kafka/connect/partition/writers/AbstractPartitionWriter.class */
/**
 * Skeleton {@link PartitionWriter} that handles the bookkeeping shared by concrete writers:
 * opening the backing {@link PartitionStorage}, remembering the Kafka offset of the last record
 * written, and committing that offset to the storage on {@link #flush()}.
 *
 * <p>Subclasses supply the actual encoding through {@link #doWrite(SinkRecord)} and the
 * lifecycle hooks {@link #postOpen()}, {@link #postFlush()} and {@link #doClose()}.
 */
public abstract class AbstractPartitionWriter implements PartitionWriter {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractPartitionWriter.class);

    /** Stream obtained from the storage in {@link #open}; subclasses write their output here. */
    protected OutputStream outputStream;

    /**
     * Kafka offset of the most recently written record, or {@code -1} when nothing has been
     * written since the writer was created or since the commit callback last ran.
     */
    protected long currentConsumerPosition = -1;

    /** Storage this writer was opened on; {@code null} until {@link #open} is called. */
    protected PartitionStorage partitionStorage;

    /**
     * Accepts writer configuration. This base implementation ignores it.
     *
     * @param map configuration entries
     */
    public void configure(Map<String, ?> map) {
    }

    /**
     * Opens the given storage for the topic partition, captures its output stream and then
     * invokes {@link #postOpen()}.
     *
     * @param topicPartition partition this writer will receive records for
     * @param partitionStorage storage to open and write to
     * @throws Exception if the storage cannot be opened or its stream cannot be obtained
     */
    @Override
    public void open(TopicPartition topicPartition, PartitionStorage partitionStorage) throws Exception {
        this.partitionStorage = partitionStorage;
        // extension() is not declared in this class; presumably inherited from PartitionWriter.
        partitionStorage.open(topicPartition, extension());
        this.outputStream = partitionStorage.getOutputStream();
        postOpen();
    }

    /** Hook invoked at the end of {@link #open}, once {@link #outputStream} has been assigned. */
    protected abstract void postOpen();

    /**
     * Writes one record via {@link #doWrite(SinkRecord)} and then records its Kafka offset as the
     * current consumer position. The position is left untouched when {@code doWrite} throws.
     *
     * @param sinkRecord record to write
     * @throws IOException if the subclass fails to write the record
     */
    @Override
    public void write(SinkRecord sinkRecord) throws IOException {
        LOG.debug("writing record {}", sinkRecord);
        doWrite(sinkRecord);
        this.currentConsumerPosition = sinkRecord.kafkaOffset();
    }

    /**
     * Serializes a single record.
     *
     * @param sinkRecord record to write
     * @throws IOException if the record cannot be written
     */
    protected abstract void doWrite(SinkRecord sinkRecord) throws IOException;

    /**
     * Closes the writer and commits the last written offset to the storage. Does nothing when no
     * record has been written since the previous commit callback.
     *
     * <p>The callback handed to {@code PartitionStorage.commit} resets the position to {@code -1}
     * and calls {@link #postFlush()}; when (and whether) it runs is decided by the storage.
     *
     * @throws IOException if closing or committing fails; an {@code IOException} raised during the
     *     commit is propagated as-is, any other exception is wrapped as the cause
     */
    @Override
    public void flush() throws IOException {
        if (this.currentConsumerPosition == -1) {
            return;
        }
        close();
        LOG.info("commit current offset {} to underlying storage", this.currentConsumerPosition);
        try {
            this.partitionStorage.commit(this.currentConsumerPosition, () -> {
                this.currentConsumerPosition = -1L;
                postFlush();
            });
        } catch (Exception e) {
            // Avoid burying an IOException inside another IOException.
            if (e instanceof IOException) {
                throw (IOException) e;
            }
            throw new IOException(e);
        }
    }

    /**
     * Hook invoked from the commit callback after the position has been reset.
     *
     * @throws IOException if post-commit work fails
     */
    protected abstract void postFlush() throws IOException;

    /**
     * Runs {@link #doClose()} and then closes the storage. The storage is closed even when
     * {@code doClose()} throws; in that case the {@code doClose()} failure is rethrown and any
     * failure from closing the storage is attached to it as a suppressed exception.
     *
     * @throws IOException if the subclass or the storage fails to close
     */
    @Override
    public void close() throws IOException {
        try {
            doClose();
        } catch (Throwable primary) {
            try {
                closeStorage();
            } catch (Throwable secondary) {
                primary.addSuppressed(secondary);
            }
            // Precise rethrow: only IOException or an unchecked throwable can reach here.
            throw primary;
        }
        closeStorage();
    }

    /** Closes the storage if the writer has been opened; a never-opened writer has none. */
    private void closeStorage() throws IOException {
        if (this.partitionStorage != null) {
            this.partitionStorage.close();
        }
    }

    /**
     * Releases subclass-specific resources; called by {@link #close()} before the storage is
     * closed.
     *
     * @throws IOException if those resources cannot be released
     */
    protected abstract void doClose() throws IOException;
}
