/****************************************************************************
** (c) Copyright IBM Corp. 2007 All rights reserved.
** 
** The following sample of source code ("Sample") is owned by International 
** Business Machines Corporation or one of its subsidiaries ("IBM") and is 
** copyrighted and licensed, not sold. You may use, copy, modify, and 
** distribute the Sample in any form without payment to IBM, for the purpose of 
** assisting you in the development of your applications.
** 
** The Sample code is provided to you on an "AS IS" basis, without warranty of 
** any kind. IBM HEREBY EXPRESSLY DISCLAIMS ALL WARRANTIES, EITHER EXPRESS OR 
** IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 
** MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. Some jurisdictions do 
** not allow for the exclusion or limitation of implied warranties, so the above 
** limitations or exclusions may not apply to you. IBM shall not be liable for 
** any damages you suffer as a result of using, copying, modifying or 
** distributing the Sample, even if IBM has been advised of the possibility of 
** such damages.
*****************************************************************************
**
** SOURCE FILE NAME: tbload.sqc
**
** SAMPLE: How to load into a partitioned database
**
**         Note:
**           This sample is meant to be run in a partitioned database
**           environment. If you attempt to run this program in a
**           non-partitioned database environment, you should receive
**           the following error:
**
**           SQL27959N  The partitioned database configuration option
**           "PARTITIONED DB CONFIG" is invalid.  Reason code = "1".
**
** DB2 API USED:
**         db2Load   -- Load
**         db2Export -- Export
**
** SQL STATEMENTS USED:
**         CREATE TABLE
**         DROP TABLE
**         COMMIT
**
*****************************************************************************
**
** For more information on the sample programs, see the README file.
**
** For information on developing embedded SQL applications see the Developing Embedded SQL Applications book.
**
** For information on using SQL statements, see the SQL Reference.
**
** For information on DB2 APIs, see the Administrative API Reference.
**
** For the latest information on programming, building, and running DB2
** applications, visit the DB2 Information Center:
**     http://publib.boulder.ibm.com/infocenter/db2luw/v9r7/index.jsp
****************************************************************************/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sqlenv.h>
#include <sqlutil.h>
#include <db2ApiDf.h>
#include "utilemb.h"

EXEC SQL INCLUDE SQLCA;

/* Function prototypes */
void PrintLoadSummary(db2LoadOut     *pLoadInfoOut,
                      db2PartLoadOut *pPartLoadInfoOut,
                      struct sqlca   *pSqlca);

