/****************************************************************************

** (c) Copyright IBM Corp. 2007 保留所有权利。

** 

** 以下源代码示例("示例")属于国际 

** 商业机器公司或其子公司之一(" IBM "),并且是 

** 有版权和许可证,不得出售。 您可以使用、复制、修改和 

** 在不向 IBM 付费的情况下,以任何形式分发样品,目的是 

** 协助您开发应用程序。

** 

** 示例代码是在 "原样 "的基础上提供给您的,不对以下方面作出保证 

** 任何一种。 IBM 在此,本公司明确声明不作任何保证,无论是明示的还是 

** 默示保证,包括但不限于以下默示保证 

** 适销性和特定用途的适用性。 一些辖区确实 

** 不允许排除或限制默示保证,因此上述 

** 限制或排除条款可能不适用于您。 IBM SHALL NOT BE LIABLE FOR 

** 您因使用、复制、修改或执行以下行为而遭受的任何损失 

** 分发样品,即使 IBM 已被告知可能出现以下情况 

** 此类损害赔偿。

*****************************************************************************

**

** 源文件名称: dbthrds.sqC

**

** 示例:如何在 UNIX 上使用多上下文 API

**

** 该程序使用 POSIX 线程应用程序接口创建和运行线程

** 管理。  在 Solaris 系统上,也可以使用

** Solaris 线程 API,如 thd_create。

**

** 程序会维护一个语境库。 A 生成工作

** 函数从 main() 开始执行,并创建动态 SQL

** 由工作线程执行的语句。 当

** 上下文可用,创建并调度线程

** 完成指定的工作。

**

** 生成的工作包括删除条目的语句

** 从 SAMPLE 数据库的 STAFF 表或 EMPLOYEE 表中选取。

**

** 使用多线程 C 编译器选项进行编译和链接

** 平台支持的应用程序。

**

** 注:

** 在某些环境下,输出可能会出现乱码,因为

** 一个线程进程在输出信息的同时

** 另一个进程,从而覆盖输出字符串。 如果这

** 您可以为输出添加一个锁定机制

** 因此,任何时候都只有一个进程输出。

**

** 使用的 sql 语句:

* * 连接

** 立即执行

**

**                           

*****************************************************************************

**

** 有关示例程序的更多信息,请参阅 README 文件。

**

** 有关开发嵌入式 SQL 应用程序的信息,请参阅《开发嵌入式 SQL 应用程序》一书。

**

** 有关使用 SQL 语句的信息,请参阅《SQL 参考》。

**

** 有关 DB2 API 的信息,请参阅《管理 API 参考》。

**

** 有关编程、构建和运行的最新信息 DB2

** 申请请访问 DB2 信息中心:

**     http://publib.boulder.ibm.com/infocenter/db2luw/v9r7/index.jsp

****************************************************************************/



#ifdef USE_UI_THREADS

// Sun 拥有 "Unix 国际 "线程应用程序接口

#include < thread.h >

#include < synch.h >

#else

#include <pthread.h>

#endif



#include <sql.h>

#include < unistd.h >

#include <stdlib.h>

#include <string.h>

#include <stdio.h>

#if ((__cplusplus >= 199711L ) &&!定义了 DB2HP &&!定义了 DB2AIX )|| \

    ( DB2LINUX && ( __LP64__ || (__gnuc__ >= 3)) ) )

   #include <iostream>

   using namespace std;

#else

   #include < iostream.h >

#endif





#ifdef USE_UI_THREADS

// 隐藏线程实现中的差异

#define pthread_exit(x) thr_exit(x)

#define pthread_mutex_lock(x) mutex_lock(x)

#define pthread_mutex_unlock(x) mutex_unlock(x)

#define pthread_mutex_init(x,y) mutex_init(x, USYNC_THREAD, y)

#define pthread_cond_init(x,y) cond_init(x, USYNC_THREAD, y)

#define pthread_cond_wait(x,y) cond_wait(x,y)

#define pthread_cond_signal(x) cond_signal(x)

