数据流触发器

数据流触发器概述

数据流触发器是事件框架根据数据流中发生的事件启动任务的指令。 您也可以使用数据流触发器,在 JDBC 查询消费者处理完所有可用数据后停止数据流

事件框架由以下部分组成
事件生成
事件框架生成与流程相关的事件和与阶段相关的事件。 框架仅在开始和停止时生成事件。 当发生与阶段相关的特定行为时,该框架就会生成阶段事件。 产生事件的操作因阶段而异,与阶段处理数据的方式有关。
例如, Amazon S3 目标程序在完成向对象写入数据和处理整个文件后会生成事件。
事件产生事件记录与流量相关的事件记录会立即传递给指定的事件消费者。 与阶段相关的事件记录以事件流的形式在流程中传递。
任务执行
要触发任务,需要一个执行器。 执行阶段在数据收集器或外部系统中执行任务。 执行器每次接收到一个事件,就会执行指定的任务。
例如, 管道终结器执行器会在接收到事件后停止流程 ,将流程过渡到 "完成 "状态。
事件存储器
要存储事件信息,可将事件传递给目标目标机将事件记录写入目标机系统,就像写入其他数据一样。
例如,您可以存储事件记录,以便对流量 读取的文件进行审计跟踪。

流量事件生成

事件框架会在数据收集器流的生命周期中特定点生成事件。 您可以配置流程属性,将每个事件传递给执行器或另一个流程 ,以进行更复杂的处理。

事件框架会生成以下与流量相关的事件:
流量启动
流程启动事件流程初始化时生成,在流程启动后立即生成,在各个阶段初始化之前生成。 这可以为执行器在各阶段初始化前执行任务留出时间。
大多数执行器都在等待任务完成的确认。 因此, 流程会等待执行器完成任务,然后再继续进行阶段初始化。 例如,如果将 JDBC 查询执行器配置为在流程开始前截断表, 流程将等待任务完成后再处理任何数据。

如果执行器无法处理事件,例如 JDBC 查询执行器无法执行指定查询或查询失败,则初始化阶段失败, 流程不会启动。 相反, 流量会过渡到故障状态。

流量停止
流动 停止事件已生成 流动 停止时,无论是手动停止、程序停止还是由于故障停止。 停止事件在所有阶段完成处理和清理临时资源(如删除临时文件)后生成。 这样,执行器就可以在流程处理完成后、 流程完全停止前执行任务。

与启动事件消费者类似,消耗事件的执行器的行为决定了流程是否等待执行器任务完成后才允许流程停止。 此外,如果流量停止事件的处理因故失败,即使数据处理成功, 流量也会过渡到失败状态。

流动事件与阶段事件的区别如下:
  • 虚拟处理 - 与阶段事件不同, 流程事件不会由您在画布中配置的阶段进行处理。 它们会传递给您在流程属性中配置的事件消费者。

    事件消费者不会显示在流程的画布中。 因此,数据预览中也不会显示流量事件。

  • 一次性事件 - 您只能在流程属性中为每种事件类型配置一个事件消费者:一个用于开始事件,一个用于停止事件。

    必要时,您可以将流程事件传递给另一个流程。 在事件消耗流程中,您可以根据需要添加多个阶段,以进行更复杂的处理。

使用流量事件

您可以在独立流量中配置流量事件。 配置 flow 事件时,可以将其配置为由执行器或其他 flow 消耗。

当执行器可以执行您需要的所有任务时,将事件传递给执行器。 您可以为每种事件类型配置一个执行器。

当您需要在消费流中执行更复杂的任务时,可将事件传递给另一个 ,例如将事件传递给多个执行器或一个执行器和目标进行存储。

转交给执行人

您可以配置流程 ,将每种事件类型传递给一个执行阶段。 这样就可以在流程开始或停止时触发任务。 您可以分别配置每种事件类型的行为。 您还可以放弃任何不想使用的事件。