int main(int argc, char* argv[])
{
  SQL_API_RC rc = SQL_RC_OK;
  char dbAlias[SQL_ALIAS_SZ + 1];
  char user[USERID_SZ + 1];
  char pswd[PSWD_SZ + 1];
  char dataFileName[256];
  char msgFileName[128];
  struct sqldcol dataDescriptor;
  char actionString[256];
  struct sqllob *pAction;
  struct db2ExportOut outputInfo;
  struct db2ExportStruct exportParmStruct;

  db2LoadStruct loadParms;
  const char *pActionString = "INSERT INTO newtable";
  const char *pFileTypeModString = "ANYORDER";
  sqlu_media_list loadMediaList;
  sqlu_location_entry inputLocationEntry;
  db2LoadIn loadInfoIn;
  db2LoadOut loadInfoOut;
  db2PartLoadIn partLoadInfoIn;
  db2PartLoadOut partLoadInfoOut;
  db2LoadNodeList partitioningDbPartNums;
  db2Uint16 mode;
  db2Uint16 isolatePartErrs;

  printf("\nTHIS SAMPLE SHOWS A PARTITIONED DATABASE LOAD OPERATION.\n");

  /* Check the command line arguments */
  rc = CmdLineArgsCheck1(argc, argv, dbAlias, user, pswd);
  if (rc != 0)
  {
    return rc;
  }

  /* Connect to the database */
  rc = DbConn(dbAlias, user, pswd);
  if (rc != 0)
  {
    return rc;
  }

#if(defined(DB2NT))
  sprintf(dataFileName, "%s%stbload.DEL", getenv("DB2PATH"), PATH_SEP);
#else
  sprintf(dataFileName, "%s%stbload.DEL", getenv("HOME"), PATH_SEP);
#endif

  /* First use export to create the data file. */
  printf("\n-----------------------------------------------------------");
  printf("\nUSE THE DB2 API:\n");
  printf("  db2Export -- Export\n");
  printf("TO EXPORT DATA TO A FILE.\n");

  printf("\n  Be sure to complete all table operations and release\n");
  printf("  all locks before starting an export operation. This\n");
  printf("  can be done by issuing a COMMIT after closing all\n");
  printf("  cursors opened WITH HOLD, or by issuing a ROLLBACK.\n");
  printf("  Please refer to the 'Administrative API Reference'\n");
  printf("  for the details.\n");

  /* export data */
  dataDescriptor.dcolmeth = SQL_METH_D;
  strcpy(actionString, "SELECT deptnumb, manager FROM org");
  pAction = (struct sqllob *)malloc(sizeof(sqluint32) +
                                     sizeof(actionString) + 1);
  pAction->length = strlen(actionString);
  strcpy(pAction->data, actionString);
  strcpy(msgFileName, "tbload.MSG");

  exportParmStruct.piDataFileName    = dataFileName;
  exportParmStruct.piLobPathList     = NULL;
  exportParmStruct.piLobFileList     = NULL;
  exportParmStruct.piDataDescriptor  = &dataDescriptor;
  exportParmStruct.piActionString    = pAction;
  exportParmStruct.piFileType        = SQL_DEL;
  exportParmStruct.piFileTypeMod     = NULL;
  exportParmStruct.piMsgFileName     = msgFileName;
  exportParmStruct.iCallerAction     = SQLU_INITIAL;
  exportParmStruct.poExportInfoOut   = &outputInfo;

  /* From V9.0 onwards, the structure db2ExportStruct */
  /* will have three new members. They are            */
  /* piExportInfoIn, piXmlPathList and piXmlFileList */
  exportParmStruct.piExportInfoIn    = NULL;
  exportParmStruct.piXmlPathList     = NULL;
  exportParmStruct.piXmlFileList     = NULL;


  printf("\n  Export data.\n");
  printf("    client destination file name: %s\n", dataFileName);
  printf("    action                      : %s\n", actionString);
  printf("    client message file name    : %s\n", msgFileName);

  /* export data */
  db2Export(db2Version970,
            &exportParmStruct,
            &sqlca);

  DB2_API_CHECK("data -- export");

  /* free memory allocated */
  free(pAction);


  printf("\n--------------------------------------------------------\n");
  printf("\nUSE THE DB2 API:\n");
  printf("  db2Load -- Load\n");
  printf("TO LOAD DATA TO A PARTITIONED DATABASE TABLE.\n");

  printf("\nCREATE TABLE newtable(c1 INT, c2 INT)\n");

  /* Create table newtable to load into */
  EXEC SQL CREATE TABLE newtable(C1 INT, C2 INT);
  EMB_SQL_CHECK("newtable -- create");

  EXEC SQL COMMIT;
  EMB_SQL_CHECK("transaction -- commit");

  /**********************************************************************
   * Set up and initialize the db2Load API parameter structure
   **********************************************************************/
  memset(&loadParms, '\0', sizeof(db2LoadStruct));

  /* Set up the list of input source files. We are using just one */
  /* which will be called "tbload.DEL"                            */
  loadParms.piSourceList = &loadMediaList;
  loadParms.piSourceList->media_type = SQLU_SERVER_LOCATION;
  loadParms.piSourceList->sessions = 1;
  loadParms.piSourceList->target.location = &inputLocationEntry;
  strcpy(loadParms.piSourceList->target.location->location_entry,
         dataFileName);

  /* Set up the load action string to "INSERT INTO TABLE1" */
  loadParms.piActionString =
    (struct sqlchar *)malloc(sizeof(short) +
                             strlen(pActionString) + 1);
  strcpy(loadParms.piActionString->data, pActionString);
  loadParms.piActionString->length = strlen(pActionString);

  /* Set the file type to DEL (i.e., an ASCII delimited file) */
  loadParms.piFileType = (char *)SQL_DEL;

  /* Specify the ANYORDER file type modifier which indicates to the  */
  /* load utility that it is not necessary to load the rows of data  */
  /* into the table in the same order they appear in the input file. */
  /* This can result in better load performance and permits the use  */
  /* of multiple partitioning agents as well.                        */
  loadParms.piFileTypeMod =
    (struct sqlchar *)malloc(sizeof(short) +
                             strlen(pFileTypeModString) + 1);
  strcpy(loadParms.piFileTypeMod->data, pFileTypeModString);
  loadParms.piFileTypeMod->length = strlen(pFileTypeModString);

  /* Set up the name that will serve as a prefix for the  */
  /* message files retrieved from each partition that is  */
  /* participating in the load operation.                 */
  loadParms.piLocalMsgFileName = (char *)"tbload.MSG";

  /* Set up and initialize the load input structure */
  memset(&loadInfoIn, '\0', sizeof(db2LoadIn));
  loadInfoIn.iNonrecoverable       = SQLU_NON_RECOVERABLE_LOAD;
  loadInfoIn.iIndexingMode         = SQLU_INX_AUTOSELECT;
  loadInfoIn.iAccessLevel          = SQLU_ALLOW_NO_ACCESS;
  loadInfoIn.iLockWithForce        = SQLU_NO_FORCE;

  /* From V9.0 onwards, the structure member iCheckPending is                 */
  /* deprecated and replaced with iSetIntegrityPending. Also the              */
  /* possible value to set this  variable SQLU_CHECK_PENDING_CASCADE_DEFERRED */
  /* has been replaced with SQLU_SI_PENDING_CASCADE_DEFERRED.                 */
  loadInfoIn.iSetIntegrityPending = SQLU_SI_PENDING_CASCADE_DEFERRED;

  loadInfoIn.iRestartphase         = ' ';
  loadInfoIn.iStatsOpt             = SQLU_STATS_NONE;
  loadParms.piLoadInfoIn = (db2LoadIn *)&loadInfoIn;

  /* Set up and initialize the load output structure */
  memset(&loadInfoOut, '\0', sizeof(db2LoadOut));
  loadParms.poLoadInfoOut = (db2LoadOut *)&loadInfoOut;

  /* Set up the callerac to indicate this is an initial load operation */
  loadParms.iCallerAction = SQLU_INITIAL;

  /**********************************************************************
   * Set up the partitioning load input structure.
   *
   * NOTE: A value of NULL for any field in this structure will
   *       result in the default value for the option being used.
   *
   *       It is recommended that callers zero out the entire structure
   *       and then set up only those parameters that have non-default
   *       values.
   **********************************************************************/
  memset(&partLoadInfoIn, '\0', sizeof(db2PartLoadIn));

  /* Set the mode to PARTITION_AND_LOAD -- this is the default value */
  /* but we do it anyway just to show how it would be set up for     */
  /* non-default values                                              */
  mode = DB2LOAD_PARTITION_AND_LOAD;
  partLoadInfoIn.piMode = &mode;

  /* By setting piOutputNodes to NULL we are indicating that we      */
  /* want loading to take place on all nodes the table is defined    */
  /* on.  Again, this is the default value, but we do it anyway for  */
  /* instructional purposes.                                         */
  partLoadInfoIn.piOutputNodes = NULL;

  /* Set up the piPartitioningNodes parameter to indicate that       */
  /* we want a partitioning agent on node 0.  We will just show      */
  /* how to do this in a comment.  For the real code we will use the */
  /* the default value, NULL, which will indicate to LOAD that it    */
  /* should try to select the best node(s) for partitioning.         */

  /* partitioningDbPartNums.iNumNodes = 1;
     partitioningDbPartNums.piNodeList =
      (SQL_PDB_NODE_TYPE *)malloc(1 * sizeof(SQL_PDB_NODE_TYPE));
     partitioningDbPartNums.piNodeList[0] = 0;
     partLoadInfoIn.piPartitioningNodes = &partitioningDbPartNums; */

  partLoadInfoIn.piPartitioningNodes = NULL;

  /* Set up the error isolation mode to SETUP_AND_LOAD_ERRS */
  isolatePartErrs = DB2LOAD_SETUP_AND_LOAD_ERRS;
  partLoadInfoIn.piIsolatePartErrs = &isolatePartErrs;

  loadParms.piPartLoadInfoIn = &partLoadInfoIn;

  /**********************************************************************
   * Set up the partitioned load output structure
   **********************************************************************/
  memset(&partLoadInfoOut, '\0', sizeof(db2PartLoadOut));

  /* Reserve space for 100 agent info entries. In general, setting */
  /* iMaxAgentInfoEntries to 3 * <number of nodes> in cluster      */
  /* should be sufficient.                                         */
  partLoadInfoOut.iMaxAgentInfoEntries = 100;
  partLoadInfoOut.poAgentInfoList =
    (db2LoadAgentInfo *)malloc(sizeof(db2LoadAgentInfo) * 100);

  loadParms.poPartLoadInfoOut = &partLoadInfoOut;

  /*********************************************************************
   * Call the db2Load API
   *********************************************************************/
  printf("\n  Load data.\n");
  printf("    client destination file name: %s\n", dataFileName);
  printf("    action                      : %s\n", pActionString);
  printf("    client message file name    : %s\n", msgFileName);

  db2Load(db2Version970,
          &loadParms,
          &sqlca);

  /* Display and warnings or errors */
  if (sqlca.sqlcode != 0)
  {
    printf("\nThe following error is expected for non-partitioned");
    printf(" database environments.\n");
    SqlInfoPrint("table -- load", &sqlca, __LINE__, __FILE__);
  }
  else
  {
   /* Display a partition-level summary of the load operation */
    PrintLoadSummary(loadParms.poLoadInfoOut,
                     loadParms.poPartLoadInfoOut,
                     &sqlca);
  }

  /* Drop newtable */
  printf("\nDROP TABLE newtable\n");

  EXEC SQL DROP TABLE newtable;
  EMB_SQL_CHECK("new table -- drop");

  EXEC SQL COMMIT;
  EMB_SQL_CHECK("transaction -- commit");

  /* Free dynamically allocated memory */
  if (loadParms.piActionString != NULL)
  {
    free(loadParms.piActionString);
    loadParms.piActionString = NULL;
  }

  if (loadParms.piFileTypeMod != NULL)
  {
    free(loadParms.piFileTypeMod);
    loadParms.piFileTypeMod = NULL;
  }

  if (partitioningDbPartNums.piNodeList != NULL)
  {
    free(partitioningDbPartNums.piNodeList);
    loadParms.piFileTypeMod = NULL;
  }

  if (partLoadInfoOut.poAgentInfoList != NULL)
  {
    free(partLoadInfoOut.poAgentInfoList);
    partLoadInfoOut.poAgentInfoList = NULL;
  }

  /* Disconnect from the database */
  rc = DbDisconn(dbAlias);
  if (rc != 0)
  {
    return rc;
  }

  return rc;
}

