/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.index.translog;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutputHelper;
import org.elasticsearch.common.util.ByteUtils;

import java.io.IOException;

/**
 * This class provided specialized serialization methods for translog operations. The goal is to allow direct
 * access to the current recycler bytes stream output page and write the entire operation with a single bounds
 * check.
 */
public final class TranslogHeaderWriter {

    public static final int FIXED_INDEX_HEADER_SIZE = 39;
    public static final int FIXED_DELETE_HEADER_SIZE = 30;
    public static final int FIXED_NO_OP_HEADER_SIZE = 21;

    public static final int OPERATION_TYPE_OFFSET = 4;
    public static final int SERIALIZATION_FORMAT_OFFSET = 5;
    public static final int VERSION_OFFSET = 6;
    public static final int SEQ_NO_OFFSET = 14;
    public static final int PRIMARY_TERM_OFFSET = 22;

    public static final int INDEX_AUTO_GENERATED_ID_TIMESTAMP_OFFSET = 30;
    public static final int INDEX_UID_LENGTH_OFFSET = 38;

    private TranslogHeaderWriter() {
        // This class depends on the serialization format fitting in a single vInt byte. If we advance pass 127 we need to tweak the logic
        // to write 2 bytes.
        assert Translog.Index.SERIALIZATION_FORMAT <= 127;
        assert Translog.Delete.SERIALIZATION_FORMAT <= 127;
    }

    public static void writeIndexHeader(RecyclerBytesStreamOutput buffer, Translog.Index index) throws IOException {
        int uidLen = index.uid().length;
        int uidVIntLen = RecyclerBytesStreamOutput.vIntLength(uidLen);
        BytesRef page = buffer.tryGetPageForWrite(FIXED_INDEX_HEADER_SIZE + uidLen + uidVIntLen);
        if (page != null) {
            writeFastIndexHeader(buffer, index, page, uidVIntLen);
        } else {
            writeSlowIndexHeader(buffer, index);
        }
    }

    private static void writeFastIndexHeader(RecyclerBytesStreamOutput buffer, Translog.Index index, BytesRef page, int uidVIntLen)
        throws IOException {
        BytesRef uid = index.uid();
        String routing = index.routing();

        int off = page.offset;
        byte[] bytes = page.bytes;
        bytes[off + OPERATION_TYPE_OFFSET] = Translog.Operation.Type.INDEX.id();
        // This is technically a vInt in the serialization, but until we advance past 127 we can just directly serialize as a byte
        bytes[off + SERIALIZATION_FORMAT_OFFSET] = (byte) Translog.Index.SERIALIZATION_FORMAT;
        ByteUtils.writeLongBE(index.version(), bytes, off + VERSION_OFFSET);
        ByteUtils.writeLongBE(index.seqNo(), bytes, off + SEQ_NO_OFFSET);
        ByteUtils.writeLongBE(index.primaryTerm(), bytes, off + PRIMARY_TERM_OFFSET);
        ByteUtils.writeLongBE(index.getAutoGeneratedIdTimestamp(), bytes, off + INDEX_AUTO_GENERATED_ID_TIMESTAMP_OFFSET);
        StreamOutputHelper.putVInt(bytes, uid.length, off + INDEX_UID_LENGTH_OFFSET);
        System.arraycopy(uid.bytes, uid.offset, bytes, off + INDEX_UID_LENGTH_OFFSET + uidVIntLen, uid.length);
        bytes[off + INDEX_UID_LENGTH_OFFSET + uidVIntLen + uid.length] = index.routing() == null ? (byte) 0 : (byte) 1;

        long variableLengthStart = buffer.position();
        // Write variable length items in header
        if (routing != null) {
            buffer.writeString(routing);
        }

        BytesReference source = index.source();
        int sourceLength = source == null ? 0 : source.length();
        // We write this so that we have a fully serialized header ready to append the source to. This allows us to fully calculate the
        // checksum
        buffer.writeVInt(sourceLength);

        int variableLengthSize = (int) (buffer.position() - variableLengthStart);
        int sizeOfOperation = FIXED_INDEX_HEADER_SIZE - Integer.BYTES + uidVIntLen + uid.length + variableLengthSize + sourceLength
            + Integer.BYTES;
        ByteUtils.writeIntBE(sizeOfOperation, bytes, off);
    }

    private static void writeSlowIndexHeader(RecyclerBytesStreamOutput buffer, Translog.Index index) throws IOException {
        final long start = buffer.position();
        buffer.skip(Integer.BYTES);
        buffer.writeByte(Translog.Operation.Type.INDEX.id());
        index.writeHeader(Translog.Index.SERIALIZATION_FORMAT, buffer);
        final long end = buffer.position();
        // The total operation size is the header size + source size + 4 bytes for checksum
        final int operationSize = (int) (end - Integer.BYTES - start) + index.source().length() + Integer.BYTES;
        buffer.seek(start);
        buffer.writeInt(operationSize);
        buffer.seek(end);
    }

