/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
 * Copyright by The HDF Group.                                               *
 * All rights reserved.                                                      *
 *                                                                           *
 * This file is part of HDF5.  The full HDF5 copyright notice, including     *
 * terms governing use, modification, and redistribution, is contained in    *
 * the COPYING file, which can be found at the root of the source code       *
 * distribution tree, or in https://www.hdfgroup.org/licenses.               *
 * If you do not have access to either file, you may request a copy from     *
 * help@hdfgroup.org.                                                        *
 * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */

/*
 * Programmer:  Richard Warren
 *              Wednesday, July 1, 2020
 *
 * Purpose:     This is part of a parallel subfiling I/O driver.
 *
 */

/***********/
/* Headers */
/***********/

#include "H5FDsubfiling_priv.h"

/*-------------------------------------------------------------------------
 * Function:    H5FD__subfiling__truncate_sub_files
 *
 *              Note: This code should be moved -- most likely to the IOC
 *                    code files.
 *
 * Purpose:     Apply a truncate operation to the subfiles.
 *
 *              In the context of the I/O concentrators, the eof must be
 *              translated into the appropriate value for each of the
 *              subfiles, and then applied to them.
 *
 *              Further, we must ensure that all prior I/O requests complete
 *              before the truncate is applied.
 *
 *              We do this as follows:
 *
 *              1) Run a barrier on entry.
 *
 *              2) Determine if this rank is an IOC.  If it is, compute
 *                 the correct EOF for each subfile it owns, and send a
 *                 truncate request for each of them to the IOC.
 *
 *              3) On the IOC thread, allow all pending I/O requests
 *                 received prior to the truncate request to complete
 *                 before performing the truncate.
 *
 *              4) Run a barrier on exit.
 *
 *              Observe that the barrier on entry ensures that any prior
 *              I/O requests will have been queued before the truncate
 *              request is sent to the IOC.
 *
 *              Similarly, the barrier on exit ensures that no subsequent
 *              I/O request will reach the IOC before the truncate request
 *              has been queued.
 *
 *              EOF translation: the logical file is striped round-robin
 *              across the subfiles in units of sf_stripe_size.  The
 *              logical EOF therefore splits into some number of full
 *              "rows" (sf_blocksize_per_stripe bytes each, one stripe per
 *              subfile) followed by a partial row.  Every subfile gets one
 *              stripe per full row.  In the partial row, the subfiles
 *              before the break get one more full stripe, and the subfile
 *              at the break gets the trailing fragment.
 *
 *              Because of the MPI_Barrier calls on comm, this function
 *              must be called by every rank in comm.
 *
 * Parameters:  context_id       - ID of the subfiling context
 *              logical_file_eof - new EOF of the logical HDF5 file
 *              comm             - communicator used for the entry/exit
 *                                 barriers
 *
 * Return:      SUCCEED/FAIL
 *
 * Programmer:  JRM -- 12/13/21
 *
 *-------------------------------------------------------------------------
 */