注意: 如果指定的执行器无法处理事件,例如 Shell 执行器无法执行脚本, 流程就会过渡到失败状态。
要将流程事件传递给执行器,请执行以下步骤:
  1. 流程属性中,选择要消耗事件的执行器。
  2. 流程属性中,配置执行器以执行任务。
示例

假设您想在流量开始时发送一封电子邮件。 首先,将流程配置为使用电子邮件执行器来处理流程启动事件。 由于不需要 "停止 "事件,因此只需使用默认的 "丢弃 "选项即可:

流程属性页面配置了电子邮件执行器作为开始事件,默认丢弃作为停止事件

然后,在流程属性中配置电子邮件执行器。 您可以配置发送电子邮件的条件。 如果省略条件,执行器会在每次收到事件时发送电子邮件:

流程属性 启动事件选项卡显示电子邮件配置属性

舞台事件生成

您可以配置某些阶段来生成事件。 根据不同阶段处理数据的方式,每个阶段产生的事件也不尽相同。 有关各阶段事件生成的详细信息,请参阅阶段文档中的 "事件生成"。

下表列出了事件生成阶段及其可生成事件的时间:
暂存 当舞台...
Amazon S3 消息来源
  • 开始处理一个对象。
  • 完成对一个对象的处理。
  • 完成处理所有可用对象,且配置的批处理等待时间已过。

更多信息,请参阅源文件中的 " 事件生成 "。

Azure Blob Storage 消息来源
  • 开始处理一个对象。
  • 完成对一个对象的处理。
  • 完成处理所有可用对象,且配置的批处理等待时间已过。

更多信息,请参阅源文件中的 " 事件生成 "。

Azure 数据湖存储 Gen2 来源
  • 开始处理一个对象。
  • 完成对一个对象的处理。
  • 完成处理所有可用对象,且配置的批处理等待时间已过。

更多信息,请参阅源文件中的 " 事件生成 "。

Azure 数据湖存储 (传统) Gen2
  • 开始处理文件。
  • 完成文件处理。
  • 完成处理所有可用文件,且配置的批处理等待时间已过。

更多信息,请参阅源文件中的 " 事件生成 "。

目录来源
  • 开始处理文件。
  • 完成文件处理。
  • 完成处理所有可用文件,且配置的批处理等待时间已过。

更多信息,请参阅源文件中的 " 事件生成 "。

文件尾部来源
  • 开始处理文件。
  • 完成文件处理。

更多信息,请参阅源文件中的 " 事件生成 "。

Google BigQuery 消息来源
  • 成功完成查询。

更多信息,请参阅源文件中的 " 事件生成 "。

Google Cloud Storage 消息来源
  • 完成处理所有可用对象,且配置的批处理等待时间已过。

更多信息,请参阅源文件中的 " 事件生成 "。

Groovy 脚本
  • 运行生成事件的脚本。

更多信息,请参阅源文件中的 " 事件生成 "。

JavaScript 脚本
  • 运行生成事件的脚本。

更多信息,请参阅源文件中的 " 事件生成 "。

JDBC 多种消费者来源
  • 完成对所有表的查询所返回数据的处理。

更多信息,请参阅源文件中的 " 事件生成 "。

JDBC 查询消费者来源
  • 完成处理查询返回的所有数据。
  • 成功完成查询。
  • 无法完成查询。

更多信息,请参阅源文件中的 " 事件生成 "。

Jython 脚本
  • 运行生成事件的脚本。

更多信息,请参阅源文件中的 " 事件生成 "。

MongoDB Atlas
  • 完成处理查询返回的所有数据。

更多信息,请参阅源文件中的 " 事件生成 "。

Oracle 散装货源
  • 完成表格中数据的处理。

更多信息,请参阅源文件中的 " 事件生成 "。

Oracle 疾病预防控制中心资料来源
  • 流量启动
  • LogMiner 会议开始
  • 受监控的表格发生变化

更多信息,请参阅源文件中的 " 事件生成 "。

Oracle CDC 客户端源
  • 读取重做日志中的 DDL 语句。

更多信息,请参阅源文件中的 " 事件生成 "。

