/****************************************************************************
** (c) Copyright IBM Corp. 2007 All rights reserved.
**
** The following sample of source code ("Sample") is owned by International
** Business Machines Corporation or one of its subsidiaries ("IBM") and is
** copyrighted and licensed, not sold. You may use, copy, modify, and
** distribute the Sample in any form without payment to IBM, for the purpose of
** assisting you in the development of your applications.
**
** The Sample code is provided to you on an "AS IS" basis, without warranty of
** any kind. IBM HEREBY EXPRESSLY DISCLAIMS ALL WARRANTIES, EITHER EXPRESS OR
** IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
** MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. Some jurisdictions do
** not allow for the exclusion or limitation of implied warranties, so the above
** limitations or exclusions may not apply to you. IBM shall not be liable for
** any damages you suffer as a result of using, copying, modifying or
** distributing the Sample, even if IBM has been advised of the possibility of
** such damages.
*****************************************************************************
**
** SOURCE FILE NAME: dbthrds.sqc
**
** SAMPLE: How to use multiple context APIs on UNIX
**
** This program uses the POSIX threads APIs for thread creation and
** management. On Solaris systems it is also possible to use the
** Solaris thread APIs such as thd_create.
**
** The program maintains a pool of contexts. A generate_work
** function is executed from main(), and creates dynamic SQL
** statements that are executed by worker threads. When a
** context becomes available, a thread is created and dispatched
** to do the specified work.
**
** The work generated consists of statements to delete entries
** from either the STAFF or EMPLOYEE tables of the SAMPLE database.
**
** Compile and link with C compiler options for multi-threaded
** applications supported by your platform.
**
** Note:
** On some environments, the output may appear garbled because
** one thread process outputs information at the same time as
** another process, thereby overwriting output strings. If this
** is a concern, you can add a locking mechanism for the output
** so only one process outputs at any one time.
**
** SQL STATEMENTS USED:
** CONNECT
** EXECUTE IMMEDIATE
**
**
*****************************************************************************
**
** For more information on the sample programs, see the README file.
**
** For information on developing embedded SQL applications see the Developing Embedded SQL Applications book.
**
** For information on using SQL statements, see the SQL Reference.
**
** For information on DB2 APIs, see the Administrative API Reference.
**
** For the latest information on programming, building, and running DB2
** applications, visit the DB2 Information Center:
** http://publib.boulder.ibm.com/infocenter/db2luw/v9r7/index.jsp
****************************************************************************/
#ifdef USE_UI_THREADS /* Sun has "Unix International" threads APIs */
#include <thread.h>
#include <synch.h>
#else
#include <pthread.h>
#endif
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <sql.h>
#ifdef USE_UI_THREADS
/* Hide the differences in the threads implementations */
#define pthread_exit(x) thr_exit(x)
#define pthread_mutex_lock(x) mutex_lock(x)
#define pthread_mutex_unlock(x) mutex_unlock(x)
#define pthread_mutex_init(x,y) mutex_init(x, USYNC_THREAD, y)
#define pthread_cond_init(x,y) cond_init(x, USYNC_THREAD, y)
#define pthread_cond_wait(x,y) cond_wait(x,y)
#define pthread_cond_signal(x) cond_signal(x)
#define pthread_mutex_t mutex_t
#define pthread_cond_t cond_t
#define pthread_t thread_t
#endif
#if (defined(DB2HP))
#define DEFAULT_STACK_SIZE 0x20000
#else
#define DEFAULT_STACK_SIZE 0
#endif
/* Generated work creates the following type of structure
which is passed to each worker thread. */
struct work
{
char database[15]; /* database for thread to connect to */
char userid[15];
char password[15];
char *command; /* dynamic SQL statement to execute */
int context; /* context to use for connection */
};
/* The context pool consists of an array of 'struct context' types. */
struct context
{
void *ctx;
int free;
};
/* Global variables. */
int contexts = 8; /* size of context pool */
struct context *ctxlist;
#ifndef USE_UI_THREADS
pthread_attr_t attr; /* global thread attributes */
#endif
pthread_t *thd; /* array of thread ids */
int loops = 15; /* amount of work for the client to create */
int commit = 0; /* commit the work done */
int verbose = 1;
char database[15];
char userid[15];
char password[15];
/* for management of the context pool */
int contexts_free;
pthread_cond_t cond;
pthread_mutex_t cond_m;
/* Prototypes. */
void initialize(int argc, char *argv[]);
void usage(char *argv0);
void generate_work(void);
void dispatch(struct work *work_item);
void *do_work(void *args); /* each thread executes this function */
void clean_up(struct work *work_item, int connect_done, int *pStatus);
#define check_expected(condition) \
{ \
if (!(condition)) \
{ \
fprintf(stderr, "%s:%i unexpected error: \"%s\" was false\n", \
__FILE__, __LINE__, #condition); \
exit(1); \
} \
}
#define CHECKERR(context, CE_STR, pStatus) \
{ \
char buf[256]; \
sprintf(buf, "Context nb.: %i\n%s", context, CE_STR); \
if (check_error(buf, &sqlca) != 0) \
{ \
*(pStatus) = sqlca.sqlcode; \
} \
}
int check_error(char eString[], struct sqlca *caPointer);
int main(int argc, char *argv[])
{
printf("\nHow to use multiple context APIs\n");
printf("\n");
initialize(argc, argv);
generate_work();
if (verbose)
{
printf("all workers started, exiting main\n");
}
pthread_exit(0);
return 0;
} /* main */
/* Initialize any global program state. This includes the attributes
used for each thread creation, the setting of the multi-manual context
type and the creation of the context pool. */
void initialize(int argc, char *argv[])
{
int c, i, rc;
struct sqlca sqlca;
strcpy(database, "sample");
strcpy(userid, "");
strcpy(password, "");
/* read any command line options */
while ((c = getopt(argc, argv, "d:u:p:l:c:qCh")) != EOF)
{
switch (c)
{
case 'd':
strcpy(database, optarg);
break;
case 'u':
strcpy(userid, optarg);
break;
case 'p':
strcpy(password, optarg);
break;
case 'l':
loops = atoi(optarg);
break;
case 'c':
contexts = atoi(optarg);
break;
case 'q':
verbose = 0;
break;
case 'C':
commit = 1;
break;
case 'h':
default:
usage(argv[0]);
break;
}
}
printf("Database: %s\n", database);
printf("Username: %s\n", userid);
printf("Password: %s\n", password);
printf("Loops: %i\n", loops);
printf("Contexts: %i\n", contexts);
printf("Verbose: %i\n", verbose);
printf("Commit: %i\n", commit);
contexts_free = contexts;
ctxlist = (struct context *)malloc(contexts * sizeof(struct context));
check_expected(ctxlist != NULL);
thd = (pthread_t *)malloc(contexts * sizeof(pthread_t));
check_expected(thd != NULL);
#ifndef USE_UI_THREADS
rc = pthread_attr_init(&attr);
check_expected(rc == 0);
rc = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
check_expected(rc == 0);
#if (defined(DB2HP))
rc = pthread_attr_setstacksize(&attr, DEFAULT_STACK_SIZE);
#endif
#ifdef _POSIX_THREAD_PRIORITY_SCHEDULING
#if (defined(DB2IRIX))
rc = pthread_attr_setscope(&attr, PTHREAD_SCOPE_PROCESS);
#else
rc = pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
#endif
check_expected(rc == 0);
#endif
#endif
sqleSetTypeCtx(SQL_CTX_MULTI_MANUAL);
if (verbose)
{
printf("creating context pool of size %i\n", contexts);
}
for (i = 0; i < contexts; i++)
{
rc = sqleBeginCtx(&ctxlist[i].ctx, SQL_CTX_CREATE_ONLY, NULL, &sqlca);
check_expected(rc == 0 && sqlca.sqlcode == 0);
ctxlist[i].free = 1;
}
rc = pthread_mutex_init(&cond_m, NULL);
check_expected(rc == 0);
rc = pthread_cond_init(&cond, NULL);
check_expected(rc == 0);
return;
} /* initialize */
/* Print a friendly usage message. */
void usage(char *argv0)
{
char *program = strrchr(argv0, '/');
if (!program)
{
program = argv0;
}
fprintf(stderr,
"usage: %s \n"
" [-d database] [-u userid] [-p password]\n"
" [-l loops] [-c contexts] [-q] [-C] [-h]\n\n"
" -d\t alternate sample database or database alias.\n"
" -u\t user id.\n"
" -p\t password.\n"
" -l\t number of loops.\n"
" -c\t size of context pool to use.\n"
" -q\t quiet mode.\n"
" -C\t commit changes made.\n"
" -h\t print this message.\n", program);
exit(1);
} /* usage */
/* Construct a "random" SQL statement to execute in a connection to an
arbitrary database.
Note that the exclusive use of the SAMPLE database here is not a db2
restriction, but is a convienience to simplify this program. */
void generate_work(void)
{
int i, empno;
unsigned int seed = getpid();
struct work *work_item;
char buf[256];
/* The employee numbers are in the 10-350 range and are multiples of
* 10.
*/
char *delete_str1 = "DELETE FROM STAFF WHERE ID=%i";
char *delete_str2 = "DELETE FROM EMPLOYEE WHERE EMPNO='%06i'";
/* Generate work to be done in each thread. */
for (i = 0; i < loops; i++)
{
work_item = (struct work *)malloc(sizeof(struct work));
strcpy(work_item->database, database);
strcpy(work_item->userid, userid);
strcpy(work_item->password, password);
empno = ((rand_r(&seed) % 1000) + 1) * 10;
sprintf(buf, i % 2 ? delete_str1 : delete_str2, empno);
work_item->command = strdup(buf);
dispatch(work_item);
}
return;
} /* generate_work */
/* The current thread will be suspended until the required resources
are available (ie: a context is free). At this point a thread is
created to execute the specified SQL statement. */
void dispatch(struct work *work_item)
{
int rc, ctx;
rc = pthread_mutex_lock(&cond_m);
check_expected(rc == 0);
while (!contexts_free)
{
rc = pthread_cond_wait(&cond, &cond_m);
check_expected(rc == 0);
}
/* there is at least one free context at this point, find one */
for (ctx = 0; ctx < contexts; ctx++)
{
if (ctxlist[ctx].free)
{
break;
}
}
ctxlist[ctx].free = 0;
contexts_free--;
rc = pthread_mutex_unlock(&cond_m);
check_expected(rc == 0);
work_item->context = ctx;
if (verbose)
{
printf("creating thread on context %i for SQL statement:\n"
"\t\"%s\"\n", ctx, work_item->command);
}
#ifdef USE_UI_THREADS
rc = thr_create(NULL,
DEFAULT_STACK_SIZE,
do_work,
(void *)work_item,
THR_BOUND | THR_DETACHED,
&thd[ctx]);
#else
rc = pthread_create(&thd[ctx], &attr, do_work, (void *)work_item);
#endif
check_expected(rc == 0);
return;
} /* dispatch */
/*
* Execute the SQL statement. This is the "main" routine for each of the
* worker threads.
*
* A context will be attached to for the connection, a connection will
* be done, and a simple SQL statement will be prepared and executed.
*
* After this, or in the event of non-terminal error, the context will be
* detached if an attachment has occurred, and any further resource
* deallocation will occur.
*
* Before termination a condition will be signalled to wake up dispatch if
* no contexts had been available.
*/
void *do_work(void *args)
{
int rc, status = 0;
struct sqlca sqlca;
struct work *work = (struct work *)args;
EXEC SQL BEGIN DECLARE SECTION;
char dbname[15];
char user[15];
char pswd[15];
char statement[256];
EXEC SQL END DECLARE SECTION;
strcpy(dbname, work->database);
strcpy(user, work->userid);
strcpy(pswd, work->password);
if (verbose)
{
printf("%i: sqleAttachToCtx\n", work->context);
}
rc = sqleAttachToCtx(ctxlist[work->context].ctx, NULL, &sqlca);
check_expected(rc == 0 && sqlca.sqlcode == 0);
if (verbose)
{
printf("%i: CONNECT TO %s\n", work->context, dbname);
}
if (strlen(user) == 0)
{
EXEC SQL CONNECT TO :dbname;
}
else
{
EXEC SQL CONNECT TO :dbname USER :user USING :pswd;
}
CHECKERR(work->context, "CONNECT TO DATABASE", &status);
if (sqlca.sqlcode != 0)
{
clean_up(work, 0, &status);
}
else
{
strcpy(statement, work->command);
if (verbose)
{
printf("%i: EXECUTE \"%s\"\n", work->context, statement);
}
EXEC SQL EXECUTE IMMEDIATE :statement;
CHECKERR(work->context, "EXECUTE IMMEDIATE", &status);
clean_up(work, 1, &status);
}
return (void *)status; /* this could be obtained with a pthread_join
if the thread was created undetached */
} /* do_work */
void clean_up(struct work *work, int connect_done, int *pStatus)
{
int rc;
struct sqlca sqlca;
if (connect_done)
{
if (commit)
{
if (verbose)
{
printf("%i: COMMIT\n", work->context);
}
EXEC SQL COMMIT;
CHECKERR(work->context, "COMMIT", pStatus);
}
else
{
if (verbose)
{
printf("%i: ROLLBACK\n", work->context);
}
EXEC SQL ROLLBACK;
CHECKERR(work->context, "ROLLBACK", pStatus);
}
if (verbose)
{
printf("%i: CONNECT RESET\n", work->context);
}
EXEC SQL CONNECT RESET;
CHECKERR(work->context, "CONNECT RESET", pStatus);
}
if (verbose)
{
printf("%i: sqleDetachFromCtx\n", work->context);
}
rc = sqleDetachFromCtx(ctxlist[work->context].ctx, NULL, &sqlca);
check_expected(rc == 0 && sqlca.sqlcode == 0);
rc = pthread_mutex_lock(&cond_m);
check_expected(rc == 0);
if (verbose)
{
printf("%i: marking context free\n", work->context);
}
ctxlist[work->context].free = 1;
contexts_free++;
rc = pthread_cond_signal(&cond);
check_expected(rc == 0);
rc = pthread_mutex_unlock(&cond_m);
check_expected(rc == 0);
free(work->command);
free(work);
return;
} /* clean_up */
/* This procedure checks the SQLCACODE flag and prints out any
information that is available related to the specific error. */
int check_error(char eString[], struct sqlca *caPointer)
{
char eBuffer[1024];
char sBuffer[1024];
char message[1024];
char messToken[1024];
short rc, Erc;
int status = 0;
if (caPointer->sqlcode != 0 && caPointer->sqlcode != 100
&& caPointer->sqlcode != -438 && caPointer->sqlcode != -532)
{
strcpy(message, "");
sprintf(messToken, "--- error report ---\n");
strcat(message, messToken);
sprintf(messToken, "ERROR occurred: %s.\nSQLCODE: %ld\n",
eString, caPointer->sqlcode);
strcat(message, messToken);
/**********************\
* GET SQLSTATE MESSAGE *
\**********************/
rc = sqlogstt(sBuffer, 1024, 80, caPointer->sqlstate);
/******************************\
* GET ERROR MESSAGE API called *
\******************************/
Erc = sqlaintp(eBuffer, 1024, 80, caPointer);
/* return code is the length of the eBuffer string */
if (Erc > 0)
{
sprintf(messToken, "%s", eBuffer);
strcat(message, messToken);
}
if (caPointer->sqlcode < 0)
{
if (rc == 0)
{
sprintf(messToken, "\n%s", sBuffer);
strcat(message, messToken);
}
sprintf(messToken, "--- end error report ---\n");
strcat(message, messToken);
printf("%s", message);
return 1;
}
else
{
/* errorCode is just a Warning message */
if (rc == 0)
{
sprintf(messToken, "\n%s", sBuffer);
strcat(message, messToken);
}
sprintf(messToken, "--- end error report ---\n");
strcat(message, messToken);
sprintf(messToken, "WARNING - CONTINUING PROGRAM WITH WARNINGS!\n");
strcat(message, messToken);
printf("%s", message);
return 0;
} /* endif */
} /* endif */
return 0;
} /* check_error */