herr_t
H5FD__subfiling__truncate_sub_files(hid_t context_id, int64_t logical_file_eof, MPI_Comm comm)
{
    subfiling_context_t *sf_context = NULL;
    MPI_Request         *recv_reqs  = NULL;
    int64_t              msg[3]     = {0};
    int64_t             *recv_msgs  = NULL;
    int                  num_posted = 0; /* number of receives successfully posted into recv_reqs */
    int                  mpi_size;
    int                  mpi_code;
    herr_t               ret_value = SUCCEED;

    if (MPI_SUCCESS != (mpi_code = MPI_Comm_size(comm, &mpi_size)))
        H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Comm_size failed", mpi_code);

    /* Barrier on entry */
    if (mpi_size > 1)
        if (MPI_SUCCESS != (mpi_code = MPI_Barrier(comm)))
            H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Barrier failed", mpi_code);

    if (NULL == (sf_context = (subfiling_context_t *)H5_get_subfiling_object(context_id)))
        H5_SUBFILING_GOTO_ERROR(H5E_FILE, H5E_BADVALUE, FAIL, "can't get subfile context");

    if (sf_context->topology->rank_is_ioc) {
        int64_t num_full_stripes;
        int64_t num_leftover_stripes;
        int64_t partial_stripe_len;
        int     num_subfiles_owned;
        int     ioc_rank;

        /* Number of complete rows (one stripe per subfile) below the new EOF */
        num_full_stripes = logical_file_eof / sf_context->sf_blocksize_per_stripe;
        /* Bytes of the trailing, incomplete row */
        partial_stripe_len = logical_file_eof % sf_context->sf_blocksize_per_stripe;
        /* Number of complete stripes within that trailing row */
        num_leftover_stripes = partial_stripe_len / sf_context->sf_stripe_size;

        num_subfiles_owned = sf_context->sf_num_fids;

        /* Truncate requests go to, and completions come from, this rank's own IOC */
        ioc_rank = sf_context->topology->io_concentrators[sf_context->topology->ioc_idx];

        if (NULL == (recv_reqs = HDmalloc((size_t)num_subfiles_owned * sizeof(*recv_reqs))))
            H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL,
                                    "can't allocate receive requests array");
        if (NULL == (recv_msgs = HDmalloc((size_t)num_subfiles_owned * 3 * sizeof(*recv_msgs))))
            H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "can't allocate message array");

        /*
         * Post early receives for messages from the IOC main
         * thread that will signal completion of the truncate
         * operation
         */
        for (int i = 0; i < num_subfiles_owned; i++) {
            if (MPI_SUCCESS != (mpi_code = MPI_Irecv(&recv_msgs[3 * i], 1, H5_subfiling_rpc_msg_type, ioc_rank,
                                                     TRUNC_COMPLETED, sf_context->sf_eof_comm, &recv_reqs[i])))
                H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Irecv failed", mpi_code);

            /* Track how many requests exist so the error path only touches valid handles */
            num_posted++;
        }

        /* Compute the EOF for each subfile this IOC owns */
        for (int i = 0; i < num_subfiles_owned; i++) {
            int64_t subfile_eof = num_full_stripes * sf_context->sf_stripe_size;
            int64_t global_subfile_idx;

            /* Local subfile i of this IOC is global subfile i * n_ioc + ioc_idx */
            global_subfile_idx =
                (i * sf_context->topology->n_io_concentrators) + sf_context->topology->ioc_idx;

            if (global_subfile_idx < num_leftover_stripes) {
                /* Subfile lies before the break in the trailing row: one more full stripe */
                subfile_eof += sf_context->sf_stripe_size;
            }
            else if (global_subfile_idx == num_leftover_stripes) {
                /* Subfile holds the trailing fragment of the last row */
                subfile_eof += partial_stripe_len % sf_context->sf_stripe_size;
            }

            /* Direct the IOC to truncate this subfile to the correct EOF */
            msg[0] = subfile_eof;
            msg[1] = i;
            msg[2] = -1; /* padding -- not used in this message */

            if (MPI_SUCCESS != (mpi_code = MPI_Send(msg, 1, H5_subfiling_rpc_msg_type, ioc_rank, TRUNC_OP,
                                                    sf_context->sf_msg_comm)))
                H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send failed", mpi_code);
        }

        /* Wait for truncate operations to complete */
        if (MPI_SUCCESS != (mpi_code = MPI_Waitall(num_subfiles_owned, recv_reqs, MPI_STATUSES_IGNORE)))
            H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Waitall", mpi_code);

        /* sanity check -- compute the file eof using the same mechanism used to
         * compute the subfile eof.  Assert that the computed value and the
         * actual value match.
         *
         * Do this only for debug builds -- probably delete this before release.
         *
         *                                           JRM -- 12/15/21
         */
#ifndef NDEBUG
        {
            int64_t test_file_eof = 0;

            for (int i = 0; i < sf_context->sf_num_subfiles; i++) {
                test_file_eof += num_full_stripes * sf_context->sf_stripe_size;

                if (i < num_leftover_stripes) {
                    test_file_eof += sf_context->sf_stripe_size;
                }
                else if (i == num_leftover_stripes) {
                    test_file_eof += partial_stripe_len % sf_context->sf_stripe_size;
                }
            }

            HDassert(test_file_eof == logical_file_eof);
        }
#endif /* NDEBUG */
    }

    /* Barrier on exit */
    if (mpi_size > 1)
        if (MPI_SUCCESS != (mpi_code = MPI_Barrier(comm)))
            H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Barrier failed", mpi_code);

