/****************************************************************************
** (c) Copyright IBM Corp. 2007 All rights reserved.
** 
** The following sample of source code ("Sample") is owned by International 
** Business Machines Corporation or one of its subsidiaries ("IBM") and is 
** copyrighted and licensed, not sold. You may use, copy, modify, and 
** distribute the Sample in any form without payment to IBM, for the purpose of 
** assisting you in the development of your applications.
** 
** The Sample code is provided to you on an "AS IS" basis, without warranty of 
** any kind. IBM HEREBY EXPRESSLY DISCLAIMS ALL WARRANTIES, EITHER EXPRESS OR 
** IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 
** MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. Some jurisdictions do 
** not allow for the exclusion or limitation of implied warranties, so the above 
** limitations or exclusions may not apply to you. IBM shall not be liable for 
** any damages you suffer as a result of using, copying, modifying or 
** distributing the Sample, even if IBM has been advised of the possibility of 
** such damages.
*****************************************************************************
**
** SOURCE FILE NAME: dbthrds.sqC
**
** SAMPLE: How to use multiple context APIs on UNIX
**
**         This program uses the POSIX threads APIs for thread creation and
**         management.  On Solaris systems it is also possible to use the
**         Solaris thread APIs such as thr_create.
**
**         The program maintains a pool of contexts. A generate_work
**         function is executed from main(), and creates dynamic SQL
**         statements that are executed by worker threads. When a
**         context becomes available, a thread is created and dispatched
**         to do the specified work.
**
**         The work generated consists of statements to delete entries
**         from either the STAFF or EMPLOYEE tables of the SAMPLE database.
**
**         Compile and link with C compiler options for multi-threaded
**         applications supported by your platform.
**
**         Note:
**           On some environments, the output may appear garbled because
**           one thread writes output at the same time as another thread,
**           thereby interleaving or overwriting output strings. If this
**           is a concern, you can add a locking mechanism for the output
**           so only one thread writes at any one time.
**
** SQL STATEMENTS USED:
**         CONNECT
**         EXECUTE IMMEDIATE
**
**                           
*****************************************************************************
**
** For more information on the sample programs, see the README file.
**
** For information on developing embedded SQL applications see the Developing Embedded SQL Applications book.
**
** For information on using SQL statements, see the SQL Reference.
**
** For information on DB2 APIs, see the Administrative API Reference.
**
** For the latest information on programming, building, and running DB2
** applications, visit the DB2 Information Center:
**     http://publib.boulder.ibm.com/infocenter/db2luw/v9r7/index.jsp
****************************************************************************/

#ifdef USE_UI_THREADS
// Sun has "Unix International" threads APIs
#include <thread.h>
#include <synch.h>
#else
#include <pthread.h>
#endif

#include <sql.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#if ((__cplusplus >= 199711L) && !defined DB2HP && !defined DB2AIX) || \
    (DB2LINUX && (__LP64__ || (__GNUC__ >= 3)) )
   #include <iostream>
   using namespace std;
#else
   #include <iostream.h>
#endif


#ifdef USE_UI_THREADS
// Hide the differences in the threads implementations
//
// When building against the "Unix International" threads API, the rest of
// this file is still written in terms of the pthread_* names; each one is
// redirected here to its thr_*/mutex_*/cond_* counterpart.  The init macros
// insert USYNC_THREAD as the extra "type" argument those calls take.
// Thread creation is not mapped here; dispatch() calls thr_create directly.
#define pthread_exit(x) thr_exit(x)
#define pthread_mutex_lock(x) mutex_lock(x)
#define pthread_mutex_unlock(x) mutex_unlock(x)
#define pthread_mutex_init(x,y) mutex_init(x, USYNC_THREAD, y)
#define pthread_cond_init(x,y) cond_init(x, USYNC_THREAD, y)
#define pthread_cond_wait(x,y) cond_wait(x,y)
#define pthread_cond_signal(x) cond_signal(x)
#define pthread_mutex_t mutex_t
#define pthread_cond_t cond_t
#define pthread_t thread_t
#endif