/* ------------------------- HELPER FUNCTIONS ------------------------ */

/* This will print the load summary plus the sqlcode message that was  */
/* returned by LOAD.                                                   */
void PrintLoadSummary(db2LoadOut     *pLoadInfoOut,
                      db2PartLoadOut *pPartLoadInfoOut,
                      struct sqlca   *pSqlca)
{
  int i;
  char *loadAgentName[] = {"LOAD_AGENT",
                           "PARTITIONING_AGENT",
                           "PRE_PARTITIONING_AGENT",
                           "FILE_TRANSFER_AGENT",
                           "LOAD_TO_FILE_AGENT"};
  int numAgentInfoEntries;

  /* Determine the number of agent info entries in the list.  If we   */
  /* didn't allocate enough memory, oNumAgentInfoEntries could be     */
  /* greater than iMaxAgentInfoEntries, but in this case we should    */
  /* only display the first iMaxAgentInfoEntries elements of the list */
  if (pPartLoadInfoOut->oNumAgentInfoEntries <
      pPartLoadInfoOut->iMaxAgentInfoEntries)
  {
    numAgentInfoEntries = pPartLoadInfoOut->oNumAgentInfoEntries;
  }
  else
  {
    numAgentInfoEntries = pPartLoadInfoOut->iMaxAgentInfoEntries;
  }