done:
    /*
     * On failure, receives may still be pending, and MPI would write into
     * recv_msgs after it is freed below.  Cancel each still-active request
     * and complete it with MPI_Wait before releasing the buffers.  MPI_Wait
     * on a cancelled request is a local operation.  Requests already
     * completed by MPI_Waitall are MPI_REQUEST_NULL and are skipped.
     */
    if (ret_value < 0) {
        for (int i = 0; i < num_posted; i++) {
            if (recv_reqs[i] != MPI_REQUEST_NULL) {
                if (MPI_SUCCESS != (mpi_code = MPI_Cancel(&recv_reqs[i]))) {
                    H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Cancel", mpi_code);
                }
                else if (MPI_SUCCESS != (mpi_code = MPI_Wait(&recv_reqs[i], MPI_STATUS_IGNORE))) {
                    H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Wait", mpi_code);
                }
            }
        }
    }

    HDfree(recv_msgs);
    HDfree(recv_reqs);

    H5_SUBFILING_FUNC_LEAVE;
} /* H5FD__subfiling__truncate_sub_files() */

/*-------------------------------------------------------------------------
 * Function:    H5FD__subfiling__subfile_eof_to_logical_eof
 *
 * Purpose:     Translate the physical EOF of one subfile into the logical
 *              file EOF implied by that subfile's contents.
 *
 *              Logical bytes are striped round-robin across the subfiles
 *              in units of stripe_size.  Each "row" of the logical file
 *              spans stripe_size * num_subfiles bytes, and subfile
 *              subfile_idx holds the bytes
 *                [row * row_len + subfile_idx * stripe_size,
 *                 row * row_len + (subfile_idx + 1) * stripe_size).
 *              This is the same layout H5FD__subfiling__truncate_sub_files
 *              uses.
 *
 *              The last byte of a subfile of length sf_eof lives at
 *              subfile offset sf_eof - 1.  The returned value is one past
 *              that byte's logical address.  Working from the last byte
 *              avoids over-reporting by whole stripes when the subfile
 *              ends exactly on a stripe boundary.
 *
 * Return:      Logical EOF implied by this subfile; 0 if the subfile is
 *              empty.
 *
 *-------------------------------------------------------------------------
 */
static int64_t
H5FD__subfiling__subfile_eof_to_logical_eof(int64_t sf_eof, int subfile_idx, int64_t stripe_size,
                                             int num_subfiles)
{
    int64_t last_byte;
    int64_t row;
    int64_t offset_in_stripe;

    HDassert(stripe_size > 0);
    HDassert(num_subfiles > 0);

    /* An empty subfile places no lower bound on the logical EOF */
    if (sf_eof <= 0)
        return 0;

    last_byte        = sf_eof - 1;
    row              = last_byte / stripe_size;
    offset_in_stripe = last_byte % stripe_size;

    return (row * stripe_size * (int64_t)num_subfiles) + ((int64_t)subfile_idx * stripe_size) +
           offset_in_stripe + 1;
} /* H5FD__subfiling__subfile_eof_to_logical_eof() */

