/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2017 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2013-2018 University of Houston. All rights reserved.
 * Copyright (c) 2018      Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */


#include "ompi_config.h"
#include "sharedfp_individual.h"

#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/mca/sharedfp/sharedfp.h"
#include "ompi/mca/sharedfp/base/base.h"
#include "ompi/mca/common/ompio/common_ompio.h"

#include <stdlib.h>
#include <stdio.h>

int mca_sharedfp_individual_collaborate_data(struct mca_sharedfp_base_data_t *sh, ompio_file_t *ompio_fh)
{
    int ret = OMPI_SUCCESS;
    mca_sharedfp_individual_header_record *headnode = NULL;
    char *buff=NULL;
    int nodesoneachprocess = 0;
    int idx=0,i=0,j=0, l=0;
    int *ranks = NULL;
    double *timestampbuff = NULL;
    OMPI_MPI_OFFSET_TYPE *offsetbuff = NULL;
    int *countbuff = NULL;
    int *displ = NULL;
    double *ind_ts = NULL;
    long *ind_recordlength = NULL;
    OMPI_MPI_OFFSET_TYPE *local_off = NULL;
    int totalnodes = 0;
    ompi_status_public_t status;
    int recordlength=0;

    headnode = (mca_sharedfp_individual_header_record*)sh->selected_module_data;
    if ( NULL == headnode)  {
        opal_output(0, "sharedfp_individual_collaborate_data: headnode is NULL but file is open\n");
        return OMPI_ERROR;
    }

    /* Number of nodes on each process is the sum of records
     * on file and records in the linked list
     */
    nodesoneachprocess = headnode->numofrecordsonfile + headnode->numofrecords;

    if ( mca_sharedfp_individual_verbose ) {
        opal_output(ompi_sharedfp_base_framework.framework_output,
                    "Nodes of each process = %d\n",nodesoneachprocess);
    }

    countbuff = (int*)malloc(ompio_fh->f_size * sizeof(int));
    if ( NULL == countbuff  ) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    displ = (int*)malloc(sizeof(int) * ompio_fh->f_size);
    if ( NULL == displ ) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
	goto exit;
    }

    /* Each process counts the number of nodes
     * in its linked list for which global offset */
    ret =  mca_sharedfp_individual_get_timestamps_and_reclengths ( &ind_ts, &ind_recordlength,
								   &local_off, sh );
    if ( OMPI_SUCCESS != ret ) {
	goto exit;
    }

    ret = ompio_fh->f_comm->c_coll->coll_allgather ( &nodesoneachprocess, 
                                                     1, 
                                                     MPI_INT,
                                                     countbuff, 
                                                     1, 
                                                     MPI_INT, 
                                                     ompio_fh->f_comm,
                                                     ompio_fh->f_comm->c_coll->coll_allgather_module );

    if ( OMPI_SUCCESS != ret ) {
	goto exit;
    }


    if ( mca_sharedfp_individual_verbose) {
	for (i = 0; i < ompio_fh->f_size ; i++) {
            opal_output(ompi_sharedfp_base_framework.framework_output,"sharedfp_individual_collaborate_data: "
                        "Countbuff[%d] = %d\n", i, countbuff[i]);
	}
    }

    if ( 0 == nodesoneachprocess )    {
        ind_ts[0] = 0;
        ind_recordlength[0] = 0;
        local_off[0] = 0;
    }

    for(i = 0; i < ompio_fh->f_size; i++) {
        displ[i]    = totalnodes;
	if ( mca_sharedfp_individual_verbose ) {
            opal_output(ompi_sharedfp_base_framework.framework_output,
                        "sharedfp_individual_collaborate_data: displ[%d] = %d\n",i,displ[i]);
        }
        totalnodes  = totalnodes + countbuff[i];
    }

    if (totalnodes <= 0 ) {
	goto exit;
    }

    ranks = (int *) malloc ( totalnodes * sizeof(int));
    if ( NULL == ranks ) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
	goto exit;
    }
    for ( l=0, i=0; i< ompio_fh->f_size; i++ ) {
        for ( j=0; j< countbuff[i]; j++ ) {
            ranks[l++]=i;
        }
    }

    ret =  mca_sharedfp_individual_create_buff ( &timestampbuff, &offsetbuff, totalnodes, ompio_fh->f_size);
    if ( OMPI_SUCCESS != ret ) {
	goto exit;
    }

    ret = ompio_fh->f_comm->c_coll->coll_allgatherv ( ind_ts, 
                                                      countbuff[ompio_fh->f_rank], 
                                                      MPI_DOUBLE,
                                                      timestampbuff, 
                                                      countbuff, 
                                                      displ, 
                                                      MPI_DOUBLE,
                                                      ompio_fh->f_comm, 
                                                      ompio_fh->f_comm->c_coll->coll_allgatherv_module );
    if ( OMPI_SUCCESS != ret ) {
	goto exit;
    }

    ret = ompio_fh->f_comm->c_coll->coll_allgatherv ( ind_recordlength, 
                                                      countbuff[ompio_fh->f_rank], 
                                                      OMPI_OFFSET_DATATYPE,
                                                      offsetbuff, 
                                                      countbuff, 
                                                      displ, 
                                                      OMPI_OFFSET_DATATYPE,
                                                      ompio_fh->f_comm, 
                                                      ompio_fh->f_comm->c_coll->coll_allgatherv_module );
    if ( OMPI_SUCCESS != ret ) {
	goto exit;
    }

    ret =  mca_sharedfp_individual_sort_timestamps(&timestampbuff, &offsetbuff, &ranks, totalnodes);
    if ( OMPI_SUCCESS != ret ) {
	goto exit;
    }

    sh->global_offset = mca_sharedfp_individual_assign_globaloffset ( &offsetbuff, totalnodes, sh);

    recordlength = ind_recordlength[0] * 1.2;
    buff = (char * ) malloc( recordlength );
    if  ( NULL == buff ) {
	ret = OMPI_ERR_OUT_OF_RESOURCE;
	goto exit;
    }

    for (i = 0; i < nodesoneachprocess ; i++)  {
        if ( ind_recordlength[i] > recordlength ) {
            recordlength = ind_recordlength[i] * 1.2;
            buff = (char *) realloc ( buff, recordlength );
            if  ( NULL == buff ) {
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
        }

	/*Read from the local data file*/
	ret = mca_common_ompio_file_read_at ( headnode->datafilehandle,
                                              local_off[i], buff, ind_recordlength[i],
                                              MPI_BYTE, &status);
        if ( OMPI_SUCCESS != ret ) {
            goto exit;
        }

	idx =  mca_sharedfp_individual_getoffset(ind_ts[i],timestampbuff, ranks, ompio_fh->f_rank, totalnodes);

	if ( mca_sharedfp_individual_verbose ) {
            opal_output(ompi_sharedfp_base_framework.framework_output,
                        "sharedfp_individual_collaborate_data: Process %d writing %ld bytes to main file at position"
                        "%lld (%d)\n", ompio_fh->f_rank, ind_recordlength[i], offsetbuff[idx], idx);
        }

	/*Write into main data file*/
	ret = mca_common_ompio_file_write_at( ompio_fh, offsetbuff[idx], buff,
                                              ind_recordlength[i], MPI_BYTE, &status);
        if ( OMPI_SUCCESS != ret ) {
            goto exit;
        }

    }

exit:
    if ( NULL != countbuff ) {
	free ( countbuff );
    }
    if ( NULL != displ ) {
	free ( displ );
    }

    if( NULL != timestampbuff ){
        free ( timestampbuff );
    }
    if ( NULL != offsetbuff ){
        free ( offsetbuff );
    }
    if ( NULL != ind_ts ) {
	free ( ind_ts );
    }
    if ( NULL != ind_recordlength ) {
	free ( ind_recordlength );
    }
    if ( NULL != local_off ) {
	free ( local_off );
    }
    if ( NULL != buff ) {
	free ( buff );
    }
    if ( NULL != ranks ) {
        free ( ranks );
    }

    return ret;
}