  printf("\nRESULTS OF LOAD OPERATION:\n\n");

  printf("  LOAD AGENT TYPE           NODE SQLCODE TABLE STATE     \n");
  printf("  -------------------------------------------------------\n");

  /* First dump some summary information about the partitioned db load */
  for (i = 0; i < numAgentInfoEntries; i++)
  {
    printf("  %-25s  %3.3d  %+6d ",
           loadAgentName[pPartLoadInfoOut->poAgentInfoList[i].oAgentType],
           pPartLoadInfoOut->poAgentInfoList[i].oNodeNum,
           pPartLoadInfoOut->poAgentInfoList[i].oSqlcode);

    /* Display the table state on loading partitions */
    if (pPartLoadInfoOut->poAgentInfoList[i].oAgentType ==
        DB2LOAD_LOAD_AGENT)
    {
      switch(pPartLoadInfoOut->poAgentInfoList[i].oTableState)
      {
        case DB2LOADQUERY_NORMAL:
          printf("%-8s\n", "NORMAL");
          break;

        case DB2LOADQUERY_UNCHANGED:
          printf("%-8s\n", "UNCHANGED");
          break;

        case DB2LOADQUERY_LOAD_IN_PROGRESS:
          printf("%-8s\n", "LOAD IN PROGRESS");
          break;

        case DB2LOADQUERY_LOAD_PENDING:
          printf("%-8s\n", "LOAD PENDING");
          break;

        default:
          printf("%-8s\n", "UNKNOWN");
      }
    }
    else
    {
      printf("%-8s\n", "N/A");
    }
  }