/*-------------------------------------------------------------------------
 * Function:    H5FD__subfiling__get_real_eof
 *
 *              Note: This code should be moved -- most likely to the IOC
 *                    code files.
 *
 * Purpose:     Query each subfile to get its local EOF, and then use this
 *              data to calculate the actual EOF.
 *
 *              Do this as follows:
 *
 *              1) Allocate an array of int64_t of length equal to the
 *                 number of subfiles, and initialize all fields to -1.
 *
 *              2) Send each subfile's IOC a message requesting that
 *                 subfile's EOF.  Subfile i is served by IOC
 *                 i % n_io_concentrators, which knows it by local index
 *                 i / n_io_concentrators.
 *
 *              3) Await reply from each IOC, storing the reply in
 *                 the appropriate entry in the array allocated in 1.
 *
 *              4) After all IOCs have replied, compute the logical EOF
 *                 implied by each subfile's EOF (one past the logical
 *                 address of the subfile's last byte).  Take the maximum
 *                 of these values, and report this value as the overall
 *                 EOF.
 *
 *              Note that this operation is not collective, and can return
 *              invalid data if other ranks perform writes while this
 *              operation is in progress.
 *
 *              SUBFILING NOTE:
 *              The EOF calculation for subfiling is somewhat different
 *              from that of more traditional HDF5 file implementations.
 *              Unlike "normal" HDF5 files, subfiling introduces a
 *              multi-file representation of a single HDF5 file.  The
 *              plurality of subfiles represents a software RAID-0 based
 *              HDF5 file.  As such, each subfile contains a designated
 *              portion of the address space of the virtual HDF5 storage.
 *              We have no notion of HDF5 datatypes, datasets, metadata,
 *              or other HDF5 structures; only BYTES.
 *
 *              The organization of the bytes within subfiles is consistent
 *              with the RAID-0 striping, i.e. there are IO Concentrators
 *              (IOCs) which correspond to a stripe-count (in Lustre) as
 *              well as a stripe_size.  The combination of these two
 *              variables determines the "address" (a combination of IOC
 *              and a file offset) of any storage operation.
 *
 *              Having a defined storage layout, the virtual file EOF
 *              calculation should be the MAXIMUM value returned by the
 *              collection of IOCs.
 *
 *              NOTE(review): the description below covers the IOC side,
 *              which is not implemented in this function; verify it
 *              against the IOC code.
 *
 *              Every MPI rank which hosts an IOC maintains its own EOF by
 *              updating that value for each WRITE operation that
 *              completes, i.e. if a new local EOF is greater than the
 *              existing local EOF, the new EOF will replace the old.  The
 *              local EOF calculation is as follows.
 *              1. At file creation, each IOC is assigned a rank value
 *                 (0 to N-1, where N is the total number of IOCs) and
 *                 a 'sf_base_addr' = 'ioc_idx' * 'sf_stripe_size')
 *                 we also determine the 'sf_blocksize_per_stripe' which
 *                 is simply the 'sf_stripe_size' * 'n_io_concentrators'
 *
 *              2. For every write operation, the IOC receives a message
 *                 containing a file_offset and the data_size.
 *
 *              3. The file_offset + data_size are in turn used to
 *                 create a stripe_id:
 *                   IOC-(ioc_rank)       IOC-(ioc_rank+1)
 *                   |<- sf_base_address  |<- sf_base_address  |
 *                ID +--------------------+--------------------+
 *                 0:|<- sf_stripe_size ->|<- sf_stripe_size ->|
 *                 1:|<- sf_stripe_size ->|<- sf_stripe_size ->|
 *                   ~                    ~                    ~
 *                 N:|<- sf_stripe_size ->|<- sf_stripe_size ->|
 *                   +--------------------+--------------------+
 *
 *                The new 'stripe_id' is then used to calculate a
 *                potential new EOF:
 *                sf_eof = (stripe_id * sf_blocksize_per_stripe) + sf_base_addr
 *                         + ((file_offset + data_size) % sf_stripe_size)
 *
 *              4. If (sf_eof > current_sf_eof), then current_sf_eof = sf_eof.
 *
 * Parameters:  context_id      - ID of the subfiling context
 *              logical_eof_ptr - out: the computed logical EOF (written
 *                                only on success)
 *
 * Return:      SUCCEED/FAIL
 *
 * Programmer:  JRM -- 1/18/22
 *
 *-------------------------------------------------------------------------
 */