// Stack size requested for worker threads: 0x20000 bytes on DB2HP builds,
// 0 everywhere else.  It is passed to thr_create (UI threads) and to
// pthread_attr_setstacksize (only on DB2DYNIX / DB2HP builds).
// NOTE(review): 0 presumably means "use the platform default" -- confirm
// against the thread library documentation for the target platform.
#if (defined(DB2HP))
#define DEFAULT_STACK_SIZE 0x20000
#else
#define DEFAULT_STACK_SIZE 0
#endif

// Fatal assertion.  If `condition` evaluates to false, the source file name,
// line number and the literal text of the condition are written to cerr and
// the whole process is terminated with exit(1).
//
// The macro expands to a braced block, so it is used as a statement; the
// condition text is stringified into the message, which means renaming a
// variable used in the condition also changes the message.
#define check_expected(condition)                                  \
{                                                                  \
  if (!(condition))                                                \
  {                                                                \
    cerr << __FILE__ << ":" << __LINE__ << " unexpected error: \"" \
         << #condition << "\" was false" << endl;                  \
    exit(1);                                                       \
  }                                                                \
}

// Report the outcome of the most recent SQL statement for one context.
//
//   context  - context number, printed as part of the report heading
//   CE_STR   - short description of the statement that was just executed
//   pStatus  - int *; receives sqlca.sqlcode when check_error() returns
//              non-zero, and is left untouched otherwise
//
// The macro reads a variable named `sqlca` that must be in scope at the
// point of use.  The heading is formatted with sprintf into a 256-byte
// local buffer without a length check, so CE_STR has to be short.
// It expands to a braced block and works with or without a trailing ';'.
#define CHECKERR(context, CE_STR, pStatus)              \
{                                                       \
  char buf[256];                                        \
  sprintf(buf, "Context nb.: %i\n%s", context, CE_STR); \
  if (check_error(buf, &sqlca) != 0)                    \
  {                                                     \
    *(pStatus) = sqlca.sqlcode;                         \
  }                                                     \
}

// Generate work creates the following type of structure which is passed
// to each worker thread.
//
// One instance describes a single SQL statement to run.  generate_work()
// allocates and fills it, dispatch() stores the chosen context index and
// passes it as the thread argument, and clean_up() releases it.
struct work
{
  char database[15]; // database for thread to connect to
  char userid[15]; // user id for the connection; empty string means none given
  char password[15]; // password used together with userid
  char *command; // dynamic SQL statement to execute
  int context; // context to use for connection
};

// The context pool consists of an array of 'struct context' types.
// Each entry pairs one context handle with an availability flag.
struct context
{
  void *ctx; // context handle filled in by sqleBeginCtx in initialize()
  int free; // 1 when available, 0 while a worker thread is using it
};

// Global variables.
// The option values below are set in initialize() from the command line and
// are only read afterwards.
int contexts = 8; // size of context pool
struct context *ctxlist; // the context pool, `contexts` entries long

#ifndef USE_UI_THREADS
pthread_attr_t attr; // global thread attributes
#endif
pthread_t *thd; // array of thread ids

int loops = 15; // amount of work for the client to create
int commit = 0; // commit the work done
int verbose = 1; // non-zero: print progress messages (cleared by -q)
char database[15]; // database name or alias to connect to
char userid[15]; // user id for CONNECT; empty means connect without one
char password[15]; // password for CONNECT

// for management of the context pool
// contexts_free counts the entries of ctxlist whose `free` flag is set.
// dispatch() and clean_up() change it (and the flags) only while holding
// cond_m; `cond` is signalled by clean_up() when a context is returned.
int contexts_free;
pthread_cond_t cond;
pthread_mutex_t cond_m;

// mutex for serializing cout
pthread_mutex_t output;

// Prototypes.
void initialize(int argc, char *argv[]); // parse options, build context pool
void usage(char *argv0);                 // print help text and exit(1)
void generate_work();                    // create work items and dispatch them
void dispatch(struct work *work_item);   // wait for a context, start a thread
void *do_work(void *args);      // each thread executes this function
void clean_up(struct work *work_item, int connect_done, int *pStatus); // end unit of work, free context
int check_error(char eString[], struct sqlca *caPointer); // print SQLCA diagnostics

