/**************************************************************************** ** (c) Copyright IBM Corp. 2007 All rights reserved. ** ** The following sample of source code ("Sample") is owned by International ** Business Machines Corporation or one of its subsidiaries ("IBM") and is ** copyrighted and licensed, not sold. You may use, copy, modify, and ** distribute the Sample in any form without payment to IBM, for the purpose of ** assisting you in the development of your applications. ** ** The Sample code is provided to you on an "AS IS" basis, without warranty of ** any kind. IBM HEREBY EXPRESSLY DISCLAIMS ALL WARRANTIES, EITHER EXPRESS OR ** IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF ** MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. Some jurisdictions do ** not allow for the exclusion or limitation of implied warranties, so the above ** limitations or exclusions may not apply to you. IBM shall not be liable for ** any damages you suffer as a result of using, copying, modifying or ** distributing the Sample, even if IBM has been advised of the possibility of ** such damages. ***************************************************************************** ** ** SOURCE FILE NAME: dbthrds.sqC ** ** SAMPLE: How to use multiple context APIs on UNIX ** ** This program uses the POSIX threads APIs for thread creation and ** management. On Solaris systems it is also possible to use the ** Solaris thread APIs such as thd_create. ** ** The program maintains a pool of contexts. A generate_work ** function is executed from main(), and creates dynamic SQL ** statements that are executed by worker threads. When a ** context becomes available, a thread is created and dispatched ** to do the specified work. ** ** The work generated consists of statements to delete entries ** from either the STAFF or EMPLOYEE tables of the SAMPLE database. ** ** Compile and link with C compiler options for multi-threaded ** applications supported by your platform. ** ** Note: ** On some environments, the output may appear garbled because ** one thread process outputs information at the same time as ** another process, thereby overwriting output strings. If this ** is a concern, you can add a locking mechanism for the output ** so only one process outputs at any one time. ** ** SQL STATEMENTS USED: ** CONNECT ** EXECUTE IMMEDIATE ** ** ***************************************************************************** ** ** For more information on the sample programs, see the README file. ** ** For information on developing embedded SQL applications see the Developing Embedded SQL Applications book. ** ** For information on using SQL statements, see the SQL Reference. ** ** For information on DB2 APIs, see the Administrative API Reference. ** ** For the latest information on programming, building, and running DB2 ** applications, visit the DB2 Information Center: ** http://publib.boulder.ibm.com/infocenter/db2luw/v9r7/index.jsp ****************************************************************************/ #ifdef USE_UI_THREADS // Sun has "Unix International" threads APIs #include <thread.h> #include <synch.h> #else #include <pthread.h> #endif #include <sql.h> #include <unistd.h> #include <stdlib.h> #include <string.h> #include <stdio.h> #if ((__cplusplus >= 199711L) && !defined DB2HP && !defined DB2AIX) || \ (DB2LINUX && (__LP64__ || (__GNUC__ >= 3)) ) #include <iostream> using namespace std; #else #include <iostream.h> #endif #ifdef USE_UI_THREADS // Hide the differences in the threads implementations #define pthread_exit(x) thr_exit(x) #define pthread_mutex_lock(x) mutex_lock(x) #define pthread_mutex_unlock(x) mutex_unlock(x) #define pthread_mutex_init(x,y) mutex_init(x, USYNC_THREAD, y) #define pthread_cond_init(x,y) cond_init(x, USYNC_THREAD, y) #define pthread_cond_wait(x,y) cond_wait(x,y) #define pthread_cond_signal(x) cond_signal(x) #define pthread_mutex_t mutex_t #define pthread_cond_t cond_t #define pthread_t thread_t #endif #if (defined(DB2HP)) #define DEFAULT_STACK_SIZE 0x20000 #else #define DEFAULT_STACK_SIZE 0 #endif #define check_expected(condition) \ { \ if (!(condition)) \ { \ cerr << __FILE__ << ":" << __LINE__ << " unexpected error: \"" \ << #condition << "\" was false" << endl; \ exit(1); \ } \ } #define CHECKERR(context, CE_STR, pStatus) \ { \ char buf[256]; \ sprintf(buf, "Context nb.: %i\n%s", context, CE_STR); \ if (check_error(buf, &sqlca) != 0) \ { \ *(pStatus) = sqlca.sqlcode; \ } \ } // Generate work creates the following type of structure which is passed // to each worker thread. struct work { char database[15]; // database for thread to connect to char userid[15]; char password[15]; char *command; // dynamic SQL statement to execute int context; // context to use for connection }; // The context pool consists of an array of 'struct context' types. struct context { void *ctx; int free; }; // Global variables. int contexts = 8; // size of context pool struct context *ctxlist; #ifndef USE_UI_THREADS pthread_attr_t attr; // global thread attributes #endif pthread_t *thd; // array of thread ids int loops = 15; // amount of work for the client to create int commit = 0; // commit the work done int verbose = 1; char database[15]; char userid[15]; char password[15]; // for management of the context pool int contexts_free; pthread_cond_t cond; pthread_mutex_t cond_m; // mutex for serializing cout pthread_mutex_t output; // Prototypes. void initialize(int argc, char *argv[]); void usage(char *argv0); void generate_work(); void dispatch(struct work *work_item); void *do_work(void *args); // each thread executes this function void clean_up(struct work *work_item, int connect_done, int *pStatus); int check_error(char eString[], struct sqlca *caPointer); int main(int argc, char *argv[]) { // Initialize the mutex for serializing cout pthread_mutex_init(&output, NULL); pthread_mutex_lock (&output); cout << "\nHow to use multiple context APIs" << endl << endl; pthread_mutex_unlock (&output); initialize(argc, argv); generate_work(); if (verbose) { pthread_mutex_lock (&output); cout << "all workers started, exiting main" << endl; pthread_mutex_unlock (&output); } pthread_exit(0); } //main // Initialize any global program state. This includes the attributes // used for each thread creation, the setting of the multi-manual context // type and the creation of the context pool. void initialize(int argc, char *argv[]) { int c, i, rc; struct sqlca sqlca; strcpy(database, "sample"); strcpy(userid, ""); strcpy(password, ""); // read any command line options while ((c = getopt(argc, argv, "d:u:p:l:c:qCh")) != EOF) { switch (c) { case 'd': strcpy(database, optarg); break; case 'u': strcpy(userid, optarg); break; case 'p': strcpy(password, optarg); break; case 'l': loops = atoi(optarg); break; case 'c': contexts = atoi(optarg); break; case 'q': verbose = 0; break; case 'C': commit = 1; break; case 'h': default: usage(argv[0]); break; } } cout << "Database: " << database << endl; cout << "Username: " << userid << endl; cout << "Password: " << password << endl; cout << "Loops: " << loops << endl; cout << "Contexts: " << contexts << endl; cout << "Verbose: " << verbose << endl; cout << "Commit: " << commit << endl; contexts_free = contexts; ctxlist = new context[contexts]; check_expected(ctxlist != NULL); thd = new pthread_t[contexts]; check_expected(thd != NULL); #ifndef USE_UI_THREADS rc = pthread_attr_init(&attr); check_expected(rc == 0); rc = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); check_expected(rc == 0); #if (defined(DB2DYNIX)) || (defined(DB2HP)) rc = pthread_attr_setstacksize(&attr, DEFAULT_STACK_SIZE); #endif #ifdef _POSIX_THREAD_PRIORITY_SCHEDULING #if (defined(DB2IRIX)) rc = pthread_attr_setscope(&attr, PTHREAD_SCOPE_PROCESS); #else rc = pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); #endif check_expected(rc == 0); #endif #endif sqleSetTypeCtx(SQL_CTX_MULTI_MANUAL); if (verbose) { pthread_mutex_lock (&output); cout << "creating context pool of size " << contexts << endl; pthread_mutex_unlock (&output); } for (i = 0; i < contexts; i++) { rc = sqleBeginCtx(&ctxlist[i].ctx, SQL_CTX_CREATE_ONLY, NULL, &sqlca); check_expected(rc == 0 && sqlca.sqlcode == 0); ctxlist[i].free = 1; } rc = pthread_mutex_init(&cond_m, NULL); check_expected(rc == 0); rc = pthread_cond_init(&cond, NULL); check_expected(rc == 0); return; } //initialize // Print a friendly usage message. void usage(char *argv0) { char *program = strrchr(argv0, '/'); if (!program) { program = argv0; } cerr << "usage: " << program << endl << " [-d database] [-u userid] [-p password]" << endl << " [-l loops] [-c contexts] [-q] [-C] [-h]" << endl << endl << " -d\t alternate sample database or database alias." << endl << " -u\t user id." << endl << " -p\t password." << endl << " -l\t number of loops." << endl << " -c\t size of context pool to use." << endl << " -q\t quiet mode." << endl << " -C\t commit changes made." << endl << " -h\t print this message." << endl; exit(1); } //usage // Construct a "random" SQL statement to execute in a connection to an // arbitrary database. // Note that the exclusive use of the SAMPLE database here is not a db2 // restriction, but is a convienience to simplify this program. void generate_work() { int i, empno; unsigned int seed = getpid(); struct work *work_item; char buf[256]; // The employee numbers are in the 10-350 range and are multiples of // * 10. char *delete_str1 = "DELETE FROM STAFF WHERE ID=%i"; char *delete_str2 = "DELETE FROM EMPLOYEE WHERE EMPNO='%06i'"; // Generate work to be done in each thread. for (i = 0; i < loops; i++) { work_item = new work; strcpy(work_item->database, database); strcpy(work_item->userid, userid); strcpy(work_item->password, password); srand(seed); empno = ((rand() % 1000) + 1) * 10; sprintf(buf, i % 2 ? delete_str1 : delete_str2, empno); work_item->command = strdup(buf); dispatch(work_item); } return; } //generate_work // The current thread will be suspended until the required resources // are available (ie: a context is free). At this point a thread is created // to execute the specified SQL statement. void dispatch(struct work *work_item) { int rc, ctx; rc = pthread_mutex_lock(&cond_m); check_expected(rc == 0); while (!contexts_free) { rc = pthread_cond_wait(&cond, &cond_m); check_expected(rc == 0); } // there is at least one free context at this point, find one for (ctx = 0; ctx < contexts; ctx++) { if (ctxlist[ctx].free) { break; } } ctxlist[ctx].free = 0; contexts_free--; rc = pthread_mutex_unlock(&cond_m); check_expected(rc == 0); work_item->context = ctx; if (verbose) { pthread_mutex_lock (&output); cout << "creating thread on context " << ctx << " for SQL statement:" << "\n\t\"" << work_item->command << "\"" << endl; pthread_mutex_unlock (&output); } #ifdef USE_UI_THREADS rc = thr_create(NULL, DEFAULT_STACK_SIZE, do_work, (void *)work_item, THR_BOUND | THR_DETACHED, &thd[ctx]); #else rc = pthread_create(&thd[ctx], &attr, do_work, (void *)work_item); #endif check_expected(rc == 0); return; } //dispatch // Execute the SQL statement. This is the "main" routine for each of the // worker threads. // // A context will be attached to for the connection, a connection will be done, // and a simple SQL statement will be prepared and executed. // // After this, or in the event of non-terminal error, the context will be // detached if an attachment has occurred, and any further resource // deallocation will occur. // // Before termination a condition will be signalled to wake up dispatch if // no contexts had been available. void *do_work(void *args) { EXEC SQL BEGIN DECLARE SECTION; char dbname[15]; char user[15]; char pswd[15]; char statement[256]; EXEC SQL END DECLARE SECTION; int rc, status = 0; struct sqlca sqlca; struct work *work = (struct work *)args; strcpy(dbname, work->database); strcpy(user, work->userid); strcpy(pswd, work->password); if (verbose) { pthread_mutex_lock (&output); cout << work->context << ": sqleAttachToCtx" << endl; pthread_mutex_unlock (&output); } rc = sqleAttachToCtx(ctxlist[work->context].ctx, NULL, &sqlca); check_expected(rc == 0 && sqlca.sqlcode == 0); if (verbose) { pthread_mutex_lock (&output); cout << work->context << ": CONNECT TO " << dbname << endl; pthread_mutex_unlock (&output); } if (strlen(user) == 0) { EXEC SQL CONNECT TO :dbname; } else { EXEC SQL CONNECT TO :dbname USER :user USING :pswd; } CHECKERR(work->context, "CONNECT TO DATABASE", &status); if (sqlca.sqlcode != 0) { clean_up(work, 0, &status); } else { strcpy(statement, work->command); if (verbose) { pthread_mutex_lock (&output); cout << work->context << ": EXECUTE \"" << statement << "\"" << endl; pthread_mutex_unlock (&output); } EXEC SQL EXECUTE IMMEDIATE :statement; CHECKERR(work->context, "EXECUTE IMMEDIATE", &status); clean_up(work, 1, &status); } return (void *)status; // this could be obtained with a pthread_join // if the thread was created undetached } //do_work void clean_up(struct work *work, int connect_done, int *pStatus) { int rc; struct sqlca sqlca; if (connect_done) { if (commit) { if (verbose) { pthread_mutex_lock (&output); cout << work->context << ": COMMIT" << endl; pthread_mutex_unlock (&output); } EXEC SQL COMMIT; CHECKERR(work->context, "COMMIT", pStatus); } else { if (verbose) { pthread_mutex_lock (&output); cout << work->context << ": ROLLBACK" << endl; pthread_mutex_unlock (&output); } EXEC SQL ROLLBACK; CHECKERR(work->context, "ROLLBACK", pStatus); } if (verbose) { pthread_mutex_lock (&output); cout << work->context << ": CONNECT RESET" << endl; pthread_mutex_unlock (&output); } EXEC SQL CONNECT RESET; CHECKERR(work->context, "CONNECT RESET", pStatus)} if (verbose) { pthread_mutex_lock (&output); cout << work->context << ": sqleDetachFromCtx" << endl; pthread_mutex_unlock (&output); } rc = sqleDetachFromCtx(ctxlist[work->context].ctx, NULL, &sqlca); check_expected(rc == 0 && sqlca.sqlcode == 0); rc = pthread_mutex_lock(&cond_m); check_expected(rc == 0); if (verbose) { pthread_mutex_lock (&output); cout << work->context << ": marking context free" << endl; pthread_mutex_unlock (&output); } ctxlist[work->context].free = 1; contexts_free++; rc = pthread_cond_signal(&cond); check_expected(rc == 0); rc = pthread_mutex_unlock(&cond_m); check_expected(rc == 0); delete work->command; delete work; return; } //clean_up // This procedure checks the SQLCACODE flag and prints out any // information that is available related to the specific error. int check_error(char eString[], struct sqlca *caPointer) { char eBuffer[1024]; char sBuffer[1024]; char message[1024]; char messToken[1024]; short rc, Erc; int status = 0; if (caPointer->sqlcode != 0 && caPointer->sqlcode != 100 && caPointer->sqlcode != -438 && caPointer->sqlcode != -532) { strcpy(message, ""); sprintf(messToken, "--- error report ---\n"); strcat(message, messToken); sprintf(messToken, "ERROR occurred : %s.\nSQLCODE : %ld\n", eString, caPointer->sqlcode); strcat(message, messToken); // GET SQLSTATE MESSAGE rc = sqlogstt(sBuffer, 1024, 80, caPointer->sqlstate); // GET ERROR MESSAGE API called Erc = sqlaintp(eBuffer, 1024, 80, caPointer); // return code is the length of the eBuffer string if (Erc > 0) { sprintf(messToken, "%s", eBuffer); strcat(message, messToken); } if (caPointer->sqlcode < 0) { if (rc == 0) { sprintf(messToken, "\n%s", sBuffer); strcat(message, messToken); } sprintf(messToken, "--- end error report ---\n"); strcat(message, messToken); pthread_mutex_lock (&output); cout << message; pthread_mutex_unlock (&output); return 1; } else { // errorCode is just a Warning message if (rc == 0) { sprintf(messToken, "\n%s", sBuffer); strcat(message, messToken); } sprintf(messToken, "--- end error report ---\n"); strcat(message, messToken); sprintf(messToken, "WARNING - CONTINUING PROGRAM WITH WARNINGS!\n"); strcat(message, messToken); pthread_mutex_lock (&output); cout << message; pthread_mutex_unlock (&output); return 0; } } return 0; } //check_error