#define pthread_mutex_t mutex_t

#define pthread_cond_t cond_t

#define pthread_t thread_t

#endif



#if (defined( DB2HP ))

#define DEFAULT_STACK_SIZE 0x20000

#else

#define DEFAULT_STACK_SIZE 0

#endif



#define check_expected(condition) \

{                                                                  \

  if (!(condition)) \

  {                                                                \

    cerr << __FILE__ << ":" << __LINE__ << " 意外错误:\""\

         << #condition << "\"是假的" << endl; \

    exit(1); \

  }                                                                \

}



#define CHECKERR(context, CE_STR, pStatus ) \

{                                                       \

  char buf[256]; \

  sprintf(buf, "Context nb.: %i \n %s ", context, CE_STR); \

  if (check_error(buf, &sqlca)!= 0)\\

  {                                                     \

    *(pStatus) = sqlca.sqlcode;                         \

  }                                                     \

}



// 生成工作会创建以下类型的结构,并将其传递给

// 给每个工作线程。

结构工作

{

  char database[15]; // 供线程连接的数据库

  char userid[15];

  char password[15];

  char *command; // 要执行的动态 SQL 语句

  int context; // 用于连接的上下文

};



// 上下文池由 "struct context "类型数组组成。

结构上下文

{

  void *ctx;

  int free;

};



// 全局变量。

int contexts = 8; // 上下文池的大小

结构上下文 *ctxlist;



#ifndef USE_UI_THREADS

pthread_attr_t attr; // 全局线程属性

#endif

pthread_t *thd; // 线程 ID 数组



int loops = 15; // 客户端创建的工作量

int commit = 0; // 提交已完成的工作

int verbose = 1;

char database[15];

char userid[15];

char password[15];



// 用于管理上下文池

int contexts_free;

pthread_cond_t cond;

pthread_mutex_t cond_m;



// 用于序列化 cout 的互斥

pthread_mutex_t 输出;



// 原型。

void initialize(int argc, char *argv[]);

void usage(char *argv0 );

void generate_work();

void dispatch(struct work *work_item);

void *doo_work(void *args); // 每个线程都执行该函数

void clean_up(struct work *work_item, int connect_done, int *pStatus );

int check_error(char eString[ ], struct sqlca *caPointer );



int main(int argc, char *argv[])

{



  // 初始化用于序列化 cout 的互斥器

  pthread_mutex_init(&output, NULL);



  pthread_mutex_lock (&output);

  cout << ""\nHow to use multiple context APIs" << endl << endl;

  pthread_mutex_unlock (&output);



  初始化(argc,argv);

  generate_work();

  如果 (verbose)

  {

    pthread_mutex_lock (&output);

    cout << "所有工作者都已启动,退出主系统" << endl;

    pthread_mutex_unlock (&output);

  }

  pthread_exit(0);

}//main



// 初始化全局程序状态。  这包括以下属性

// 用于每个线程的创建、多手动上下文的设置

// 类型和创建上下文池。

void initialize(int argc, char *argv[])

{

  int c, i, rc;

  struct sqlca sqlca;



  strcpy(database, "sample");

  strcpy(userid,"");

  strcpy(密码,"");



  // 读取任何命令行选项

  while ((c = getopt(argc, argv, "d:u:p:l:c:qCh" ))!= EOF)

  {

    开关

    {

      情况 'd':

        strcpy(数据库,optarg);

        break;

      情况 'u':

        strcpy(userid,optarg);

        break;

      案例 'p':

        strcpy(password,optarg);

        break;

      情况 'l':

        loops = atoi(optarg);

        break;

      case 'c':

        contexts = atoi(optarg);

        break;

      案例 'q':

        verbose = 0;

        break;

      case 'c':

        提交 = 1;

        break;

      案例 'h':

      缺省值:

        usage(argv[0]);

        break;

    }

  }

  cout << "数据库:" << 数据库 << endl;

  cout << "Username: " << userid << endl;

  cout << "Password: " << password << endl;

  cout << "Loops:" << 循环 << endl;

  cout << "上下文:" << contexts << endl;

  cout << "Verbose:" << verbose << endl;

  cout << "提交:" << commit << endl;

  contexts_free = contexts;



  ctxlist = new context[contexts];

  check_expected(ctxlist!= NULL);



  thd = new pthread_t[contexts];

  check_expected(thd!= NULL);



#ifndef USE_UI_THREADS

  rc = pthread_attr_init(&attr);

  check_expected(rc == 0);



  rc = pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);

  check_expected(rc == 0);



