package org.apache.tephra.persist;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.phoenix.shaded.com.ibm.icu.text.DateFormat;
import org.apache.phoenix.shaded.javax.annotation.Nullable;
import org.apache.tephra.TxConstants;
import org.apache.tephra.metrics.MetricsCollector;
import org.apache.tephra.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.tephra.shaded.com.google.common.base.Stopwatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tephra/persist/AbstractTransactionLog.class */
public abstract class AbstractTransactionLog implements TransactionLog {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionLog.class);
    private final MetricsCollector metricsCollector;
    protected long timestamp;
    private volatile boolean initialized;
    private volatile boolean closing;
    private volatile boolean closed;
    private TransactionLogWriter writer;
    private final long slowAppendThreshold;
    private final AtomicLong logSequence = new AtomicLong();
    private long writtenUpTo = 0;
    private volatile long syncedUpTo = 0;
    private final Queue<Entry> pendingWrites = new ConcurrentLinkedQueue();
    private int countSinceLastSync = 0;
    private long positionBeforeWrite = -1;
    private final Stopwatch stopWatch = new Stopwatch();

    @VisibleForTesting
    @Deprecated
    /* loaded from: input_file:org/apache/tephra/persist/AbstractTransactionLog$CaskEntry.class */
    static class CaskEntry implements Writable {
        private LongWritable key;
        private org.apache.phoenix.shaded.co.cask.tephra.persist.TransactionEdit edit;

        public CaskEntry() {
            this.key = new LongWritable();
            this.edit = new org.apache.phoenix.shaded.co.cask.tephra.persist.TransactionEdit();
        }

        CaskEntry(LongWritable longWritable, org.apache.phoenix.shaded.co.cask.tephra.persist.TransactionEdit transactionEdit) {
            this.key = longWritable;
            this.edit = transactionEdit;
        }

        public LongWritable getKey() {
            return this.key;
        }

        public org.apache.phoenix.shaded.co.cask.tephra.persist.TransactionEdit getEdit() {
            return this.edit;
        }

        public void write(DataOutput dataOutput) throws IOException {
            this.key.write(dataOutput);
            this.edit.write(dataOutput);
        }

        public void readFields(DataInput dataInput) throws IOException {
            this.key.readFields(dataInput);
            this.edit.readFields(dataInput);
        }
    }

    /* loaded from: input_file:org/apache/tephra/persist/AbstractTransactionLog$Entry.class */
    public static class Entry implements Writable {
        private LongWritable key;
        private TransactionEdit edit;

        public Entry() {
            this.key = new LongWritable();
            this.edit = new TransactionEdit();
        }

        public Entry(LongWritable longWritable, TransactionEdit transactionEdit) {
            this.key = longWritable;
            this.edit = transactionEdit;
        }

        public LongWritable getKey() {
            return this.key;
        }

        public TransactionEdit getEdit() {
            return this.edit;
        }

        public void write(DataOutput dataOutput) throws IOException {
            this.key.write(dataOutput);
            this.edit.write(dataOutput);
        }

        public void readFields(DataInput dataInput) throws IOException {
            this.key.readFields(dataInput);
            this.edit.readFields(dataInput);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractTransactionLog(long j, MetricsCollector metricsCollector, Configuration configuration) {
        this.timestamp = j;
        this.metricsCollector = metricsCollector;
        this.slowAppendThreshold = configuration.getLong(TxConstants.TransactionLog.CFG_SLOW_APPEND_THRESHOLD, 1000L);
    }

    private synchronized void init() throws IOException {
        if (this.initialized) {
            return;
        }
        this.writer = createWriter();
        this.initialized = true;
    }

    protected abstract TransactionLogWriter createWriter() throws IOException;

    @Override // org.apache.tephra.persist.TransactionLog
    public abstract String getName();

    @Override // org.apache.tephra.persist.TransactionLog
    public long getTimestamp() {
        return this.timestamp;
    }

    @Override // org.apache.tephra.persist.TransactionLog
    public void append(TransactionEdit transactionEdit) throws IOException {
        append(Collections.singletonList(transactionEdit));
    }

    @Override // org.apache.tephra.persist.TransactionLog
    public void append(List<TransactionEdit> list) throws IOException {
        if (this.closing) {
            throw new IOException("Log " + getName() + " is closing or already closed, cannot append");
        }
        if (!this.initialized) {
            init();
        }
        synchronized (this.logSequence) {
            Iterator<TransactionEdit> it = list.iterator();
            while (it.hasNext()) {
                this.pendingWrites.add(new Entry(new LongWritable(this.logSequence.getAndIncrement()), it.next()));
            }
        }
        sync();
    }

    @Nullable
    private Entry[] getPendingWrites() {
        synchronized (this) {
            if (this.pendingWrites.isEmpty()) {
                return null;
            }
            Entry[] entryArr = new Entry[this.pendingWrites.size()];
            for (int i = 0; i < entryArr.length; i++) {
                entryArr[i] = this.pendingWrites.remove();
            }
            return entryArr;
        }
    }

    private void startTimerIfNeeded(TransactionLogWriter transactionLogWriter, int i) throws IOException {
        if (this.positionBeforeWrite == -1) {
            this.positionBeforeWrite = transactionLogWriter.getPosition();
            this.countSinceLastSync = 0;
            this.stopWatch.reset().start();
        }
        this.countSinceLastSync += i;
    }

    private void stopTimer(TransactionLogWriter transactionLogWriter) throws IOException {
        if (this.positionBeforeWrite != -1) {
            this.stopWatch.stop();
            long elapsedMillis = this.stopWatch.elapsedMillis();
            long position = transactionLogWriter.getPosition() - this.positionBeforeWrite;
            if (elapsedMillis >= this.slowAppendThreshold) {
                Logger logger = LOG;
                Object[] objArr = new Object[5];
                objArr[0] = getName();
                objArr[1] = Long.valueOf(elapsedMillis);
                objArr[2] = Integer.valueOf(this.countSinceLastSync);
                objArr[3] = this.countSinceLastSync == 1 ? DateFormat.YEAR : "ies";
                objArr[4] = Long.valueOf(position);
                logger.info("Slow append to log {}, took {} ms for {} entr{} and {} bytes.", objArr);
            }
            this.metricsCollector.histogram("wal.sync.size", this.countSinceLastSync);
            this.metricsCollector.histogram("wal.sync.bytes", (int) position);
        }
        this.positionBeforeWrite = -1L;
        this.countSinceLastSync = 0;
    }

    private void sync() throws IOException {
        long j = 0;
        synchronized (this) {
            if (this.closed) {
                if (!this.pendingWrites.isEmpty()) {
                    throw new IOException("Unexpected state: Writer is closed but there are pending edits. Cannot guarantee that edits were persisted");
                }
                return;
            }
            Entry[] pendingWrites = getPendingWrites();
            if (pendingWrites != null) {
                int length = pendingWrites.length;
                startTimerIfNeeded(this.writer, length);
                this.writer.commitMarker(length);
                for (Entry entry : pendingWrites) {
                    this.writer.append(entry);
                }
                j = pendingWrites[pendingWrites.length - 1].getKey().get();
                this.writtenUpTo = j;
            }
            if (this.syncedUpTo >= j) {
                return;
            }
            synchronized (this) {
                if (this.syncedUpTo >= j) {
                    return;
                }
                if (this.closed) {
                    throw new IOException(String.format("Unexpected state: Writer is closed but there are unsynced edits up to sequence id %d, and writes have been synced up to sequence id %d. Cannot guarantee that edits are persisted.", Long.valueOf(j), Long.valueOf(this.syncedUpTo)));
                }
                this.writer.sync();
                this.syncedUpTo = this.writtenUpTo;
                stopTimer(this.writer);
            }
        }
    }

    @Override // org.apache.tephra.persist.TransactionLog
    public synchronized void close() throws IOException {
        if (this.closed) {
            return;
        }
        this.closing = true;
        if (!this.pendingWrites.isEmpty()) {
            sync();
        }
        if (this.writer != null) {
            this.writer.close();
        }
        this.closed = true;
    }

    public boolean isClosed() {
        return this.closed;
    }

    @Override // org.apache.tephra.persist.TransactionLog
    public abstract TransactionLogReader getReader() throws IOException;
}