// Program entry point.
//
// Prints a banner, lets initialize() process the command line and build the
// context pool, then lets generate_work() create and dispatch every work
// item.  The main thread ends with pthread_exit instead of returning from
// main, so that only this thread terminates and worker threads that are
// still running are not cut short by process exit.
int main(int argc, char *argv[])
{
  // The output mutex has to exist before the first message is written.
  pthread_mutex_init(&output, NULL);

  pthread_mutex_lock(&output);
  cout << "\nHow to use multiple context APIs" << endl << endl;
  pthread_mutex_unlock(&output);

  initialize(argc, argv);
  generate_work();

  if (verbose != 0)
  {
    pthread_mutex_lock(&output);
    cout << "all workers started, exiting main" << endl;
    pthread_mutex_unlock(&output);
  }

  pthread_exit(0);
} //main

// Initialize any global program state.  This includes the attributes
// used for each thread creation, the setting of the multi-manual context
// type and the creation of the context pool.
void initialize(int argc, char *argv[])
{
  int c, i, rc;
  struct sqlca sqlca;

  strcpy(database, "sample");
  strcpy(userid, "");
  strcpy(password, "");

  // read any command line options
  while ((c = getopt(argc, argv, "d:u:p:l:c:qCh")) != EOF)
  {
    switch (c)
    {
      case 'd':
        strcpy(database, optarg);
        break;
      case 'u':
        strcpy(userid, optarg);
        break;
      case 'p':
        strcpy(password, optarg);
        break;
      case 'l':
        loops = atoi(optarg);
        break;
      case 'c':
        contexts = atoi(optarg);
        break;
      case 'q':
        verbose = 0;
        break;
      case 'C':
        commit = 1;
        break;
      case 'h':
      default:
        usage(argv[0]);
        break;
    }
  }
  cout << "Database: " << database << endl;
  cout << "Username: " << userid << endl;
  cout << "Password: " << password << endl;
  cout << "Loops: " << loops << endl;
  cout << "Contexts: " << contexts << endl;
  cout << "Verbose: " << verbose << endl;
  cout << "Commit: " << commit << endl;
  contexts_free = contexts;

  ctxlist = new context[contexts];
  check_expected(ctxlist != NULL);

  thd = new pthread_t[contexts];
  check_expected(thd != NULL);

#ifndef USE_UI_THREADS
  rc = pthread_attr_init(&attr);
  check_expected(rc == 0);

  rc = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  check_expected(rc == 0);

#if (defined(DB2DYNIX)) || (defined(DB2HP))
  rc = pthread_attr_setstacksize(&attr, DEFAULT_STACK_SIZE);
#endif

#ifdef _POSIX_THREAD_PRIORITY_SCHEDULING

#if (defined(DB2IRIX))
  rc = pthread_attr_setscope(&attr, PTHREAD_SCOPE_PROCESS);
#else
  rc = pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
#endif
  check_expected(rc == 0);
#endif
#endif

  sqleSetTypeCtx(SQL_CTX_MULTI_MANUAL);

  if (verbose)
  {
    pthread_mutex_lock (&output);
    cout << "creating context pool of size " << contexts << endl;
    pthread_mutex_unlock (&output);
  }

  for (i = 0; i < contexts; i++)
  {
    rc = sqleBeginCtx(&ctxlist[i].ctx, SQL_CTX_CREATE_ONLY, NULL, &sqlca);
    check_expected(rc == 0 && sqlca.sqlcode == 0);
    ctxlist[i].free = 1;
  }

  rc = pthread_mutex_init(&cond_m, NULL);
  check_expected(rc == 0);

  rc = pthread_cond_init(&cond, NULL);
  check_expected(rc == 0);

  return;
} //initialize

// Print a friendly usage message.
//
//   argv0 - the program path as invoked (argv[0]); only the part after the
//           last '/' is shown
//
// The text is written to cerr and the process then ends with exit(1); this
// function does not return.
void usage(char *argv0)
{
  // strrchr returns a pointer to the '/' itself, so step past it to avoid
  // printing the program name with a leading slash.
  char *slash = strrchr(argv0, '/');
  char *program = slash ? slash + 1 : argv0;

  cerr << "usage: " << program << endl
       << "       [-d database] [-u userid] [-p password]" << endl
       << "       [-l loops] [-c contexts] [-q] [-C] [-h]" << endl
       << endl
       << " -d\t alternate sample database or database alias." << endl
       << " -u\t user id." << endl
       << " -p\t password." << endl
       << " -l\t number of loops." << endl
       << " -c\t size of context pool to use." << endl
       << " -q\t quiet mode." << endl
       << " -C\t commit changes made." << endl
       << " -h\t print this message." << endl;

  exit(1);
} //usage