#if (defined( DB2DYNIX )) || (defined( DB2HP ))

  rc = pthread_attr_setstacksize(&attr,DEFAULT_STACK_SIZE);

#endif



#ifdef _POSIX_THREAD_PRIORITY_SCHEDULING



#if (defined( DB2IRIX ))

  rc = pthread_attr_setscope(&attr,PTHREAD_SCOPE_PROCESS);

#else

  rc = pthread_attr_setscope(&attr,PTHREAD_SCOPE_SYSTEM);

#endif

  check_expected(rc == 0);

#endif

#endif



  sqleSetTypeCtx(SQL_CTX_MULTI_MANUAL);



  如果 (verbose)

  {

    pthread_mutex_lock (&output);

    cout << "创建大小为 " << 上下文 << endl 的上下文池;

    pthread_mutex_unlock (&output);

  }



  for (i = 0; i < contexts; i++)

  {

    rc = sqleBeginCtx(&ctxlist[i ].ctx, SQL_CTX_CREATE_ONLY, NULL, &sqlca);

    check_expected(rc == 0 && sqlca.sqlcode == 0);

    ctxlist[i].free = 1;

  }



  rc = pthread_mutex_init(&cond_m,NULL);

  check_expected(rc == 0);



  rc = pthread_cond_init(&cond,NULL);

  check_expected(rc == 0);



  return;

}//初始化



// 打印友好的使用信息。

void usage(char *argv0 )

{

  char *program = strrchr( argv0, '/');



  如果 (!program)

  {

    程序 = argv0;

  }



  cerr << "usage:" << 程序 << endl

       << " [[-d 数据库] [[-u 用户名] [[-p 密码]" << endl

       << " [[-l 循环] [[-c 上下文] [-q] [-C] [-h]"" << endl

       << endl

       << " -d \t alternate sample database or database alias." << endl

       << " -u \t 用户 ID." << endl

       << " -p \t password." << endl

       << " -l \t循环次数。" << endl

       << " -c \t 要使用的上下文池大小。" << endl

       << " -q \t quiet mode." << endl

       << " -C \t commit changes made." << endl

       << " -h \t print this message." << endl;



  exit(1);

}//使用



// 构造一条 "随机 "SQL 语句,在连接到一个

// 任意数据库。

// 请注意,在这里独家使用 SAMPLE 数据库并不是 db2

// 限制,只是为了简化程序。

void generate_work()

{

  int i, empno;

  unsigned int seed = getpid();

  struct work *work_item;

  char buf[256];



  // 员工编号范围在 10-350 之间,是以下数值的倍数

  // * 10.

  char *delete_str1 = "DELETE FROM STAFF WHERE ID= %i ";

  char *delete_str2 = "DELETE FROM EMPLOYEE WHERE EMPNO=' %06i '";



  // 生成每个线程要完成的工作。

  for (i = 0; i < loops; i++)

  {

    work_item = new work;



    strcpy(work_item->database,数据库);

    strcpy(work_item->userid,userid);

    strcpy(work_item->password,密码);



    srand(种子);

    empno = ((rand() % 1000) + 1) * 10;

    sprintf(buf, i % 2? delete_str1 :, empno); delete_str2



    work_item->command = strdup(buf);



    dispatch(work_item);

  }



  return;

}//generate_work



// 当前线程将被暂停,直到获得所需的资源

// 是可用的(即:上下文是自由的)。  此时会创建一个线程

// 执行指定的 SQL 语句。