    public static void writeDeleteHeader(RecyclerBytesStreamOutput buffer, Translog.Delete delete) throws IOException {
        int uidLen = delete.uid().length;
        int uidVIntLen = RecyclerBytesStreamOutput.vIntLength(uidLen);
        BytesRef page = buffer.tryGetPageForWrite(FIXED_DELETE_HEADER_SIZE + uidLen + uidVIntLen);
        if (page != null) {
            writeFastDeleteHeader(delete, page, uidVIntLen);
        } else {
            writeSlowDeleteHeader(buffer, delete);
        }
    }

    private static void writeFastDeleteHeader(Translog.Delete delete, BytesRef page, int uidVIntLen) throws IOException {
        BytesRef uid = delete.uid();

        int off = page.offset;
        byte[] bytes = page.bytes;
        bytes[off + OPERATION_TYPE_OFFSET] = Translog.Operation.Type.DELETE.id();
        // This is technically a vInt in the serialization, but until we advance past 127 we can just directly serialize as a byte
        bytes[off + SERIALIZATION_FORMAT_OFFSET] = (byte) Translog.Delete.SERIALIZATION_FORMAT;
        ByteUtils.writeLongBE(delete.version(), bytes, off + VERSION_OFFSET);
        ByteUtils.writeLongBE(delete.seqNo(), bytes, off + SEQ_NO_OFFSET);
        ByteUtils.writeLongBE(delete.primaryTerm(), bytes, off + PRIMARY_TERM_OFFSET);
        StreamOutputHelper.putVInt(bytes, uid.length, off + FIXED_DELETE_HEADER_SIZE);
        System.arraycopy(uid.bytes, uid.offset, bytes, off + FIXED_DELETE_HEADER_SIZE + uidVIntLen, uid.length);

        int sizeOfOperation = FIXED_DELETE_HEADER_SIZE - Integer.BYTES + uidVIntLen + uid.length + Integer.BYTES;
        ByteUtils.writeIntBE(sizeOfOperation, bytes, off);
    }

    private static void writeSlowDeleteHeader(RecyclerBytesStreamOutput buffer, Translog.Delete delete) throws IOException {
        final long start = buffer.position();
        buffer.skip(Integer.BYTES);
        buffer.writeByte(Translog.Operation.Type.DELETE.id());
        delete.writeHeader(Translog.Delete.SERIALIZATION_FORMAT, buffer);
        final long end = buffer.position();
        // The total operation size is the header size + 4 bytes for checksum
        final int operationSize = (int) (end - Integer.BYTES - start) + Integer.BYTES;
        buffer.seek(start);
        buffer.writeInt(operationSize);
        buffer.seek(end);
    }

    public static void writeNoOpHeader(RecyclerBytesStreamOutput buffer, Translog.NoOp noop) throws IOException {
        BytesRef bytesRef = buffer.tryGetPageForWrite(FIXED_NO_OP_HEADER_SIZE);
        if (bytesRef != null) {
            int off = bytesRef.offset;
            byte[] bytes = bytesRef.bytes;
            bytes[off + OPERATION_TYPE_OFFSET] = Translog.Operation.Type.NO_OP.id();
            ByteUtils.writeLongBE(noop.seqNo(), bytes, off + 5);
            ByteUtils.writeLongBE(noop.primaryTerm(), bytes, off + 13);

            long variableLengthStart = buffer.position();
            // Write variable length items in header
            buffer.writeString(noop.reason());
            int variableLengthSize = (int) (buffer.position() - variableLengthStart);
            // The total operation size is the header size + 4 bytes for checksum
            int sizeOfOperation = FIXED_NO_OP_HEADER_SIZE - Integer.BYTES + variableLengthSize + Integer.BYTES;
            ByteUtils.writeIntBE(sizeOfOperation, bytes, off);
        } else {
            writeSlowNoOpHeader(buffer, noop);
        }
    }

    private static void writeSlowNoOpHeader(RecyclerBytesStreamOutput buffer, Translog.NoOp noop) throws IOException {
        final long start = buffer.position();
        buffer.skip(Integer.BYTES);
        buffer.writeByte(Translog.Operation.Type.NO_OP.id());
        // No versioning for no-op
        noop.writeHeader(-1, buffer);
        final long end = buffer.position();
        // The total operation size is the header size + 4 bytes for checksum
        final int operationSize = (int) (end - Integer.BYTES - start) + Integer.BYTES;
        buffer.seek(start);
        buffer.writeInt(operationSize);
        buffer.seek(end);
    }
}