Oracle 多种消费者来源
  • 完成对所有表的查询所返回数据的处理。

更多信息,请参阅源文件中的 " 事件生成 "。

Oracle XStream
  • 接收 DDL 事件
  • 清理 XStream 组件
更多信息,请参阅源文件中的 " 事件生成 "。
Salesforce 消息来源
  • 完成处理查询返回的所有数据。

更多信息,请参阅源文件中的 " 事件生成 "。

Salesforce 批量 API 2.0 来源
  • 完成处理查询返回的所有数据。

更多信息,请参阅源文件中的 " 事件生成 "。

SAP HANA 查询消费者来源
  • 完成处理查询返回的所有数据。
  • 成功完成查询。
  • 无法完成查询。

更多信息,请参阅源文件中的 " 事件生成 "。

SFTP/FTP/FTPS 客户端来源
  • 开始处理文件。
  • 完成文件处理。
  • 完成处理所有可用文件,且配置的批处理等待时间已过。

更多信息,请参阅源文件中的 " 事件生成 "。

SQL Server CDC 客户端源
  • 完成相关 CDC 表中数据的处理。
  • 启用模式变更检查后, 源会在每次检测到模式变更时生成一个事件。

更多信息,请参阅源文件中的 " 事件生成 "。

SQL Server 更改跟踪
  • 完成处理所有指定变更跟踪表中的数据。

更多信息,请参阅源文件中的 " 事件生成 "。

网络客户端
  • 完成对所有可用数据的处理。

更多信息,请参阅源文件中的事件生成

窗口聚合处理器
  • 根据配置的窗口类型和时间窗口执行聚合。

更多信息,请参阅处理器文档中的事件生成

Groovy 评估处理器
  • 运行生成事件的脚本。

更多信息,请参阅处理器文档中的事件生成

JavaScript 评估员处理器
  • 运行生成事件的脚本。

更多信息,请参阅处理器文档中的事件生成

Jython 评估处理器
  • 运行生成事件的脚本。

更多信息,请参阅处理器文档中的事件生成

TensorFlow 评估员处理器
  • 一次性评估整个批次。

如需更多信息,请参阅处理器文档中的 “事件生成”部分

网络客户端处理器
  • 完成对所有可用数据的处理。

更多信息,请参阅目标文档中的事件生成

窗口聚合处理器
  • 执行聚合。

更多信息,请参阅处理器文档中的事件生成

Amazon S3 目标
  • 完成对对象的写入。
  • 完成整个文件的流式传输。

更多信息,请参阅目标文档中的事件生成

Azure Blob Storage 目标
  • 完成写入 Blob。

更多信息,请参阅目标文档中的事件生成

Azure 数据湖存储 Gen2 目标
  • 关闭文件。
  • 完成整个文件的流式传输。

更多信息,请参阅目标文档中的事件生成

Google Cloud Storage 目标
  • 完成对对象的写入。
  • 完成整个文件的流式传输。

更多信息,请参阅目标文档中的事件生成

地方金融服务目标
  • 关闭文件。
  • 完成整个文件的流式传输。

更多信息,请参阅目标文档中的事件生成

SFTP/FTP/FTPS 客户端目标
  • 关闭文件。
  • 完成整个文件的流式传输。

更多信息,请参阅目标文档中的事件生成

Snowflake 文件上传目标
  • 完成整个文件的流式传输。

更多信息,请参阅目标文档中的 事件生成

网络客户端目标
  • 完成对所有可用数据的处理。

更多信息,请参阅目标文档中的事件生成

ADLS Gen2 文件元数据执行器
  • 更改文件元数据,如文件名、位置或权限。
  • 创建一个空文件。
  • 删除文件或目录。

更多信息,请参阅执行器文档中的事件生成

Amazon S3 执行人
  • 创建一个新的 Amazon S3 对象。
  • 将对象复制到其他位置。
  • 为现有对象添加标记。

更多信息,请参阅执行器文档中的事件生成

Databricks Job Launcher 执行器
  • 启动 Databricks 作业。