/* Count the number of nodes and create and array of the timestamps*/
int  mca_sharedfp_individual_get_timestamps_and_reclengths ( double **buff, long **rec_length, 
                                                             MPI_Offset **offbuff,struct mca_sharedfp_base_data_t *sh)
{
    int num = 0, i= 0, ctr = 0;
    int ret=OMPI_SUCCESS;
    mca_sharedfp_individual_metadata_node *currnode;
    mca_sharedfp_individual_header_record *headnode;
    OMPI_MPI_OFFSET_TYPE metaoffset = 0;
    struct  mca_sharedfp_individual_record2 rec;
    MPI_Status status;

    headnode = (mca_sharedfp_individual_header_record*)(sh->selected_module_data);
    num = ( headnode->numofrecords + headnode->numofrecordsonfile);
    currnode = headnode->next;

    if ( mca_sharedfp_individual_verbose ) {
        opal_output(ompi_sharedfp_base_framework.framework_output,"Num is %d\n",num);
    }

    if ( 0 == num )   {
        *buff       = (double*) malloc ( sizeof ( double ));
        *rec_length = (long *) malloc ( sizeof ( long ));
        *offbuff    = (OMPI_MPI_OFFSET_TYPE *)malloc ( sizeof(OMPI_MPI_OFFSET_TYPE) );
	if ( NULL == *buff || NULL == *rec_length || NULL == *offbuff ) {
	    ret = OMPI_ERR_OUT_OF_RESOURCE;
	    goto exit;
	}
    }
    else {
        *buff       = (double* ) malloc(sizeof ( double) * num);
        *rec_length = (long *) malloc(sizeof ( long) * num);
        *offbuff    = (OMPI_MPI_OFFSET_TYPE *) malloc ( sizeof(OMPI_MPI_OFFSET_TYPE) * num);
	if ( NULL == *buff || NULL == *rec_length || NULL == *offbuff ) {
	    ret = OMPI_ERR_OUT_OF_RESOURCE;
	    goto exit;
	}
    }

    if ( mca_sharedfp_individual_verbose ) {
        opal_output(ompi_sharedfp_base_framework.framework_output,
                    "sharedfp_individual_get_timestamps_and_reclengths: Numofrecords on file %d\n",
                    headnode->numofrecordsonfile);
    }

    if (headnode->numofrecordsonfile >  0)  {
        metaoffset = headnode->metafile_start_offset;
        ctr = 0;
        for (i = 0; i < headnode->numofrecordsonfile ; i++)  {

            ret = mca_common_ompio_file_read_at(headnode->metadatafilehandle,metaoffset, 
                                                &rec, 32, MPI_BYTE,&status);
            if ( OMPI_SUCCESS != ret ) {
                goto exit;
            }

            *(*rec_length + ctr) = rec.recordlength;
            *(*buff + ctr) = rec.timestamp;
            *(*offbuff + ctr) = rec.localposition;

            metaoffset = metaoffset +  sizeof(struct  mca_sharedfp_individual_record2);

            if ( mca_sharedfp_individual_verbose ) {
                opal_output(ompi_sharedfp_base_framework.framework_output,
                            "sharedfp_individual_get_timestamps_and_reclengths: Ctr = %d\n",ctr);
            }
            ctr++;
        }

        headnode->numofrecordsonfile = 0;
        headnode->metafile_start_offset = metaoffset;

    }	/* End of if (headnode->numofrecordsonfile > 0) */

    /* Add the records from the linked list */
    currnode = headnode->next;
    while (currnode)  {
        if ( mca_sharedfp_individual_verbose ) {
            opal_output(ompi_sharedfp_base_framework.framework_output,"Ctr = %d\n",ctr);
        }
        /* Some error over here..need to check this code again */
        /*while(headnode->next  != NULL)*/

        *(*rec_length + ctr) = currnode->recordlength;
        *(*buff + ctr) = currnode->timestamp;
        *(*offbuff + ctr) = currnode->localposition;

        ctr = ctr + 1;

        headnode->next = currnode->next;
        if ( mca_sharedfp_individual_verbose ) {
            opal_output(ompi_sharedfp_base_framework.framework_output,
                        "sharedfp_individual_get_timestamps_and_reclengths: node deleted from the metadatalinked list\n");
        }
        free(currnode);
        currnode = headnode->next;

    }	/*End of while(currnode) loop*/


    /*Reset the numofrecords*/
    headnode->numofrecords = 0;

exit:

    return ret;
}