  /* Now print the partitioning statistics */
  printf("\n");
  printf("  Partitioning summary:\n");
  printf("  Number of rows read        = %d\n",
         (int)pPartLoadInfoOut->oRowsRdPartAgents);
  printf("  Number of rows rejected    = %d\n",
         (int)pPartLoadInfoOut->oRowsRejPartAgents);
  printf("  Number of rows partitioned = %d\n",
         (int)pPartLoadInfoOut->oRowsPartitioned);

  /* Now print the load statistics (i.e., number of rows loaded, etc.) */
  printf("\n");
  printf("  Load summary:\n");
  printf("  Number of rows read        = %d\n",
         (int)pLoadInfoOut->oRowsRead);
  printf("  Number of rows skipped     = %d\n",
         (int)pLoadInfoOut->oRowsSkipped);
  printf("  Number of rows loaded      = %d\n",
         (int)pLoadInfoOut->oRowsLoaded);
  printf("  Number of rows rejected    = %d\n",
         (int)pLoadInfoOut->oRowsRejected);
  printf("  Number of rows deleted     = %d\n",
         (int)pLoadInfoOut->oRowsDeleted);
  printf("  Number of rows committed   = %d\n",
         (int)pLoadInfoOut->oRowsCommitted);
  printf("\n");
}