/****************************************************************************
** (c) Copyright IBM Corp. 2007 All rights reserved.
**
** The following sample of source code ("Sample") is owned by International
** Business Machines Corporation or one of its subsidiaries ("IBM") and is
** copyrighted and licensed, not sold. You may use, copy, modify, and
** distribute the Sample in any form without payment to IBM, for the purpose of
** assisting you in the development of your applications.
**
** The Sample code is provided to you on an "AS IS" basis, without warranty of
** any kind. IBM HEREBY EXPRESSLY DISCLAIMS ALL WARRANTIES, EITHER EXPRESS OR
** IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
** MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. Some jurisdictions do
** not allow for the exclusion or limitation of implied warranties, so the above
** limitations or exclusions may not apply to you. IBM shall not be liable for
** any damages you suffer as a result of using, copying, modifying or
** distributing the Sample, even if IBM has been advised of the possibility of
** such damages.
*****************************************************************************
**
** SOURCE FILE NAME: dbthrds.sqc
**
** SAMPLE: How to use multiple context APIs on UNIX
**
**         This program uses the POSIX threads APIs for thread creation and
**         management. On Solaris systems it is also possible to use the
**         Solaris thread APIs such as thr_create.
**
**         The program maintains a pool of contexts. A generate_work
**         function is executed from main(), and creates dynamic SQL
**         statements that are executed by worker threads. When a
**         context becomes available, a thread is created and dispatched
**         to do the specified work.
**
**         The work generated consists of statements to delete entries
**         from either the STAFF or EMPLOYEE tables of the SAMPLE database.
**
**         Compile and link with C compiler options for multi-threaded
**         applications supported by your platform.
**
** Note:
**         On some environments, the output may appear garbled because
**         one thread process outputs information at the same time as
**         another process, thereby overwriting output strings. If this
**         is a concern, you can add a locking mechanism for the output
**         so only one process outputs at any one time.
**
** SQL STATEMENTS USED:
**         CONNECT
**         EXECUTE IMMEDIATE
**
**
*****************************************************************************
**
** For more information on the sample programs, see the README file.
**
** For information on developing embedded SQL applications see the Developing Embedded SQL Applications book.
**
** For information on using SQL statements, see the SQL Reference.
**
** For information on DB2 APIs, see the Administrative API Reference.
**
** For the latest information on programming, building, and running DB2
** applications, visit the DB2 Information Center:
**     http://publib.boulder.ibm.com/infocenter/db2luw/v9r7/index.jsp
****************************************************************************/

/* Thread library selection: building with USE_UI_THREADS pulls in the
   <thread.h>/<synch.h> interfaces, otherwise POSIX threads are used. */
#ifdef USE_UI_THREADS
/* Sun has "Unix International" threads APIs */
#include <thread.h>
#include <synch.h>
#else
#include <pthread.h>
#endif

#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <sql.h>

/* With USE_UI_THREADS the rest of the file keeps using the pthread_* spelling;
   each name below is mapped onto its thr_/mutex_/cond_ counterpart.  The init
   mappings insert USYNC_THREAD as the extra middle argument. */
#ifdef USE_UI_THREADS
/* Hide the differences in the threads implementations */
#define pthread_exit(x) thr_exit(x)
#define pthread_mutex_lock(x) mutex_lock(x)
#define pthread_mutex_unlock(x) mutex_unlock(x)
#define pthread_mutex_init(x,y) mutex_init(x, USYNC_THREAD, y)
#define pthread_cond_init(x,y) cond_init(x, USYNC_THREAD, y)
#define pthread_cond_wait(x,y) cond_wait(x,y)
#define pthread_cond_signal(x) cond_signal(x)
#define pthread_mutex_t mutex_t
#define pthread_cond_t cond_t
#define pthread_t thread_t
#endif

/* Stack size used for worker threads: 0x20000 when DB2HP is defined, 0
   otherwise.  It is passed to thr_create() in dispatch() and, on DB2HP
   builds, to pthread_attr_setstacksize() in initialize(). */
#if (defined(DB2HP))
#define DEFAULT_STACK_SIZE 0x20000
#else
#define DEFAULT_STACK_SIZE 0
#endif