如需了解更多信息,请参阅执行器文档中的 “事件生成” 部分。

Databricks 查询执行器
  • 确定提交的查询已成功完成。
  • 确定提交的查询未能完成。

如需更多信息,请参阅执行器文档中的 “事件生成”部分

Google BigQuery 执行人
  • 确定提交的查询已成功完成。
  • 确定提交的查询未能完成。

更多信息,请参阅执行器文档中的事件生成

Google Cloud Storage 执行人
  • 创建一个新的 Google Cloud Storage 对象。
  • 将对象复制到另一个位置。
  • 将对象移动到另一个位置。
  • 为现有对象添加元数据。
更多信息,请参阅执行器文档中的事件生成
JDBC 查询执行器
  • 确定提交的查询已成功完成。
  • 确定提交的查询未能完成。

更多信息,请参阅执行器文档中的事件生成

Snowflake 执行人
  • 确定提交的查询已成功完成。
  • 确定提交的查询未能完成。

更多信息,请参阅执行器文档中的事件生成

使用舞台事件

您可以根据自己的需要,以任何方式使用与舞台相关的活动。 为阶段事件配置事件流时,可以向流中添加其他阶段。 例如,您可以使用流选择器将不同类型的事件路由到不同的执行器。 但不能将事件流与数据流合并。

您可以创建两种一般类型的事件流:
  • 任务执行流将事件路由到执行器以执行任务。
  • 事件存储流将事件路由到目标 ,以存储事件信息。
当然,您也可以配置一个事件流,通过将事件记录路由到执行器和目标 ,同时执行这两项任务。 您还可以配置事件流,以便根据需要将数据路由到多个执行器和目标

任务执行流

任务执行流将事件记录从事件生成阶段路由到执行阶段。 执行器每次收到事件记录时都会执行一项任务。

例如,您有一个从 Kafka 读取文件并将文件写入 HDFS 的流程

流程画布显示了带有 Kafka 消费者源和 Hadoop FS 目标的流程

Hadoop FS 关闭文件时,您希望将文件移动到不同的目录,并将文件权限更改为只读。

保持流程的其他部分不变,您可以在 Hadoop FS 目标中启用事件处理,将其连接到 HDFS 文件元数据执行器,并将 HDFS 文件元数据执行器配置为文件和更改权限。 由此产生的流程是这样的

流程画布显示了一个流程,其中 Hadoop FS 目标连接到 HDFS 文件元数据执行器

如果需要根据文件名或位置设置不同的权限,可以使用流选择器对事件记录进行相应的路由,然后使用两个 HDFS File Metadata 执行器来更改文件权限,如下所示:

流程画布显示了一个流程,其中 Hadoop FS 目标连接到 HDFS 文件元数据执行器

事件存储流

事件存储流将事件记录从事件生成阶段路由到目标目标将事件记录写入目标系统。

事件记录包括记录头属性和记录字段中的事件信息。 在将事件记录写入目标机之前,您可以在事件流中添加处理器来丰富事件记录。

例如,您有一个使用目录处理网络日志的流程

流程画布显示带有目录源的流程

Directory 每次开始和完成读取文件时都会生成事件记录,事件记录包括一个包含文件路径的字段。 出于审计目的,您希望将这些信息写入数据库表中。

在保持流程其余部分不变的情况下,您可以启用目录的事件处理功能,并按如下步骤将其连接到 JDBC Producer:

流程画布显示了一个带有目录源的流程,该目录源连接到 JDBC 生产者目的地以存储事件

但你想知道事件发生的时间。 目录事件记录将事件创建时间存储在 sdc.event.creation_timestamp 记录头属性中。 因此,您可以使用带有以下表达式的表达式评估器,将创建日期和时间添加到记录中:
${record:attribute('sdc.event.creation_timestamp')}
如果有多个流将事件写入同一位置,则可以使用以下表达式将流名称也包含在事件记录中:
${flow:name()}

表达式评估器和最终流程如下所示:

流程画布显示一个流程,该流程的目录源连接到一个表达式评估器,该表达式评估器被配置为将创建时间和日期添加到记录中

执行程序

执行器在收到事件记录时执行任务。

您可以使用以下执行阶段来处理事件:
ADLS Gen2 文件元数据执行器
收到事件后,在 Azure Data Lake Storage Gen2 中更改文件元数据、创建空文件或删除文件或目录。
在更改文件元数据时,除了指定所有者和组,以及更新文件的权限和 ACL 外,执行者还可以重命名和移动文件。 创建空文件时,执行者可以指定所有者和组,并为文件设置权限和 ACL。 删除文件和目录时,执行器会递归执行任务。
您可以以任何逻辑方式使用执行器,例如在 Azure Data Lake Storage Gen2 目标关闭后移动文件。
Amazon S3 执行人
为指定的内容创建新的 Amazon S3 对象,复制水桶内的对象,或在收到事件后为现有的 Amazon S3 对象添加标签。
您可以以任何逻辑方式使用执行器,例如将事件记录中的信息写入新的 S3 对象,或在 Amazon S3 目标写入对象后复制或标记对象。
Databricks Job Launcher 执行器
为每个事件启动 Databricks 作业。
您可以以任何逻辑方式使用执行器,例如在 Amazon S3 目标关闭文件后运行 Databricks 作业。
Databricks 查询执行器
收到事件后在 Databricks 上运行 Spark SQL 查询。
您可以以任何符合逻辑的方式使用执行器。
电子邮件执行人
收到事件后,向配置的收件人发送自定义电子邮件。 您可以选择配置一个条件来决定何时发送电子邮件。
您可以以任何合乎逻辑的方式使用执行器,例如,每次 Azure Data Lake Storage 目标完成整个文件的流式传输时,都会发送一封电子邮件。
有关如何使用电子邮件执行器的解决方案,请参阅在流程处理过程中发送电子邮件
Google BigQuery 执行人
收到事件后,在 Google BigQuery 上运行一个或多个 SQL 查询。
您可以以任何逻辑方式使用执行器,例如每次 Google BigQuery 成功完成查询时运行 SQL 查询。
Google Cloud Storage 执行人
为指定内容创建新的 Google Cloud Storage 对象,复制或移动项目中的对象,或在收到事件时为现有对象添加元数据。
您可以以任何逻辑方式使用执行器,例如在 Google Cloud Storage 读取对象后移动对象,或在 Google Cloud Storage 目标写入对象后为其添加元数据。
管道完成执行器
收到事件时停止流程 ,将流程过渡到完成状态。 允许流程在停止前完成所有预期处理。
您可以以任何逻辑方式使用管道终结器执行器,例如在收到来自 JDBC 查询消费者的无更多数据事件时停止流程 这样就能实现 "批量 "处理--在处理完所有可用数据后停止数据流 ,而不是让数据流无限期地闲置。
例如,您可以使用 Pipeline Finisher 执行器和 JDBC Multitable Consumer,以便在处理完指定表中的所有查询数据后停止数据流
有关如何使用管道终结器执行器的解决方案,请参阅在处理完所有可用数据后停止流程
JDBC 查询执行器
使用 JDBC 连接到数据库,并运行一个或多个指定的 SQL 查询。
用于在事件发生后在数据库中运行 SQL 查询。
SFTP/FTP/FTPS 客户端执行器
连接到 SFTP、FTP 或 FTPS 服务器,并在收到事件时移动或删除文件。
您可以以任何合乎逻辑的方式使用执行器,例如在 SFTP/FTP/FTPS 客户端源读取完文件后移动文件。
外壳执行器
为每个事件执行用户定义的 shell 脚本。
您可以以任何符合逻辑的方式使用执行器。
Snowflake 执行人
将 Snowflake File Uploader 目标缓存的整个文件加载到 Snowflake 表中。
有关如何使用执行器的详细信息,请参阅 Snowflake 文件上传器和执行器流程

逻辑配对

您可以根据自己的需要以任何方式使用事件。 下表概述了事件生成与执行器和目标的一些逻辑配对。