herr_t
H5FD__subfiling__get_real_eof(hid_t context_id, int64_t *logical_eof_ptr)
{
    subfiling_context_t *sf_context  = NULL;
    MPI_Request         *recv_reqs   = NULL;
    int64_t             *recv_msg    = NULL;
    int64_t             *sf_eofs     = NULL; /* dynamically allocated array for subfile EOFs */
    int64_t              msg[3]      = {0, 0, 0};
    int64_t              logical_eof = 0;
    int                  n_io_concentrators = 0;
    int                  num_subfiles       = 0;
    int                  num_posted         = 0; /* receives successfully posted into recv_reqs */
    int                  mpi_code;               /* MPI return code */
    herr_t               ret_value = SUCCEED;    /* Return value */

    HDassert(logical_eof_ptr);

    if (NULL == (sf_context = (subfiling_context_t *)H5_get_subfiling_object(context_id)))
        H5_SUBFILING_GOTO_ERROR(H5E_FILE, H5E_BADVALUE, FAIL, "can't get subfile context");

    HDassert(sf_context->topology);

    n_io_concentrators = sf_context->topology->n_io_concentrators;
    num_subfiles       = sf_context->sf_num_subfiles;

    HDassert(n_io_concentrators > 0);
    HDassert(num_subfiles >= n_io_concentrators);

    if (NULL == (sf_eofs = HDmalloc((size_t)num_subfiles * sizeof(int64_t))))
        H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "can't allocate subfile EOFs array");
    if (NULL == (recv_reqs = HDmalloc((size_t)num_subfiles * sizeof(*recv_reqs))))
        H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "can't allocate receive requests array");
    if (NULL == (recv_msg = HDmalloc((size_t)num_subfiles * sizeof(msg))))
        H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "can't allocate message array");

    for (int i = 0; i < num_subfiles; i++)
        sf_eofs[i] = -1;

    /* Post early non-blocking receives for the EOF of each subfile */
    for (int i = 0; i < num_subfiles; i++) {
        int ioc_rank = sf_context->topology->io_concentrators[i % n_io_concentrators];

        if (MPI_SUCCESS != (mpi_code = MPI_Irecv(&recv_msg[3 * i], 1, H5_subfiling_rpc_msg_type, ioc_rank,
                                                 GET_EOF_COMPLETED, sf_context->sf_eof_comm, &recv_reqs[i])))
            H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Irecv", mpi_code);

        /* Only the first num_posted handles are valid; the error path relies on this */
        num_posted++;
    }

    /* Send each subfile's IOC a message requesting that subfile's EOF */

    msg[1] = -1; /* padding -- not used in this message */
    msg[2] = -1; /* padding -- not used in this message */

    for (int i = 0; i < num_subfiles; i++) {
        int ioc_rank = sf_context->topology->io_concentrators[i % n_io_concentrators];

        /* Set subfile index for receiving IOC */
        msg[0] = i / n_io_concentrators;

        if (MPI_SUCCESS != (mpi_code = MPI_Send(msg, 1, H5_subfiling_rpc_msg_type, ioc_rank, GET_EOF_OP,
                                                sf_context->sf_msg_comm)))
            H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send", mpi_code);
    }

    /* Wait for EOF communication to complete */
    if (MPI_SUCCESS != (mpi_code = MPI_Waitall(num_subfiles, recv_reqs, MPI_STATUSES_IGNORE)))
        H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Waitall", mpi_code);

    /* Reply layout as read here: [0] = IOC index, [1] = subfile EOF */
    for (int i = 0; i < num_subfiles; i++) {
        int ioc_rank = (int)recv_msg[3 * i];

        HDassert(ioc_rank >= 0);
        HDassert(ioc_rank < n_io_concentrators);
        HDassert(sf_eofs[i] == -1);

        sf_eofs[i] = recv_msg[(3 * i) + 1];
    }

    /* 4) After all IOCs have replied, compute the logical EOF implied by
     *    each subfile and take the maximum as the overall EOF.
     */
    for (int i = 0; i < num_subfiles; i++) {
        int64_t sf_logical_eof = H5FD__subfiling__subfile_eof_to_logical_eof(
            sf_eofs[i], i, (int64_t)sf_context->sf_stripe_size, num_subfiles);

        if (sf_logical_eof > logical_eof)
            logical_eof = sf_logical_eof;
    }

#ifdef H5_SUBFILING_DEBUG
    H5_subfiling_log(context_id, "%s: calculated logical EOF = %" PRId64 ".", __func__, logical_eof);
#endif

    *logical_eof_ptr = logical_eof;

done:
    /*
     * On failure, cancel any receive that is still active and complete it
     * with MPI_Wait before the buffers are freed; a cancelled request must
     * still be completed, and MPI_Wait on it is a local operation.  Only
     * the first num_posted handles were ever initialized.
     */
    if (ret_value < 0) {
        for (int i = 0; i < num_posted; i++) {
            if (recv_reqs[i] != MPI_REQUEST_NULL) {
                if (MPI_SUCCESS != (mpi_code = MPI_Cancel(&recv_reqs[i]))) {
                    H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Cancel", mpi_code);
                }
                else if (MPI_SUCCESS != (mpi_code = MPI_Wait(&recv_reqs[i], MPI_STATUS_IGNORE))) {
                    H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Wait", mpi_code);
                }
            }
        }
    }

    HDfree(recv_msg);
    HDfree(recv_reqs);
    HDfree(sf_eofs);

    H5_SUBFILING_FUNC_LEAVE;
} /* H5FD__subfiling__get_real_eof() */