/****************************************************************************
** (c) Copyright IBM Corp. 2007 保留所有权利。
**
** 以下源代码示例("示例")属于国际
** 商业机器公司或其子公司之一(" IBM "),并且是
** 有版权和许可证,不得出售。 您可以使用、复制、修改和
** 在不向 IBM 付费的情况下,以任何形式分发样品,目的是
** 协助您开发应用程序。
**
** 示例代码是在 "原样 "的基础上提供给您的,不对以下方面作出保证
** 任何一种。 IBM 在此,本公司明确声明不作任何保证,无论是明示的还是
** 默示保证,包括但不限于以下默示保证
** 适销性和特定用途的适用性。 一些辖区确实
** 不允许排除或限制默示保证,因此上述
** 限制或排除条款可能不适用于您。 IBM SHALL NOT BE LIABLE FOR
** 您因使用、复制、修改或执行以下行为而遭受的任何损失
** 分发样品,即使 IBM 已被告知可能出现以下情况
** 此类损害赔偿。
*****************************************************************************
**
** 源文件名称: dbthrds.sqC
**
** 示例:如何在 UNIX 上使用多上下文 API
**
** 该程序使用 POSIX 线程应用程序接口创建和运行线程
** 管理。 在 Solaris 系统上,也可以使用
** Solaris 线程 API,如 thd_create。
**
** 程序会维护一个语境库。 A 生成工作
** 函数从 main() 开始执行,并创建动态 SQL
** 由工作线程执行的语句。 当
** 上下文可用,创建并调度线程
** 完成指定的工作。
**
** 生成的工作包括删除条目的语句
** 从 SAMPLE 数据库的 STAFF 表或 EMPLOYEE 表中选取。
**
** 使用多线程 C 编译器选项进行编译和链接
** 平台支持的应用程序。
**
** 注:
** 在某些环境下,输出可能会出现乱码,因为
** 一个线程进程在输出信息的同时
** 另一个进程,从而覆盖输出字符串。 如果这
** 您可以为输出添加一个锁定机制
** 因此,任何时候都只有一个进程输出。
**
** 使用的 sql 语句:
* * 连接
** 立即执行
**
**
*****************************************************************************
**
** 有关示例程序的更多信息,请参阅 README 文件。
**
** 有关开发嵌入式 SQL 应用程序的信息,请参阅《开发嵌入式 SQL 应用程序》一书。
**
** 有关使用 SQL 语句的信息,请参阅《SQL 参考》。
**
** 有关 DB2 API 的信息,请参阅《管理 API 参考》。
**
** 有关编程、构建和运行的最新信息 DB2
** 申请请访问 DB2 信息中心:
** http://publib.boulder.ibm.com/infocenter/db2luw/v9r7/index.jsp
****************************************************************************/
#ifdef USE_UI_THREADS
// Sun 拥有 "Unix 国际 "线程应用程序接口
#include < thread.h >
#include < synch.h >
#else
#include <pthread.h>
#endif
#include <sql.h>
#include < unistd.h >
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#if ((__cplusplus >= 199711L ) &&!定义了 DB2HP &&!定义了 DB2AIX )|| \
( DB2LINUX && ( __LP64__ || (__gnuc__ >= 3)) ) )
#include <iostream>
using namespace std;
#else
#include < iostream.h >
#endif
#ifdef USE_UI_THREADS
// 隐藏线程实现中的差异
#define pthread_exit(x) thr_exit(x)
#define pthread_mutex_lock(x) mutex_lock(x)
#define pthread_mutex_unlock(x) mutex_unlock(x)
#define pthread_mutex_init(x,y) mutex_init(x, USYNC_THREAD, y)
#define pthread_cond_init(x,y) cond_init(x, USYNC_THREAD, y)
#define pthread_cond_wait(x,y) cond_wait(x,y)
#define pthread_cond_signal(x) cond_signal(x)
#define pthread_mutex_t mutex_t
#define pthread_cond_t cond_t
#define pthread_t thread_t
#endif
#if (defined( DB2HP ))
#define DEFAULT_STACK_SIZE 0x20000
#else
#define DEFAULT_STACK_SIZE 0
#endif
#define check_expected(condition) \
{ \
if (!(condition)) \
{ \
cerr << __FILE__ << ":" << __LINE__ << " 意外错误:\""\
<< #condition << "\"是假的" << endl; \
exit(1); \
} \
}
#define CHECKERR(context, CE_STR, pStatus ) \
{ \
char buf[256]; \
sprintf(buf, "Context nb.: %i \n %s ", context, CE_STR); \
if (check_error(buf, &sqlca)!= 0)\\
{ \
*(pStatus) = sqlca.sqlcode; \
} \
}
// 生成工作会创建以下类型的结构,并将其传递给
// 给每个工作线程。
结构工作
{
char database[15]; // 供线程连接的数据库
char userid[15];
char password[15];
char *command; // 要执行的动态 SQL 语句
int context; // 用于连接的上下文
};
// 上下文池由 "struct context "类型数组组成。
结构上下文
{
void *ctx;
int free;
};
// 全局变量。
int contexts = 8; // 上下文池的大小
结构上下文 *ctxlist;
#ifndef USE_UI_THREADS
pthread_attr_t attr; // 全局线程属性
#endif
pthread_t *thd; // 线程 ID 数组
int loops = 15; // 客户端创建的工作量
int commit = 0; // 提交已完成的工作
int verbose = 1;
char database[15];
char userid[15];
char password[15];
// 用于管理上下文池
int contexts_free;
pthread_cond_t cond;
pthread_mutex_t cond_m;
// 用于序列化 cout 的互斥
pthread_mutex_t 输出;
// 原型。
void initialize(int argc, char *argv[]);
void usage(char *argv0 );
void generate_work();
void dispatch(struct work *work_item);
void *doo_work(void *args); // 每个线程都执行该函数
void clean_up(struct work *work_item, int connect_done, int *pStatus );
int check_error(char eString[ ], struct sqlca *caPointer );
int main(int argc, char *argv[])
{
// 初始化用于序列化 cout 的互斥器
pthread_mutex_init(&output, NULL);
pthread_mutex_lock (&output);
cout << ""\nHow to use multiple context APIs" << endl << endl;
pthread_mutex_unlock (&output);
初始化(argc,argv);
generate_work();
如果 (verbose)
{
pthread_mutex_lock (&output);
cout << "所有工作者都已启动,退出主系统" << endl;
pthread_mutex_unlock (&output);
}
pthread_exit(0);
}//main
// 初始化全局程序状态。 这包括以下属性
// 用于每个线程的创建、多手动上下文的设置
// 类型和创建上下文池。
void initialize(int argc, char *argv[])
{
int c, i, rc;
struct sqlca sqlca;
strcpy(database, "sample");
strcpy(userid,"");
strcpy(密码,"");
// 读取任何命令行选项
while ((c = getopt(argc, argv, "d:u:p:l:c:qCh" ))!= EOF)
{
开关
{
情况 'd':
strcpy(数据库,optarg);
break;
情况 'u':
strcpy(userid,optarg);
break;
案例 'p':
strcpy(password,optarg);
break;
情况 'l':
loops = atoi(optarg);
break;
case 'c':
contexts = atoi(optarg);
break;
案例 'q':
verbose = 0;
break;
case 'c':
提交 = 1;
break;
案例 'h':
缺省值:
usage(argv[0]);
break;
}
}
cout << "数据库:" << 数据库 << endl;
cout << "Username: " << userid << endl;
cout << "Password: " << password << endl;
cout << "Loops:" << 循环 << endl;
cout << "上下文:" << contexts << endl;
cout << "Verbose:" << verbose << endl;
cout << "提交:" << commit << endl;
contexts_free = contexts;
ctxlist = new context[contexts];
check_expected(ctxlist!= NULL);
thd = new pthread_t[contexts];
check_expected(thd!= NULL);
#ifndef USE_UI_THREADS
rc = pthread_attr_init(&attr);
check_expected(rc == 0);
rc = pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
check_expected(rc == 0);
#if (defined( DB2DYNIX )) || (defined( DB2HP ))
rc = pthread_attr_setstacksize(&attr,DEFAULT_STACK_SIZE);
#endif
#ifdef _POSIX_THREAD_PRIORITY_SCHEDULING
#if (defined( DB2IRIX ))
rc = pthread_attr_setscope(&attr,PTHREAD_SCOPE_PROCESS);
#else
rc = pthread_attr_setscope(&attr,PTHREAD_SCOPE_SYSTEM);
#endif
check_expected(rc == 0);
#endif
#endif
sqleSetTypeCtx(SQL_CTX_MULTI_MANUAL);
如果 (verbose)
{
pthread_mutex_lock (&output);
cout << "创建大小为 " << 上下文 << endl 的上下文池;
pthread_mutex_unlock (&output);
}
for (i = 0; i < contexts; i++)
{
rc = sqleBeginCtx(&ctxlist[i ].ctx, SQL_CTX_CREATE_ONLY, NULL, &sqlca);
check_expected(rc == 0 && sqlca.sqlcode == 0);
ctxlist[i].free = 1;
}
rc = pthread_mutex_init(&cond_m,NULL);
check_expected(rc == 0);
rc = pthread_cond_init(&cond,NULL);
check_expected(rc == 0);
return;
}//初始化
// 打印友好的使用信息。
void usage(char *argv0 )
{
char *program = strrchr( argv0, '/');
如果 (!program)
{
程序 = argv0;
}
cerr << "usage:" << 程序 << endl
<< " [[-d 数据库] [[-u 用户名] [[-p 密码]" << endl
<< " [[-l 循环] [[-c 上下文] [-q] [-C] [-h]"" << endl
<< endl
<< " -d \t alternate sample database or database alias." << endl
<< " -u \t 用户 ID." << endl
<< " -p \t password." << endl
<< " -l \t循环次数。" << endl
<< " -c \t 要使用的上下文池大小。" << endl
<< " -q \t quiet mode." << endl
<< " -C \t commit changes made." << endl
<< " -h \t print this message." << endl;
exit(1);
}//使用
// 构造一条 "随机 "SQL 语句,在连接到一个
// 任意数据库。
// 请注意,在这里独家使用 SAMPLE 数据库并不是 db2
// 限制,只是为了简化程序。
void generate_work()
{
int i, empno;
unsigned int seed = getpid();
struct work *work_item;
char buf[256];
// 员工编号范围在 10-350 之间,是以下数值的倍数
// * 10.
char *delete_str1 = "DELETE FROM STAFF WHERE ID= %i ";
char *delete_str2 = "DELETE FROM EMPLOYEE WHERE EMPNO=' %06i '";
// 生成每个线程要完成的工作。
for (i = 0; i < loops; i++)
{
work_item = new work;
strcpy(work_item->database,数据库);
strcpy(work_item->userid,userid);
strcpy(work_item->password,密码);
srand(种子);
empno = ((rand() % 1000) + 1) * 10;
sprintf(buf, i % 2? delete_str1 :, empno); delete_str2
work_item->command = strdup(buf);
dispatch(work_item);
}
return;
}//generate_work
// 当前线程将被暂停,直到获得所需的资源
// 是可用的(即:上下文是自由的)。 此时会创建一个线程
// 执行指定的 SQL 语句。
void dispatch(struct work *work_item)
{
int rc, ctx;
rc = pthread_mutex_lock(&cond_m);
check_expected(rc == 0);
while (!contexts_free)
{
rc = pthread_cond_wait(&cond,&cond_m);
check_expected(rc == 0);
}
// 此时至少有一个自由上下文,请找出一个
for (ctx = 0; ctx < contexts; ctx++)
{
如果 (ctxlist[ctx].free)
{
break;
}
}
ctxlist[ctx].free = 0;
contexts_free--;
rc = pthread_mutex_unlock(&cond_m);
check_expected(rc == 0);
work_item->context = ctx;
如果 (verbose)
{
pthread_mutex_lock (&output);
cout << "为 SQL 语句在上下文 " << ctx << " 上创建线程:"
<< "\n\t\"" << work_item->command << "\"" << endl;
pthread_mutex_unlock (&output);
}
#ifdef USE_UI_THREADS
rc = thr_create(NULL、
默认堆栈大小、
do_work、
(void *)work_item、
thr_bound | thr_detached、
&thd[ctx]);
#else
rc = pthread_create(&thd[ctx],&attr,do_work,(void *)work_item);
#endif
check_expected(rc == 0);
return;
}//调度
// 执行 SQL 语句。 这是每个
// 工人线程。
//
// 将为连接附加一个上下文,并完成连接、
// 将准备并执行一条简单的 SQL 语句。
//
// 在此之后,或者在非终端错误的情况下,上下文将是
// 如果发生了附着,就会被分离,任何进一步的资源
// 将进行删除分配。
//
// 在终止之前,如果出现以下情况,将发出唤醒调度的信号
// 没有可用的上下文。
void *doo_work(void *args)
{
EXEC SQL BEGIN DECLARE SECTION;
char dbname[15];
char user[15];
char pswd[15];
char statement[256];
exec sql end declare section;
int rc, status = 0;
struct sqlca sqlca;
struct work *work = (struct work *)args;
strcpy(dbname, work->database);
strcpy(user,work->userid);
strcpy(pswd, work->password);
如果 (verbose)
{
pthread_mutex_lock (&output);
cout << work->context << ": sqleAttachToCtx"" << endl;
pthread_mutex_unlock (&output);
}
rc = sqleAttachToCtx(ctxlist[work->context ].ctx, NULL, &sqlca);
check_expected(rc == 0 && sqlca.sqlcode == 0);
如果 (verbose)
{
pthread_mutex_lock (&output);
cout << work->context << ": CONNECT TO " << dbname << endl;
pthread_mutex_unlock (&output);
}
如果 (strlen(user) == 0)
{
EXEC SQL CONNECT TO :dbname;
}
else
{
EXEC SQL CONNECT TO :dbname USER :user USING :pswd;
}
CHECKERR(work->context, "CONNECT TO DATABASE", &status);
if ( sqlca.sqlcode!= 0)
{
clean_up(work, 0, &status);
}
else
{
strcpy(statement,work->command);
如果 (verbose)
{
pthread_mutex_lock (&output);
cout << work->context << ": EXECUTE \"" << statement << "\"" << endl;
pthread_mutex_unlock (&output);
}
EXEC SQL EXECUTE IMMEDIATE :statement;
CHECKERR(work->context, "EXECUTE IMMEDIATE", &status);
clean_up(work, 1, &status);
}
return (void *)status; // 这可以通过 pthread_join 获得
// 如果线程是在未连接的情况下创建的
}//doo_work
void clean_up(struct work *work, int connect_done, int *pStatus )
{
int rc;
struct sqlca sqlca;
if (connect_done)
{
如果(提交)
{
如果 (verbose)
{
pthread_mutex_lock (&output);
cout << work->context << ": COMMIT" << endl;
pthread_mutex_unlock (&output);
}
EXEC SQL COMMIT;
CHECKERR(work->context, "COMMIT", pStatus );
}
else
{
如果 (verbose)
{
pthread_mutex_lock (&output);
cout << work->context << ":ROLLBACK" << endl;
pthread_mutex_unlock (&output);
}
执行 sql 回滚;
CHECKERR(work->context, "ROLLBACK", pStatus );
}
如果 (verbose)
{
pthread_mutex_lock (&output);
cout << work->context << ": CONNECT RESET" << endl;
pthread_mutex_unlock (&output);
}
exec sql connect reset;
CHECKERR(work->context, "CONNECT RESET", pStatus )}
如果 (verbose)
{
pthread_mutex_lock (&output);
cout << work->context << ": sqleDetachFromCtx"" << endl;
pthread_mutex_unlock (&output);
}
rc = sqleDetachFromCtx(ctxlist[work->context ].ctx, NULL, &sqlca);
check_expected(rc == 0 && sqlca.sqlcode == 0);
rc = pthread_mutex_lock(&cond_m);
check_expected(rc == 0);
如果 (verbose)
{
pthread_mutex_lock (&output);
cout << work->context << ": marking context free" << endl;
pthread_mutex_unlock (&output);
}
ctxlist[work->context].free = 1;
contexts_free++;
rc = pthread_cond_signal(&cond);
check_expected(rc == 0);
rc = pthread_mutex_unlock(&cond_m);
check_expected(rc == 0);
删除工作->命令;
删除工作;
return;
}//清理
// 该存储过程检查 SQLCACODE 标志,并打印出任何
// 与特定错误有关的可用信息。
int check_error(char eString[ ], struct sqlca *caPointer )
{
char eBuffer[1024 ];
char sBuffer[1024 ];
char message[1024];
char messToken[1024 ];
短程遥控
int status = 0;
if ( caPointer->sqlcode!= 0 && caPointer->sqlcode!= 100
&& caPointer->sqlcode!= -438 && caPointer->sqlcode!= -532)
{
strcpy(信息,"");
sprintf( messToken, "--- 错误报告 ---\n");
strcat(message, messToken );
sprintf( messToken, "ERROR occurred : %s.\nSQLCODE : %ld\n"、
eString, caPointer->sqlcode);
strcat(message, messToken );
// 获取 sqlstate 信息
rc = sqlogstt( sBuffer, 1024, 80, caPointer->sqlstate );
// 调用 GET ERROR MESSAGE API
Erc = sqlaintp( eBuffer, 1024, 80, caPointer );
// 返回代码是 eBuffer 字符串的长度
如果 (Erc > 0)
{
sprintf( messToken, " %s ", eBuffer );
strcat(message, messToken );
}
if ( caPointer->sqlcode < 0)
{
if (rc == 0)
{
sprintf( messToken, "\n %s ", sBuffer );
strcat(message, messToken );
}
sprintf( messToken, "--- 错误报告结束 ---\n");
strcat(message, messToken );
pthread_mutex_lock (&output);
cout << message;
pthread_mutex_unlock (&output);
return 1;
}
else
{
// errorCode 只是一条警告信息
if (rc == 0)
{
sprintf( messToken, "\n %s ", sBuffer );
strcat(message, messToken );
}
sprintf( messToken, "--- 错误报告结束 ---\n");
strcat(message, messToken );
sprintf( messToken, "WARNING - CONTINUING PROGRAM WITH WARNINGS!\n");
strcat(message, messToken );
pthread_mutex_lock (&output);
cout << message;
pthread_mutex_unlock (&output);
return 0;
}
}
return 0;
}//check_error