流动事件

流量事件类型 活动消费者
流量启动
  • 任何单一执行者,管道终结者除外。
  • 另一个额外处理流程
流量停止
  • 任何单一执行者,管道终结者除外。
  • 另一个额外处理流程

来源活动

事件发生源 活动消费者
Amazon S3
  • 管道终结器执行器用于在处理完所有对象的查询数据后停止流程
  • 电子邮件执行器,用于在完成处理可用对象时发送电子邮件。
  • 任何事件存储目标
Azure Blob Storage
  • 管道终结器执行器用于在处理完所有对象的查询数据后停止流程
  • 电子邮件执行器,用于在完成处理可用对象时发送电子邮件。
  • 任何事件存储目标
Azure Data Lake Storage Gen2
  • 管道终结器执行器用于在处理完所有对象的查询数据后停止流程
  • 电子邮件执行器,用于在处理完可用对象后发送电子邮件。
  • 任何事件存储目标
Azure Data Lake Storage Gen2(旧版)
  • 管道终结器执行器,用于在处理完所有可用文件后停止流程
  • 电子邮件执行器可在关闭文件或流式传输整个文件后发送电子邮件。
  • 任何事件存储目标
目录
  • 电子邮件执行器,用于在每次启动或完成文件处理时发送电子邮件。
  • 管道终结器执行器,用于在处理完所有可用文件后停止流程
  • 任何事件存储目标
文件实时跟踪
  • 电子邮件执行器,用于在每次启动或完成文件处理时发送电子邮件。
  • 任何事件存储目标
Google BigQuery
  • 电子邮件执行器,用于在每次成功完成查询时发送电子邮件。
  • Google BigQuery 执行器运行 SQL 查询。
  • 任何事件存储目标
Google Cloud Storage
  • Google Cloud Storage 执行器在读取对象后执行任务。
  • 管道终结器执行器用于在处理完所有对象的查询数据后停止流程
  • 电子邮件执行器,用于在完成处理可用对象时发送电子邮件。
  • 任何事件存储目标
JDBC 多表消费者
  • 管道终结器执行器用于在处理完所有表的查询数据后停止流程
  • 电子邮件执行器,用于在完成处理查询返回的所有数据时发送电子邮件。
  • 任何事件存储目标
JDBC 查询消费者
  • 将 "无更多数据 "事件路由到管道终结器执行器,以便在处理完查询数据后停止流程
  • 电子邮件执行器,用于在每次成功完成查询、未能完成查询或完成处理所有可用数据时发送电子邮件。
  • 任何事件存储目标
MongoDB Atlas
  • 将 "无更多数据 "事件路由到管道终结器执行器,以便在处理完查询数据后停止流程
  • 电子邮件执行器,用于在数据源完成可用数据处理后发送电子邮件。
  • 任何事件存储目标
Oracle 批量加载
  • 电子邮件执行器,用于在完成读取表中数据后发送电子邮件。
  • 任何事件存储目标
Oracle CDC
  • 电子邮件执行器为每个事件发送电子邮件。
  • 任何事件存储目标
Oracle CDC 客户
  • 电子邮件执行器每次读取重做日志中的 DDL 语句时都会发送电子邮件。
  • 任何事件存储目标
Oracle 多表消费者
  • 管道终结器执行器用于在处理完所有表的查询数据后停止流程
  • 电子邮件执行器,用于在完成处理查询返回的所有数据时发送电子邮件。
  • 任何事件存储目标
Oracle Xstream
  • 电子邮件执行器为每个事件发送电子邮件。
  • 任何事件存储目标
Salesforce
  • 管道终结器执行器,用于在处理完查询数据后停止流程
  • 电子邮件执行器,用于在完成处理查询返回的所有数据时发送电子邮件。
  • 任何事件存储目标
Salesforce Bulk API 2.0
  • 管道终结器执行器,用于在处理完查询数据后停止流程
  • 电子邮件执行器,用于在完成处理查询返回的所有数据时发送电子邮件。
  • 任何事件存储目标