// Construct a "random" SQL statement to execute in a connection to an
// arbitrary database.
// Note that the exclusive use of the SAMPLE database here is not a db2
// restriction, but is a convienience to simplify this program.
void generate_work()
{
  int i, empno;
  unsigned int seed = getpid();
  struct work *work_item;
  char buf[256];

  // The employee numbers are in the 10-350 range and are multiples of
  // * 10.
  char *delete_str1 = "DELETE FROM STAFF WHERE ID=%i";
  char *delete_str2 = "DELETE FROM EMPLOYEE WHERE EMPNO='%06i'";

  // Generate work to be done in each thread.
  for (i = 0; i < loops; i++)
  {
    work_item = new work;

    strcpy(work_item->database, database);
    strcpy(work_item->userid, userid);
    strcpy(work_item->password, password);

    srand(seed);
    empno = ((rand() % 1000) + 1) * 10;
    sprintf(buf, i % 2 ? delete_str1 : delete_str2, empno);

    work_item->command = strdup(buf);

    dispatch(work_item);
  }

  return;
} //generate_work

// Start a worker thread for one work item.
//
// The calling thread blocks on the pool condition variable until at least
// one context is free, claims the lowest-numbered free context while still
// holding the pool mutex, records that context in the work item and then
// creates a thread running do_work() with the work item as its argument.
//
//   work_item - the statement to run; its `context` field is filled in here
//
// Any failing thread-library call ends the program through check_expected.
void dispatch(struct work *work_item)
{
  int rc;
  int slot = 0;

  rc = pthread_mutex_lock(&cond_m);
  check_expected(rc == 0);

  // sleep until clean_up() reports that a context has been returned
  while (contexts_free == 0)
  {
    rc = pthread_cond_wait(&cond, &cond_m);
    check_expected(rc == 0);
  }

  // at least one context is free now; take the first one found
  while (slot < contexts && !ctxlist[slot].free)
  {
    slot++;
  }
  ctxlist[slot].free = 0;
  contexts_free--;

  rc = pthread_mutex_unlock(&cond_m);
  check_expected(rc == 0);

  work_item->context = slot;

  if (verbose)
  {
    pthread_mutex_lock(&output);
    cout << "creating thread on context " << slot << " for SQL statement:"
         << "\n\t\"" << work_item->command << "\"" << endl;
    pthread_mutex_unlock(&output);
  }

  // the thread id is stored in the slot that matches the context index
#ifdef USE_UI_THREADS
  rc = thr_create(NULL,
                  DEFAULT_STACK_SIZE,
                  do_work,
                  (void *)work_item,
                  THR_BOUND | THR_DETACHED,
                  &thd[slot]);
#else
  rc = pthread_create(&thd[slot], &attr, do_work, (void *)work_item);
#endif
  check_expected(rc == 0);

  return;
} //dispatch

