/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.index.translog;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.BytesRefRecycler;

import java.io.IOException;
import java.util.zip.CRC32;

import static org.elasticsearch.common.bytes.BytesReferenceTestUtils.equalBytes;
import static org.hamcrest.Matchers.equalTo;

/**
 * Tests that the operation headers produced by {@link TranslogHeaderWriter}, once combined with the optional
 * source into a {@code Translog.Serialized} and written to a translog buffer, agree with what
 * {@code Translog.writeOperationNoSize} writes through a standard {@link BufferedChecksumStreamOutput}.
 * <p>
 * Each test reuses the same pair of recycler-backed outputs across iterations and randomly starts writing either
 * at offset 0 or one byte before the end of the first page, so that both the single-page and the page-crossing
 * write paths are exercised.
 */
public class TranslogHeaderWriterTests extends ESTestCase {

    /** Number of randomized rounds each test performs. */
    private static final int ITERATIONS = 30;

    /**
     * Number of leading bytes skipped before comparing the written operation with the expected bytes. The expected
     * bytes are produced by {@code writeOperationNoSize}, so the skipped prefix is presumably the operation size.
     */
    private static final int SIZE_PREFIX_BYTES = 4;

    /** Index operation headers must serialize to exactly the same bytes as the standard serialization. */
    public void testIndexOperationSerializationMatches() throws IOException {
        RecyclerBytesStreamOutput headerOutput = new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE);
        RecyclerBytesStreamOutput fullOperationOutput = new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE);

        for (int i = 0; i < ITERATIONS; i++) {
            int offset = randomStartOffset();
            headerOutput.seek(offset);
            fullOperationOutput.seek(offset);

            BytesArray source = new BytesArray(randomByteArrayOfLength(randomIntBetween(50, 4196)));
            Translog.Index index = getIndex(source);

            BytesStreamOutput bytesStreamOutput = new BytesStreamOutput();
            BufferedChecksumStreamOutput output = new BufferedChecksumStreamOutput(bytesStreamOutput);
            Translog.writeOperationNoSize(output, index);
            BytesReference expectedWithoutSize = bytesStreamOutput.bytes();

            TranslogHeaderWriter.writeIndexHeader(headerOutput, index);
            serializationMatches(headerOutput.bytes(), source, offset, fullOperationOutput, expectedWithoutSize);
        }
    }

    /**
     * Index operations written via the header writer must deserialize to the same operation as one written by the
     * standard serialization at the transport version just before {@code Translog.REORDERED_TRANSLOG_OPERATIONS}.
     */
    public void testIndexOperationSerializationMatchesPreReorderedOperation() throws IOException {
        RecyclerBytesStreamOutput headerOutput = new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE);
        RecyclerBytesStreamOutput fullOperationOutput = new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE);

        for (int i = 0; i < ITERATIONS; i++) {
            int offset = randomStartOffset();
            headerOutput.seek(offset);
            fullOperationOutput.seek(offset);

            BytesArray source = new BytesArray(randomByteArrayOfLength(randomIntBetween(50, 4196)));
            Translog.Index index = getIndex(source);

            BytesStreamOutput bytesStreamOutput = new BytesStreamOutput();
            BufferedChecksumStreamOutput output = new BufferedChecksumStreamOutput(bytesStreamOutput);
            output.setTransportVersion(preReorderedTransportVersion());
            Translog.writeOperationNoSize(output, index);
            BytesReference expectedWithoutSize = bytesStreamOutput.bytes();

            TranslogHeaderWriter.writeIndexHeader(headerOutput, index);
            operationMatches(headerOutput.bytes(), source, offset, fullOperationOutput, expectedWithoutSize);
        }
    }

    /**
     * Builds an index operation around the given source, with randomized id, sequence number, primary term,
     * version, routing and auto-generated-id timestamp.
     */
    private static Translog.Index getIndex(BytesReference source) {
        String id = randomAlphaOfLength(20);
        long seqNo = randomLongBetween(0, Long.MAX_VALUE);
        long primaryTerm = randomLongBetween(0, Long.MAX_VALUE);
        long version = randomLongBetween(0, Long.MAX_VALUE);
        String routing = randomAlphaOfLength(20);
        long autoGeneratedIdTimestamp = randomLongBetween(0, Long.MAX_VALUE);
        return new Translog.Index(id, seqNo, primaryTerm, version, source, routing, autoGeneratedIdTimestamp);
    }

    /** Delete operation headers must serialize to exactly the same bytes as the standard serialization. */
    public void testDeleteOperationSerializationMatches() throws IOException {
        RecyclerBytesStreamOutput headerOutput = new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE);
        RecyclerBytesStreamOutput fullOperationOutput = new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE);

        for (int i = 0; i < ITERATIONS; i++) {
            int offset = randomStartOffset();
            headerOutput.seek(offset);
            fullOperationOutput.seek(offset);

            Translog.Delete delete = getDelete();

            BytesStreamOutput bytesStreamOutput = new BytesStreamOutput();
            BufferedChecksumStreamOutput output = new BufferedChecksumStreamOutput(bytesStreamOutput);
            Translog.writeOperationNoSize(output, delete);
            BytesReference expectedWithoutSize = bytesStreamOutput.bytes();

            TranslogHeaderWriter.writeDeleteHeader(headerOutput, delete);
            // Deletes carry no source, hence the null.
            serializationMatches(headerOutput.bytes(), null, offset, fullOperationOutput, expectedWithoutSize);
        }
    }

    /**
     * Delete operations written via the header writer must deserialize to the same operation as one written by the
     * standard serialization at the transport version just before {@code Translog.REORDERED_TRANSLOG_OPERATIONS}.
     */
    public void testDeleteOperationSerializationMatchesPreReorderedOperation() throws IOException {
        RecyclerBytesStreamOutput headerOutput = new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE);
        RecyclerBytesStreamOutput fullOperationOutput = new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE);

        for (int i = 0; i < ITERATIONS; i++) {
            int offset = randomStartOffset();
            headerOutput.seek(offset);
            fullOperationOutput.seek(offset);

            Translog.Delete delete = getDelete();

            BytesStreamOutput bytesStreamOutput = new BytesStreamOutput();
            BufferedChecksumStreamOutput output = new BufferedChecksumStreamOutput(bytesStreamOutput);
            output.setTransportVersion(preReorderedTransportVersion());
            Translog.writeOperationNoSize(output, delete);
            BytesReference expectedWithoutSize = bytesStreamOutput.bytes();

            TranslogHeaderWriter.writeDeleteHeader(headerOutput, delete);
            // Deletes carry no source, hence the null.
            operationMatches(headerOutput.bytes(), null, offset, fullOperationOutput, expectedWithoutSize);
        }
    }

    /** Builds a delete operation with randomized id, sequence number, primary term and version. */
    private static Translog.Delete getDelete() {
        String id = randomAlphaOfLength(20);
        long seqNo = randomLongBetween(0, Long.MAX_VALUE);
        long primaryTerm = randomLongBetween(0, Long.MAX_VALUE);
        long version = randomLongBetween(0, Long.MAX_VALUE);
        return new Translog.Delete(id, seqNo, primaryTerm, version);
    }

    /** No-op operation headers must serialize to exactly the same bytes as the standard serialization. */
    public void testNoOpOperationSerializationMatches() throws IOException {
        RecyclerBytesStreamOutput headerOutput = new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE);
        RecyclerBytesStreamOutput fullOperationOutput = new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE);

        for (int i = 0; i < ITERATIONS; i++) {
            int offset = randomStartOffset();
            headerOutput.seek(offset);
            fullOperationOutput.seek(offset);

            long seqNo = randomLongBetween(0, Long.MAX_VALUE);
            long primaryTerm = randomLongBetween(0, Long.MAX_VALUE);
            String reason = randomAlphaOfLength(20);
            Translog.NoOp noOp = new Translog.NoOp(seqNo, primaryTerm, reason);

            BytesStreamOutput bytesStreamOutput = new BytesStreamOutput();
            BufferedChecksumStreamOutput output = new BufferedChecksumStreamOutput(bytesStreamOutput);
            Translog.writeOperationNoSize(output, noOp);
            BytesReference expectedWithoutSize = bytesStreamOutput.bytes();

            TranslogHeaderWriter.writeNoOpHeader(headerOutput, noOp);
            // No-ops carry no source, hence the null.
            serializationMatches(headerOutput.bytes(), null, offset, fullOperationOutput, expectedWithoutSize);
        }
    }

    /**
     * Picks the position at which the outputs start writing: either 0 (fast path, the write starts on a fresh page)
     * or one byte before the end of the first page (slow path, the write crosses a page boundary).
     */
    private static int randomStartOffset() {
        return randomBoolean() ? 0 : BytesRefRecycler.NON_RECYCLING_INSTANCE.pageSize() - 1;
    }

    /** Returns the transport version whose id immediately precedes {@code Translog.REORDERED_TRANSLOG_OPERATIONS}. */
    private static TransportVersion preReorderedTransportVersion() {
        return TransportVersion.fromId(Translog.REORDERED_TRANSLOG_OPERATIONS.id() - 1);
    }

    /**
     * Combines the header written at {@code offset} with the optional source, writes the result to
     * {@code operationOutput} and returns what was written, minus the bytes before {@code offset} and the
     * {@link #SIZE_PREFIX_BYTES} that follow it.
     *
     * @param headerBytes     everything written to the header output so far, including the bytes before {@code offset}
     * @param source          source of the operation, or {@code null} if the operation has none
     * @param offset          position at which both outputs started writing in this iteration
     * @param operationOutput output that receives the full serialized operation
     * @return the written operation without the bytes before {@code offset} and without the leading size bytes
     */
    private static BytesReference writeOperationAndStripSize(
        BytesReference headerBytes,
        @Nullable BytesReference source,
        int offset,
        RecyclerBytesStreamOutput operationOutput
    ) throws IOException {
        BytesReference header = headerBytes.slice(offset, headerBytes.length() - offset);
        Translog.Serialized serialized = Translog.Serialized.create(header, source, new CRC32());
        serialized.writeToTranslogBuffer(operationOutput);

        BytesReference actualWithSize = operationOutput.bytes();
        return actualWithSize.slice(SIZE_PREFIX_BYTES + offset, actualWithSize.length() - offset - SIZE_PREFIX_BYTES);
    }

    /**
     * Asserts that the operation assembled from the header (plus optional source) is byte-for-byte equal to
     * {@code expectedWithoutSize}.
     *
     * @param headerBytes         everything written to the header output so far
     * @param source              source of the operation, or {@code null} if the operation has none
     * @param offset              position at which both outputs started writing in this iteration
     * @param operationOutput     output that receives the full serialized operation
     * @param expectedWithoutSize bytes produced by the standard serialization
     */
    private static void serializationMatches(
        BytesReference headerBytes,
        @Nullable BytesReference source,
        int offset,
        RecyclerBytesStreamOutput operationOutput,
        BytesReference expectedWithoutSize
    ) throws IOException {
        BytesReference actualWithoutSize = writeOperationAndStripSize(headerBytes, source, offset, operationOutput);
        assertThat(actualWithoutSize, equalBytes(expectedWithoutSize));
    }

    /**
     * Asserts that the operation assembled from the header (plus optional source) deserializes to an operation equal
     * to the one deserialized from {@code expectedWithoutSize}. Unlike {@link #serializationMatches}, the raw bytes
     * are not compared.
     *
     * @param headerBytes         everything written to the header output so far
     * @param source              source of the operation, or {@code null} if the operation has none
     * @param offset              position at which both outputs started writing in this iteration
     * @param fullOperationOutput output that receives the full serialized operation
     * @param expectedWithoutSize bytes produced by the standard serialization
     */
    private static void operationMatches(
        BytesReference headerBytes,
        @Nullable BytesReference source,
        int offset,
        RecyclerBytesStreamOutput fullOperationOutput,
        BytesReference expectedWithoutSize
    ) throws IOException {
        StreamInput actual = writeOperationAndStripSize(headerBytes, source, offset, fullOperationOutput).streamInput();

        // Read the actual operation first, then the expected one, and compare the deserialized operations.
        Translog.Operation actualOperation = Translog.Operation.readOperation(actual);
        Translog.Operation expectedOperation = Translog.Operation.readOperation(expectedWithoutSize.streamInput());
        assertThat(actualOperation, equalTo(expectedOperation));
    }
}