void dispatch(struct work *work_item)

{

  int rc, ctx;



  rc = pthread_mutex_lock(&cond_m);

  check_expected(rc == 0);

  while (!contexts_free)

  {

    rc = pthread_cond_wait(&cond,&cond_m);

    check_expected(rc == 0);

  }



  // 此时至少有一个自由上下文,请找出一个

  for (ctx = 0; ctx < contexts; ctx++)

  {

    如果 (ctxlist[ctx].free)

    {

      break;

    }

  }

  ctxlist[ctx].free = 0;

  contexts_free--;



  rc = pthread_mutex_unlock(&cond_m);

  check_expected(rc == 0);



  work_item->context = ctx;



  如果 (verbose)

  {

    pthread_mutex_lock (&output);

    cout << "为 SQL 语句在上下文 " << ctx << " 上创建线程:"

         << "\n\t\"" << work_item->command << "\"" << endl;

    pthread_mutex_unlock (&output);

  }

#ifdef USE_UI_THREADS

  rc = thr_create(NULL、

                  默认堆栈大小、

                  do_work、

                  (void *)work_item、

                  thr_bound | thr_detached、

                  &thd[ctx]);

#else

  rc = pthread_create(&thd[ctx],&attr,do_work,(void *)work_item);

#endif

  check_expected(rc == 0);



  return;

}//调度



// 执行 SQL 语句。  这是每个

// 工人线程。

//

// 将为连接附加一个上下文,并完成连接、

// 将准备并执行一条简单的 SQL 语句。

//

// 在此之后,或者在非终端错误的情况下,上下文将是

// 如果发生了附着,就会被分离,任何进一步的资源

// 将进行删除分配。

//

// 在终止之前,如果出现以下情况,将发出唤醒调度的信号

// 没有可用的上下文。

void *doo_work(void *args)

{

  EXEC SQL BEGIN DECLARE SECTION;

    char dbname[15];

    char user[15];

    char pswd[15];

    char statement[256];

  exec sql end declare section;



  int rc, status = 0;

  struct sqlca sqlca;

  struct work *work = (struct work *)args;



  strcpy(dbname, work->database);

  strcpy(user,work->userid);

  strcpy(pswd, work->password);



  如果 (verbose)

  {

    pthread_mutex_lock (&output);

    cout << work->context << ": sqleAttachToCtx"" << endl;

    pthread_mutex_unlock (&output);

  }

  rc = sqleAttachToCtx(ctxlist[work->context ].ctx, NULL, &sqlca);

  check_expected(rc == 0 && sqlca.sqlcode == 0);



  如果 (verbose)

  {

    pthread_mutex_lock (&output);

    cout << work->context << ": CONNECT TO " << dbname << endl;

    pthread_mutex_unlock (&output);

  }



  如果 (strlen(user) == 0)

  {

    EXEC SQL CONNECT TO :dbname;

  }

  else

  {

    EXEC SQL CONNECT TO :dbname USER :user USING :pswd;

  }

  CHECKERR(work->context, "CONNECT TO DATABASE", &status);

  if ( sqlca.sqlcode!= 0)

  {

    clean_up(work, 0, &status);

  }

  else

  {

    strcpy(statement,work->command);



    如果 (verbose)

    {

      pthread_mutex_lock (&output);

      cout << work->context << ": EXECUTE \"" << statement << "\"" << endl;

      pthread_mutex_unlock (&output);

    }



    EXEC SQL EXECUTE IMMEDIATE :statement;

    CHECKERR(work->context, "EXECUTE IMMEDIATE", &status);



    clean_up(work, 1, &status);

  }



  return (void *)status; // 这可以通过 pthread_join 获得

                         // 如果线程是在未连接的情况下创建的

}//doo_work



void clean_up(struct work *work, int connect_done, int *pStatus )