int  mca_sharedfp_individual_create_buff(double **ts,MPI_Offset **off,int totalnodes, int size)
{

    if ( totalnodes)  {
        *off = (OMPI_MPI_OFFSET_TYPE *) malloc ( totalnodes * sizeof(OMPI_MPI_OFFSET_TYPE));
        if ( NULL == *off ) {
	    return OMPI_ERR_OUT_OF_RESOURCE;
	}

        *ts = (double *) malloc ( totalnodes * sizeof(double) );
        if (NULL == *ts ) {
            return OMPI_ERR_OUT_OF_RESOURCE;
	}

    }

    return OMPI_SUCCESS;
}

/*Sort the timestamp buffer*/
int  mca_sharedfp_individual_sort_timestamps(double **ts, MPI_Offset **off, int **ranks, int totalnodes)
{

    int i = 0;
    int j = 0;
    int flag = 1;
    double tempts = 0.0;
    OMPI_MPI_OFFSET_TYPE tempoffset = 0;
    int temprank = 0;

    for (i= 1; (i <= totalnodes)&&(flag) ; i++)  {
        flag = 0;
        for (j = 0; j < (totalnodes - 1); j++)  {
            if ( *(*ts + j + 1) < *(*ts + j ))  {
                /*swap timestamp*/
                tempts = *(*ts + j );
                *(*ts + j) = *(*ts + j + 1);
                *(*ts + j + 1) = tempts;

                /*swap offset*/
                tempoffset = *(*off + j);
                *(*off + j) = *(*off + j + 1);
                *(*off + j + 1) = tempoffset;

                /*swap ranks*/
                temprank = *(*ranks + j);
                *(*ranks + j) = *(*ranks + j + 1);
                *(*ranks + j + 1) = temprank;

                flag = 1;
            }
        }

    }

    return OMPI_SUCCESS;
}


MPI_Offset  mca_sharedfp_individual_assign_globaloffset(MPI_Offset **offsetbuff,int totalnodes,
                                                        struct mca_sharedfp_base_data_t *sh)
{
    int i = 0;
    OMPI_MPI_OFFSET_TYPE temp = 0,prevoffset = 0;
    OMPI_MPI_OFFSET_TYPE global_offset = 0;

    for (i = 0; i < totalnodes; i++) {
        temp = *(*offsetbuff + i);

        if (i == 0) {
            *(*offsetbuff + i ) = sh->global_offset;
	}
        else {
            *(*offsetbuff + i) = *(*offsetbuff + i - 1) + prevoffset;
	}
        prevoffset = temp;
    }
    global_offset =   *(*offsetbuff + i - 1) + prevoffset;

    return global_offset;
}


int  mca_sharedfp_individual_getoffset(double timestamp, double *ts, int *ranks, int myrank, int totalnodes)
{
    int i = 0;
    int notfound = 1;


    while (notfound) {
        if (ts[i] == timestamp && ranks[i] == myrank )
            break;

        i++;

        if (i == totalnodes)  {
            notfound = 0;
        }
    }

    if (!notfound) {
        return -1;
    }

    return i;
}