/* Generated work creates the following type of structure which is passed to each worker
thread. */ struct work { char database[15]; /* database for thread to connect to */ char userid[15]; char password[15]; char *command; /* dynamic SQL statement to execute */ int context; /* context to use for connection */ }; /* The context pool consists of an array of 'struct context' types. */ struct context { void *ctx; int free; }; /* Global variables. */ int contexts = 8; /* size of context pool */ struct context *ctxlist; #ifndef USE_UI_THREADS pthread_attr_t attr; /* global thread attributes */ #endif pthread_t *thd; /* array of thread ids */ int loops = 15; /* amount of work for the client to create */ int commit = 0; /* commit the work done */ int verbose = 1; char database[15]; char userid[15]; char password[15]; /* for management of the context pool */ int contexts_free; pthread_cond_t cond; pthread_mutex_t cond_m; /* Prototypes. */ void initialize(int argc, char *argv[]); void usage(char *argv0); void generate_work(void); void dispatch(struct work *work_item); void *do_work(void *args); /* each thread executes this function */ void clean_up(struct work *work_item, int connect_done, int *pStatus); #define check_expected(condition) \ { \ if (!(condition)) \ { \ fprintf(stderr, "%s:%i unexpected error: \"%s\" was false\n", \ __FILE__, __LINE__, #condition); \ exit(1); \ } \ } #define CHECKERR(context, CE_STR, pStatus) \ { \ char buf[256]; \ sprintf(buf, "Context nb.: %i\n%s", context, CE_STR); \ if (check_error(buf, &sqlca) != 0) \ { \ *(pStatus) = sqlca.sqlcode; \ } \ } int check_error(char eString[], struct sqlca *caPointer); int main(int argc, char *argv[]) { printf("\nHow to use multiple context APIs\n"); printf("\n"); initialize(argc, argv); generate_work(); if (verbose) { printf("all workers started, exiting main\n"); } pthread_exit(0); return 0; } /* main */ /* Initialize any global program state. This includes the attributes used for each thread creation, the setting of the multi-manual context type and the creation of the context pool. 
*/ void initialize(int argc, char *argv[]) { int c, i, rc; struct sqlca sqlca; strcpy(database, "sample"); strcpy(userid, ""); strcpy(password, ""); /* read any command line options */ while ((c = getopt(argc, argv, "d:u:p:l:c:qCh")) != EOF) { switch (c) { case 'd': strcpy(database, optarg); break; case 'u': strcpy(userid, optarg); break; case 'p': strcpy(password, optarg); break; case 'l': loops = atoi(optarg); break; case 'c': contexts = atoi(optarg); break; case 'q': verbose = 0; break; case 'C': commit = 1; break; case 'h': default: usage(argv[0]); break; } } printf("Database: %s\n", database); printf("Username: %s\n", userid); printf("Password: %s\n", password); printf("Loops: %i\n", loops); printf("Contexts: %i\n", contexts); printf("Verbose: %i\n", verbose); printf("Commit: %i\n", commit); contexts_free = contexts; ctxlist = (struct context *)malloc(contexts * sizeof(struct context)); check_expected(ctxlist != NULL); thd = (pthread_t *)malloc(contexts * sizeof(pthread_t)); check_expected(thd != NULL); #ifndef USE_UI_THREADS rc = pthread_attr_init(&attr); check_expected(rc == 0); rc = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); check_expected(rc == 0); #if (defined(DB2HP)) rc = pthread_attr_setstacksize(&attr, DEFAULT_STACK_SIZE); #endif #ifdef _POSIX_THREAD_PRIORITY_SCHEDULING #if (defined(DB2IRIX)) rc = pthread_attr_setscope(&attr, PTHREAD_SCOPE_PROCESS); #else rc = pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); #endif check_expected(rc == 0); #endif #endif sqleSetTypeCtx(SQL_CTX_MULTI_MANUAL); if (verbose) { printf("creating context pool of size %i\n", contexts); } for (i = 0; i < contexts; i++) { rc = sqleBeginCtx(&ctxlist[i].ctx, SQL_CTX_CREATE_ONLY, NULL, &sqlca); check_expected(rc == 0 && sqlca.sqlcode == 0); ctxlist[i].free = 1; } rc = pthread_mutex_init(&cond_m, NULL); check_expected(rc == 0); rc = pthread_cond_init(&cond, NULL); check_expected(rc == 0); return; } /* initialize */ /* Print a friendly usage message. 
*/ void usage(char *argv0) { char *program = strrchr(argv0, '/'); if (!program) { program = argv0; } fprintf(stderr, "usage: %s \n" " [-d database] [-u userid] [-p password]\n" " [-l loops] [-c contexts] [-q] [-C] [-h]\n\n" " -d\t alternate sample database or database alias.\n" " -u\t user id.\n" " -p\t password.\n" " -l\t number of loops.\n" " -c\t size of context pool to use.\n" " -q\t quiet mode.\n" " -C\t commit changes made.\n" " -h\t print this message.\n", program); exit(1); } /* usage */ /* Construct a "random" SQL statement to execute in a connection to an arbitrary database. Note that the exclusive use of the SAMPLE database here is not a db2 restriction, but is a convienience to simplify this program. */ void generate_work(void) { int i, empno; unsigned int seed = getpid(); struct work *work_item; char buf[256]; /* The employee numbers are in the 10-350 range and are multiples of * 10. */ char *delete_str1 = "DELETE FROM STAFF WHERE ID=%i"; char *delete_str2 = "DELETE FROM EMPLOYEE WHERE EMPNO='%06i'"; /* Generate work to be done in each thread. */ for (i = 0; i < loops; i++) { work_item = (struct work *)malloc(sizeof(struct work)); strcpy(work_item->database, database); strcpy(work_item->userid, userid); strcpy(work_item->password, password); empno = ((rand_r(&seed) % 1000) + 1) * 10; sprintf(buf, i % 2 ? delete_str1 : delete_str2, empno); work_item->command = strdup(buf); dispatch(work_item); } return; } /* generate_work */ /* The current thread will be suspended until the required resources are available (ie: a context is free). At this point a thread is created to execute the specified SQL statement. 
*/
/* work_item is handed over to the new thread, which runs do_work() on it.
   The chosen pool index is stored in work_item->context and is also the
   slot of 'thd' that receives the thread id.  Every failing threads call
   aborts the program through check_expected(). */
void dispatch(struct work *work_item)
{
  int err;
  int slot = 0;

  err = pthread_mutex_lock(&cond_m);
  check_expected(err == 0);

  /* sleep until clean_up() reports that a context has been returned */
  while (contexts_free == 0)
  {
    err = pthread_cond_wait(&cond, &cond_m);
    check_expected(err == 0);
  }

  /* there is at least one free context at this point, find one */
  while (slot < contexts && !ctxlist[slot].free)
  {
    slot++;
  }

  /* claim it while still holding the mutex */
  ctxlist[slot].free = 0;
  contexts_free--;

  err = pthread_mutex_unlock(&cond_m);
  check_expected(err == 0);

  work_item->context = slot;

  if (verbose)
  {
    printf("creating thread on context %i for SQL statement:\n"
           "\t\"%s\"\n",
           slot, work_item->command);
  }

#ifdef USE_UI_THREADS
  err = thr_create(NULL, DEFAULT_STACK_SIZE, do_work, (void *)work_item,
                   THR_BOUND | THR_DETACHED, &thd[slot]);
#else
  err = pthread_create(&thd[slot], &attr, do_work, (void *)work_item);
#endif
  check_expected(err == 0);

  return;
} /* dispatch */

/*
 * Execute the SQL statement.  This is the "main" routine for each of the
 * worker threads.
 *
 * A context will be attached to for the connection, a connection will
 * be done, and a simple SQL statement will be prepared and executed.
 *
 * After this, or in the event of non-terminal error, the context will be
 * detached if an attachment has occurred, and any further resource
 * deallocation will occur.
 *
 * Before termination a condition will be signalled to wake up dispatch if
 * no contexts had been available.
*/ void *do_work(void *args) { int rc, status = 0; struct sqlca sqlca; struct work *work = (struct work *)args; EXEC SQL BEGIN DECLARE SECTION; char dbname[15]; char user[15]; char pswd[15]; char statement[256]; EXEC SQL END DECLARE SECTION; strcpy(dbname, work->database); strcpy(user, work->userid); strcpy(pswd, work->password); if (verbose) { printf("%i: sqleAttachToCtx\n", work->context); } rc = sqleAttachToCtx(ctxlist[work->context].ctx, NULL, &sqlca); check_expected(rc == 0 && sqlca.sqlcode == 0); if (verbose) { printf("%i: CONNECT TO %s\n", work->context, dbname); } if (strlen(user) == 0) { EXEC SQL CONNECT TO :dbname; } else { EXEC SQL CONNECT TO :dbname USER :user USING :pswd; } CHECKERR(work->context, "CONNECT TO DATABASE", &status); if (sqlca.sqlcode != 0) { clean_up(work, 0, &status); } else { strcpy(statement, work->command); if (verbose) { printf("%i: EXECUTE \"%s\"\n", work->context, statement); } EXEC SQL EXECUTE IMMEDIATE :statement; CHECKERR(work->context, "EXECUTE IMMEDIATE", &status); clean_up(work, 1, &status); } return (void *)status; /* this could be obtained with a pthread_join if the thread was created undetached */ } /* do_work */ void clean_up(struct work *work, int connect_done, int *pStatus) { int rc; struct sqlca sqlca; if (connect_done) { if (commit) { if (verbose) { printf("%i: COMMIT\n", work->context); } EXEC SQL COMMIT; CHECKERR(work->context, "COMMIT", pStatus); } else { if (verbose) { printf("%i: ROLLBACK\n", work->context); } EXEC SQL ROLLBACK; CHECKERR(work->context, "ROLLBACK", pStatus); } if (verbose) { printf("%i: CONNECT RESET\n", work->context); } EXEC SQL CONNECT RESET; CHECKERR(work->context, "CONNECT RESET", pStatus); } if (verbose) { printf("%i: sqleDetachFromCtx\n", work->context); } rc = sqleDetachFromCtx(ctxlist[work->context].ctx, NULL, &sqlca); check_expected(rc == 0 && sqlca.sqlcode == 0); rc = pthread_mutex_lock(&cond_m); check_expected(rc == 0); if (verbose) { printf("%i: marking context free\n", 
work->context); } ctxlist[work->context].free = 1; contexts_free++; rc = pthread_cond_signal(&cond); check_expected(rc == 0); rc = pthread_mutex_unlock(&cond_m); check_expected(rc == 0); free(work->command); free(work); return; } /* clean_up */ /* This procedure checks the SQLCACODE flag and prints out any information that is available related to the specific error. */ int check_error(char eString[], struct sqlca *caPointer) { char eBuffer[1024]; char sBuffer[1024]; char message[1024]; char messToken[1024]; short rc, Erc; int status = 0; if (caPointer->sqlcode != 0 && caPointer->sqlcode != 100 && caPointer->sqlcode != -438 && caPointer->sqlcode != -532) { strcpy(message, ""); sprintf(messToken, "--- error report ---\n"); strcat(message, messToken); sprintf(messToken, "ERROR occurred: %s.\nSQLCODE: %ld\n", eString, caPointer->sqlcode); strcat(message, messToken); /**********************\ * GET SQLSTATE MESSAGE * \**********************/ rc = sqlogstt(sBuffer, 1024, 80, caPointer->sqlstate); /******************************\ * GET ERROR MESSAGE API called * \******************************/ Erc = sqlaintp(eBuffer, 1024, 80, caPointer); /* return code is the length of the eBuffer string */ if (Erc > 0) { sprintf(messToken, "%s", eBuffer); strcat(message, messToken); } if (caPointer->sqlcode < 0) { if (rc == 0) { sprintf(messToken, "\n%s", sBuffer); strcat(message, messToken); } sprintf(messToken, "--- end error report ---\n"); strcat(message, messToken); printf("%s", message); return 1; } else { /* errorCode is just a Warning message */ if (rc == 0) { sprintf(messToken, "\n%s", sBuffer); strcat(message, messToken); } sprintf(messToken, "--- end error report ---\n"); strcat(message, messToken); sprintf(messToken, "WARNING - CONTINUING PROGRAM WITH WARNINGS!\n"); strcat(message, messToken); printf("%s", message); return 0; } /* endif */ } /* endif */ return 0; } /* check_error */