{

  int             rc;

  struct sqlca sqlca;



  if (connect_done)

  {

    如果(提交)

    {

      如果 (verbose)

      {

        pthread_mutex_lock (&output);

        cout << work->context << ": COMMIT" << endl;

        pthread_mutex_unlock (&output);

      }



      EXEC SQL COMMIT;

      CHECKERR(work->context, "COMMIT", pStatus );

    }

    else

    {

      如果 (verbose)

      {

        pthread_mutex_lock (&output);

        cout << work->context << ":ROLLBACK" << endl;

        pthread_mutex_unlock (&output);

      }



      执行 sql 回滚;

      CHECKERR(work->context, "ROLLBACK", pStatus );

    }



    如果 (verbose)

    {

      pthread_mutex_lock (&output);

      cout << work->context << ": CONNECT RESET" << endl;

      pthread_mutex_unlock (&output);

    }



    exec sql connect reset;

  CHECKERR(work->context, "CONNECT RESET", pStatus )}



  如果 (verbose)

  {

    pthread_mutex_lock (&output);

    cout << work->context << ": sqleDetachFromCtx"" << endl;

    pthread_mutex_unlock (&output);

  }

  rc = sqleDetachFromCtx(ctxlist[work->context ].ctx, NULL, &sqlca);

  check_expected(rc == 0 && sqlca.sqlcode == 0);



  rc = pthread_mutex_lock(&cond_m);

  check_expected(rc == 0);



  如果 (verbose)

  {

    pthread_mutex_lock (&output);

    cout << work->context << ": marking context free" << endl;

    pthread_mutex_unlock (&output);

  }

  ctxlist[work->context].free = 1;

  contexts_free++;



  rc = pthread_cond_signal(&cond);

  check_expected(rc == 0);

  rc = pthread_mutex_unlock(&cond_m);

  check_expected(rc == 0);



  删除工作->命令;

  删除工作;



  return;

}//清理



// 该存储过程检查 SQLCACODE 标志,并打印出任何

// 与特定错误有关的可用信息。

int check_error(char eString[ ], struct sqlca *caPointer )

{

  char eBuffer[1024 ];

  char sBuffer[1024 ];

  char message[1024];

  char messToken[1024 ];

  短程遥控

  int       status = 0;



  if ( caPointer->sqlcode!= 0 && caPointer->sqlcode!= 100

        && caPointer->sqlcode!= -438 && caPointer->sqlcode!= -532)

  {

    strcpy(信息,"");



    sprintf( messToken, "--- 错误报告 ---\n");

    strcat(message, messToken );



    sprintf( messToken, "ERROR occurred : %s.\nSQLCODE : %ld\n"、

            eString, caPointer->sqlcode);

    strcat(message, messToken );



    // 获取 sqlstate 信息

    rc = sqlogstt( sBuffer, 1024, 80, caPointer->sqlstate );



    // 调用 GET ERROR MESSAGE API

    Erc = sqlaintp( eBuffer, 1024, 80, caPointer );



    // 返回代码是 eBuffer 字符串的长度

    如果 (Erc > 0)

    {

      sprintf( messToken, " %s ", eBuffer );

      strcat(message, messToken );

    }



    if ( caPointer->sqlcode < 0)

    {

      if (rc == 0)

      {

        sprintf( messToken, "\n %s ", sBuffer );

        strcat(message, messToken );

      }

      sprintf( messToken, "--- 错误报告结束 ---\n");

      strcat(message, messToken );



      pthread_mutex_lock (&output);

      cout << message;

      pthread_mutex_unlock (&output);



      return 1;

    }

    else

    {

      // errorCode 只是一条警告信息

      if (rc == 0)

      {

        sprintf( messToken, "\n %s ", sBuffer );

        strcat(message, messToken );

      }

      sprintf( messToken, "--- 错误报告结束 ---\n");

      strcat(message, messToken );



      sprintf( messToken, "WARNING - CONTINUING PROGRAM WITH WARNINGS!\n");

      strcat(message, messToken );



      pthread_mutex_lock (&output);

      cout << message;

      pthread_mutex_unlock (&output);



      return 0;

    }

  }

  return 0;

}//check_error