// Execute the SQL statement.  This is the "main" routine for each of the
// worker threads.
//
// The thread attaches to the context named in the work item, connects to the
// database (with user id and password only when a user id was supplied) and
// runs the statement with EXECUTE IMMEDIATE.
//
// Whether or not the connection succeeded, clean_up() is then called; it
// ends the unit of work if there is one, detaches from the context, returns
// the context to the pool, wakes up dispatch() and releases the work item.
//
//   args    - the struct work * passed by dispatch()
//   returns - 0, or the SQLCODE of the last statement that check_error()
//             reported as an error, converted to a pointer value
void *do_work(void *args)
{
  EXEC SQL BEGIN DECLARE SECTION;
    char dbname[15];
    char user[15];
    char pswd[15];
    char statement[256];
  EXEC SQL END DECLARE SECTION;

  int rc, status = 0;
  struct sqlca sqlca;
  struct work *work = (struct work *)args;

  // host variables must be locals declared above, so copy the settings in
  strcpy(dbname, work->database);
  strcpy(user, work->userid);
  strcpy(pswd, work->password);

  if (verbose)
  {
    pthread_mutex_lock (&output);
    cout << work->context << ": sqleAttachToCtx" << endl;
    pthread_mutex_unlock (&output);
  }
  rc = sqleAttachToCtx(ctxlist[work->context].ctx, NULL, &sqlca);
  check_expected(rc == 0 && sqlca.sqlcode == 0);

  if (verbose)
  {
    pthread_mutex_lock (&output);
    cout << work->context << ": CONNECT TO " << dbname << endl;
    pthread_mutex_unlock (&output);
  }

  if (strlen(user) == 0)
  {
    EXEC SQL CONNECT TO :dbname;
  }
  else
  {
    EXEC SQL CONNECT TO :dbname USER :user USING :pswd;
  }
  CHECKERR(work->context, "CONNECT TO DATABASE", &status);

  // Any non-zero SQLCODE from CONNECT (warnings included) takes the
  // "not connected" path, which skips COMMIT/ROLLBACK and CONNECT RESET.
  if (sqlca.sqlcode != 0)
  {
    clean_up(work, 0, &status);
  }
  else
  {
    strcpy(statement, work->command);

    if (verbose)
    {
      pthread_mutex_lock (&output);
      cout << work->context << ": EXECUTE \"" << statement << "\"" << endl;
      pthread_mutex_unlock (&output);
    }

    EXEC SQL EXECUTE IMMEDIATE :statement;
    CHECKERR(work->context, "EXECUTE IMMEDIATE", &status);

    clean_up(work, 1, &status);
  }

  // `work` has been released by clean_up() and must not be used from here on.

  // Widen to long before converting: casting an int straight to a pointer
  // is a size mismatch where pointers are wider than int.
  return (void *)(long)status; // this could be obtained with a pthread_join
                               // if the thread was created undetached
} //do_work

// Finish a worker thread's use of its context and release the work item.
//
//   work         - the work item the calling thread has been processing; it
//                  is released here and must not be used by the caller
//                  afterwards
//   connect_done - non-zero when the database connection was established;
//                  the unit of work is then ended with COMMIT (when the
//                  global `commit` is set) or ROLLBACK, followed by
//                  CONNECT RESET
//   pStatus      - receives the SQLCODE of any of those statements that
//                  check_error() reports as an error
//
// In all cases the thread detaches from the context, marks the context free
// under the pool mutex and signals the condition variable that dispatch()
// waits on.
void clean_up(struct work *work, int connect_done, int *pStatus)
{
  int rc;
  struct sqlca sqlca;

  if (connect_done)
  {
    if (commit)
    {
      if (verbose)
      {
        pthread_mutex_lock (&output);
        cout << work->context << ": COMMIT" << endl;
        pthread_mutex_unlock (&output);
      }

      EXEC SQL COMMIT;
      CHECKERR(work->context, "COMMIT", pStatus);
    }
    else
    {
      if (verbose)
      {
        pthread_mutex_lock (&output);
        cout << work->context << ": ROLLBACK" << endl;
        pthread_mutex_unlock (&output);
      }

      EXEC SQL ROLLBACK;
      CHECKERR(work->context, "ROLLBACK", pStatus);
    }

    if (verbose)
    {
      pthread_mutex_lock (&output);
      cout << work->context << ": CONNECT RESET" << endl;
      pthread_mutex_unlock (&output);
    }

    EXEC SQL CONNECT RESET;
    CHECKERR(work->context, "CONNECT RESET", pStatus);
  }

  if (verbose)
  {
    pthread_mutex_lock (&output);
    cout << work->context << ": sqleDetachFromCtx" << endl;
    pthread_mutex_unlock (&output);
  }
  rc = sqleDetachFromCtx(ctxlist[work->context].ctx, NULL, &sqlca);
  check_expected(rc == 0 && sqlca.sqlcode == 0);

  // The pool bookkeeping is only changed while holding cond_m.
  rc = pthread_mutex_lock(&cond_m);
  check_expected(rc == 0);

  if (verbose)
  {
    pthread_mutex_lock (&output);
    cout << work->context << ": marking context free" << endl;
    pthread_mutex_unlock (&output);
  }
  ctxlist[work->context].free = 1;
  contexts_free++;

  // wake dispatch() in case it is waiting for a context
  rc = pthread_cond_signal(&cond);
  check_expected(rc == 0);
  rc = pthread_mutex_unlock(&cond_m);
  check_expected(rc == 0);

  // command was allocated by strdup(), i.e. with malloc, so it has to be
  // released with free(); the work item itself was allocated with new.
  free(work->command);
  delete work;

  return;
} //clean_up