SAP HANA 查询消费者
  • 将 "无更多数据 "事件路由到管道终结器执行器,以便在处理完查询数据后停止流程
  • 电子邮件执行器,用于在每次成功完成查询、未能完成查询或完成处理所有可用数据时发送电子邮件。
  • 任何事件存储目标
SFTP/FTP/FTPS 客户端
  • 电子邮件执行器,用于在每次启动或完成文件处理时发送电子邮件。
  • 管道终结器执行器,用于在处理完所有可用文件后停止流程
  • SFTP/FTP/FTPS 客户端执行器,用于移动或删除已处理的文件。
  • 任何事件存储目标
SQL Server 变化跟踪
  • 管道终结器执行器,用于在处理完可用数据后停止流程
Web 客户端
  • 管道终结器执行器,用于在处理完所有可用数据后停止流程
  • 电子邮件执行器,用于在完成处理查询返回的所有数据时发送电子邮件。
  • 任何事件存储目标

处理器事件

事件发生处理器 活动消费者
Groovy 评估程序
  • 任何逻辑执行器。
  • 任何事件存储目标
JavaScript 评估程序
  • 任何逻辑执行器。
  • 任何事件存储目标
Jython 评估程序
  • 任何逻辑执行器。
  • 任何事件存储目标
TensorFlow 评估程序
  • 任何事件存储目标
Web 客户端
  • 电子邮件执行器,用于在处理完所有可用数据后发送电子邮件。
  • 任何事件存储目标
窗口聚合器
  • 当结果超过指定阈值时,通过电子邮件通知执行器。
  • 任何事件存储目标

目标活动

事件生成目标 活动消费者
Amazon S3
  • Amazon S3 执行器来创建或复制对象,或为封闭对象添加标记。
  • Databricks Job Launcher 执行器用于在关闭对象或整个文件后运行 Databricks 应用程序。
  • 电子邮件执行器,用于在关闭对象或整个文件后发送电子邮件。
  • 任何事件存储目标
Azure Blob Storage
  • 电子邮件执行器可在关闭文件或流式传输整个文件后发送电子邮件。
  • 任何事件存储目标
Azure Data Lake Storage Gen2
  • ADLS Gen2 执行器更改文件元数据、创建空文件或在关闭文件后删除文件或目录。
  • 电子邮件执行器可在关闭文件或流式传输整个文件后发送电子邮件。
  • 任何事件存储目标
Google Cloud Storage
  • Databricks Job Launcher 执行器用于在关闭对象或整个文件后运行 Databricks 作业。
  • Google Cloud Storage 执行器在写入对象或整个文件后执行任务。
  • 电子邮件执行器,用于在关闭对象或整个文件后发送电子邮件。
  • 任何事件存储目标
本地文件系统
  • 电子邮件执行器,用于在目标关闭文件或流式传输整个文件后发送电子邮件。
  • 任何事件存储目标
SFTP/FTP/FTPS 客户端
  • 电子邮件执行器,用于在目标每次关闭文件或流式传输整个文件时发送电子邮件。
  • SFTP/FTP/FTPS 客户端执行器移动已关闭的文件。
  • 任何事件存储目标
Snowflake 文件上传程序
  • Snowflake 执行器运行查询,以便在每次目标流传输整个文件时将数据加载到 表。 Snowflake
  • 电子邮件执行器,用于在目标每次流式传输整个文件时发送电子邮件。
  • 任何事件存储目标

执行者事件

事件生成执行器 活动消费者
ADLS Gen2 文件元数据执行器
  • 电子邮件执行器,每次执行器更改文件元数据时都会发送电子邮件。
  • 任何事件存储目标
Amazon S3
  • 电子邮件执行器,每次执行器更改对象元数据时都会发送电子邮件。
  • 任何事件存储目标
Databricks Job Launcher 执行器
  • 电子邮件执行器,每次 Databricks 作业启动器执行器启动 Databricks 作业时发送电子邮件。
  • 任何事件存储目标
Databricks 查询执行器
  • 电子邮件执行器,用于在查询成功或失败时发送电子邮件。
  • 任何事件存储目标
Google BigQuery 执行人
  • 电子邮件执行器,用于在查询成功或失败时发送电子邮件。
  • 任何事件存储目标
Google Cloud Storage 执行人
  • 电子邮件执行器,用于在查询成功或失败时发送电子邮件。
  • 任何事件存储目标
JDBC 查询执行器
  • 电子邮件执行器,用于在查询成功或失败时发送电子邮件。
  • 任何事件存储目标
Snowflake 执行人
  • 电子邮件执行器,用于在查询成功或失败时发送电子邮件。
  • 任何事件存储目标

事件记录

事件记录是在发生阶段或流程事件时创建的记录。

大多数事件记录都会在记录头中传递一般事件信息,如事件发生的时间。 它们还可以在记录字段中包含特定事件的详细信息,如关闭的输出文件的名称和位置。

文件尾部生成的事件记录是个例外--它们包括记录字段中的所有事件信息。

事件记录头属性

除标准记录头属性外,大多数事件记录还包括事件信息记录头属性,如事件类型和事件发生时间。

与任何记录标题属性一样,您可以使用表达式评估器和 record:attribute 函数将记录标题属性信息作为一个字段包含在记录中。 例如,在存储事件记录时,您很可能希望在事件记录中包含事件发生的时间,此时可在表达式评估器中使用以下表达式:
${record:attribute('sdc.event.creation_timestamp')}

请注意,所有记录头属性都是字符串值。 有关使用记录标题属性的更多信息,请参阅记录标题属性

大多数事件包括以下事件记录头属性。 文件尾部是个例外,它会将所有事件信息写入记录字段。
事件记录头属性 描述
sdc.event.type 事件类型。 由生成事件的阶段定义。

有关事件生成阶段可用事件类型的信息,请参阅阶段文档。

sdc.event.version 整数,表示事件记录类型的版本。
sdc.event.creation_timestamp 舞台创建事件时的时间戳。
注: 阶段生成的事件记录因阶段而异。 有关阶段事件的描述,请参阅事件生成阶段文档中的 "事件记录"。 有关流量事件的描述,请参阅流量事件记录

目录

以下是有关数据流触发器和事件框架的要点:
  1. 您可以在任何流程中使用事件框架,使其逻辑符合您的需求。
  2. 事件框架生成与流程相关的事件和与阶段相关的事件。
  3. 您可以在独立中使用事件。
  4. 流量事件在流量开始和停止时产生。 有关详情,请参阅流程事件生成
  5. 您可以将每个流程事件类型配置为传递给单个执行器或另一个流程 ,以进行更复杂的处理。
  6. 舞台事件根据舞台的处理逻辑生成。 有关事件生成阶段的列表,请参阅阶段事件生成
  7. 事件生成事件记录 ,以传递与事件相关的信息,如关闭文件的路径。

    各阶段生成的事件记录各不相同。 有关阶段事件的描述,请参阅事件生成阶段文档中的 "事件记录"。 有关流量事件的描述,请参阅流量事件记录

  8. 在最简单的使用案例中,您可以将阶段事件记录路由到目标 ,以保存事件信息。
  9. 您可以将舞台事件记录路由到执行舞台,这样它就能在收到事件后执行任务。

    有关逻辑事件生成和执行器配对的列表,请参阅逻辑配对

  10. 您可以将处理器添加到阶段事件的事件流或事件的消费中。

    例如,您可以添加一个表达式评估器,在将事件记录写入目标之前,将事件生成时间添加到事件记录中。 或者,您可以使用流选择器将不同类型的事件记录路由到不同的执行器。

  11. 在处理舞台事件时,不能将事件流与数据流合并。
  12. 您可以使用开发数据生成器和至事件开发阶段来生成用于流程开发和测试的事件。 有关开发阶段的更多信息,请参阅开发阶段

有关如何使用事件框架的示例,请参阅本章前面的案例研究。