// Append `text` to the NUL-terminated string held in `report`, which has
// room for `capacity` bytes in total.  Text that does not fit is truncated;
// the result is always NUL-terminated.
static void append_report(char *report, size_t capacity, const char *text)
{
  size_t used = strlen(report);

  if (used + 1 < capacity)
  {
    strncat(report, text, capacity - used - 1);
  }
}

// This procedure checks the SQLCODE in the given SQLCA and prints out any
// information that is available related to the specific error.
//
//   eString   - caller-supplied description of what was being attempted
//   caPointer - SQLCA filled in by the statement or API call to examine
//
//   returns 1 when the SQLCODE is negative (an error was reported),
//           0 when it is zero, one of the ignored codes, or positive
//             (a warning was reported)
//
// SQLCODE values 0, 100, -438 and -532 produce no output at all.
// NOTE(review): 100 is presumably "no row found" and -532 a delete blocked
// by a referential constraint, both expected for this sample's random
// DELETE statements -- confirm against the message reference.
//
// The report is assembled in a local buffer and written to cout in a single
// operation while holding the output mutex.
int check_error(char eString[], struct sqlca *caPointer)
{
  char eBuffer[1024];
  char sBuffer[1024];
  char message[4096]; // room for the heading plus both message texts
  char messToken[1024];
  short rc, Erc;

  if (caPointer->sqlcode == 0 || caPointer->sqlcode == 100
        || caPointer->sqlcode == -438 || caPointer->sqlcode == -532)
  {
    return 0;
  }

  message[0] = '\0';
  append_report(message, sizeof(message), "--- error report ---\n");

  // The precision keeps the heading inside messToken whatever the caller
  // passes; the cast makes the argument match the %ld conversion.
  sprintf(messToken, "ERROR occurred : %.900s.\nSQLCODE : %ld\n",
          eString, (long)caPointer->sqlcode);
  append_report(message, sizeof(message), messToken);

  // Start from empty strings so that nothing uninitialised is ever printed
  // if a call leaves its buffer untouched.
  eBuffer[0] = '\0';
  sBuffer[0] = '\0';

  // GET SQLSTATE MESSAGE
  rc = sqlogstt(sBuffer, sizeof(sBuffer), 80, caPointer->sqlstate);

  // GET ERROR MESSAGE API called
  Erc = sqlaintp(eBuffer, sizeof(eBuffer), 80, caPointer);

  eBuffer[sizeof(eBuffer) - 1] = '\0';
  sBuffer[sizeof(sBuffer) - 1] = '\0';

  // return code is the length of the eBuffer string
  if (Erc > 0)
  {
    append_report(message, sizeof(message), eBuffer);
  }

  // NOTE(review): the SQLSTATE text is added only when sqlogstt returns
  // exactly 0; check the API reference whether a positive length is what
  // indicates success, as it is for sqlaintp above.
  if (rc == 0)
  {
    append_report(message, sizeof(message), "\n");
    append_report(message, sizeof(message), sBuffer);
  }
  append_report(message, sizeof(message), "--- end error report ---\n");

  if (caPointer->sqlcode > 0)
  {
    // a positive SQLCODE is only a warning
    append_report(message, sizeof(message),
                  "WARNING - CONTINUING PROGRAM WITH WARNINGS!\n");
  }

  pthread_mutex_lock (&output);
  cout << message;
  pthread_mutex_unlock (&output);

  return (caPointer->sqlcode < 0) ? 1 : 0;
} //check_error