MongoDB Atlas CDC
The MongoDB Atlas CDC source reads changes from a MongoDB Atlas change stream or oplog. For information about supported versions, see Supported systems and versions.
The MongoDB Atlas CDC source includes the CRUD operation type in a record header attribute so generated records can be easily processed by CRUD-enabled targets. For an overview of Data Collector changed data processing and a list of CRUD-enabled targets, see Processing changed data.
When you configure the source, you define connection information, such as the connection string and credentials to use. You can specify SSL/TLS properties for an SSL/TLS-enabled MongoDB cluster. You can also use a connection to configure the source.
You can configure where the source reads changes from, the initial offset, and the read preference. You can define a custom filter and configure the source to flatten nested structures.
You can optionally configure advanced options that determine how the source connects to MongoDB, such as the maximum number of open connections to allow in the connection pool and the cursor type to use for capped collections.
When the flow stops, the source notes where it stops reading. When the flow starts again, the source continues processing from the last-saved offset by default. You can reset the offset to process all available data.
Credentials
Based on the authentication used by MongoDB, configure the MongoDB Atlas CDC source to use no authentication, username/password authentication, or LDAP authentication. By default, no authentication is used.
- Authentication method - Specify the authentication to use with the Authentication Method property on the Credentials tab:
  - None
  - Username / Password
  - LDAP
- Connection string - If you prefer, you can specify credentials in the connection string on the Connection tab. However, specifying credentials on the Credentials tab is the recommended method.
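If you do place credentials in the connection string, MongoDB requires reserved characters in the user name and password to be percent-encoded. The following Python sketch shows one way to build such a string. The host name, user name, and password are made-up placeholders, and build_connection_string is a hypothetical helper, not part of Data Collector.

```python
from urllib.parse import quote_plus


def build_connection_string(host: str, username: str, password: str) -> str:
    """Return a mongodb+srv:// URI with percent-encoded credentials.

    Hypothetical helper for illustration only.
    """
    # quote_plus escapes reserved characters such as '@', ':' and '/'.
    return f"mongodb+srv://{quote_plus(username)}:{quote_plus(password)}@{host}/"


# Placeholder values, not real credentials or hosts.
uri = build_connection_string("cluster0.example.mongodb.net", "etl_user", "p@ss:w/rd")
print(uri)
```

Without the encoding step, the @, : and / characters in the password would be parsed as part of the URI structure.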
Read preferences
You can configure the read preference that the MongoDB Atlas CDC source uses. The read preference determines how the source reads data from different members of the MongoDB replica set.
- Primary - Requires reading from the primary member.
- Primary Preferred - Prefers reading from the primary, but allows reads from a secondary member.
- Secondary - Requires reading from a secondary member.
- Secondary Preferred - Prefers reading from a secondary, but allows reads from a primary when necessary.
- Nearest - Reads from the member with the least network latency.
By default, the source uses Secondary Preferred to avoid making unnecessary requests to the primary member.
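For readers who know MongoDB connection strings, the read preference names above correspond to the standard readPreference connection-string option. The Python sketch below maps the labels to the MongoDB mode names. The one-to-one mapping between the stage labels and the URI values is an assumption, with_read_preference is a hypothetical helper, and the host is a placeholder. In the stage itself, you set the read preference through the stage property rather than the URI.

```python
from urllib.parse import urlencode

# Stage labels mapped to MongoDB's standard read preference mode names.
READ_PREFERENCE_MODES = {
    "Primary": "primary",
    "Primary Preferred": "primaryPreferred",
    "Secondary": "secondary",
    "Secondary Preferred": "secondaryPreferred",
    "Nearest": "nearest",
}


def with_read_preference(base_uri: str, label: str = "Secondary Preferred") -> str:
    """Append a readPreference option to a MongoDB URI (hypothetical helper)."""
    mode = READ_PREFERENCE_MODES[label]
    # Use '&' if the URI already carries options, otherwise start the query part.
    separator = "&" if "?" in base_uri else "?"
    return f"{base_uri}{separator}{urlencode({'readPreference': mode})}"


print(with_read_preference("mongodb+srv://cluster0.example.mongodb.net/"))
```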
Generated records
The MongoDB Atlas CDC source generates records based on data from a MongoDB change stream or the MongoDB oplog and adds CRUD and CDC related record header attributes.
The structure of oplog records is unique, so when necessary, you might use processors in the flow to convert record structure.
For example, for insert records, record data resides in a map field named o. But for an update record, the _id field is part of the o2 map field. To merge the record data, you can use a Field Flattener to flatten the map fields and a Field Remover to remove any unnecessary fields.
For more information about the oplog record structure, see the MongoDB documentation.
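To make the difference between insert and update records concrete, the following Python sketch flattens two simplified, made-up oplog-style records and then looks up the document _id in either location. It only imitates what flattening map fields does conceptually. The separator, the sample records, and the helper names are assumptions, not the behavior of the Field Flattener processor itself.

```python
def flatten(record: dict, separator: str = ".", prefix: str = "") -> dict:
    """Flatten nested maps into a single-level dict with joined field names."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}{separator}{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, separator, name))
        else:
            flat[name] = value
    return flat


def document_id(flat_record: dict):
    """Return the _id from o2 (update records) or from o (insert records)."""
    return flat_record.get("o2._id", flat_record.get("o._id"))


# Simplified sample records for illustration only.
insert_record = {"op": "i", "o": {"_id": 1, "name": "Ada"}}
update_record = {"op": "u", "o2": {"_id": 1}, "o": {"$set": {"name": "Grace"}}}

flat_insert = flatten(insert_record)
flat_update = flatten(update_record)
print(flat_insert, document_id(flat_insert))
print(flat_update, document_id(flat_update))
```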
CRUD operation and CDC header attributes
The MongoDB Atlas CDC source includes the CRUD operation type in the sdc.operation.type record header attribute.
If you use a CRUD-enabled target in the flow such as JDBC Producer or Elasticsearch, the target can use the operation type when writing to target systems. When necessary, you can use an Expression Evaluator processor or any scripting processor to manipulate the value in the header attribute. For an overview of Data Collector changed data processing and a list of CRUD-enabled targets, see Processing changed data.
The sdc.operation.type record header attribute uses the following values:
- 1 for INSERT
- 2 for DELETE
- 3 for UPDATE
- 5 for unsupported operations, such as CMD, NOOP, or DB, which are available MongoDB operation types but not applicable to record data.
- 7 for REPLACE
Note: REPLACE is supported only when the source is configured to read from a MongoDB change stream.
The source also includes the following CDC record header attributes:
- op - The CRUD operation, using the following values:
  - i for INSERT
  - u for UPDATE
  - d for DELETE
- ns - The namespace, using the following format: <database>:<collection>.
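As an illustration of how downstream logic might interpret these attributes, the Python sketch below turns a set of header attributes into a readable summary. The sample header values, the assumption that attribute values arrive as strings, and the describe_change helper are all hypothetical. A scripting processor would access the header attributes through its own API.

```python
# Values of sdc.operation.type as listed above.
OPERATION_NAMES = {
    1: "INSERT",
    2: "DELETE",
    3: "UPDATE",
    5: "UNSUPPORTED",
    7: "REPLACE",
}


def describe_change(headers: dict) -> str:
    """Summarize a change from its record header attributes (hypothetical helper)."""
    code = int(headers["sdc.operation.type"])
    name = OPERATION_NAMES.get(code, "UNKNOWN")
    return f"{name} on {headers.get('ns', '<unknown namespace>')}"


# Made-up header attributes for an update to the orders collection.
sample_headers = {"sdc.operation.type": "3", "op": "u", "ns": "sales:orders"}
summary = describe_change(sample_headers)
print(summary)
```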
Enabling SSL/TLS
When connecting to an SSL/TLS-enabled MongoDB cluster, you can use one of the following options:
- Atlas/System CA - Connects to a MongoDB Atlas cluster. You can also use this when your certificates or keys have already been specified at the JVM level.
- Server Validation (1 Way TLS) - Connects to an SSL/TLS-enabled MongoDB Enterprise Server cluster when the client needs to validate the server certificate and does not need to prove client identity.
- Server and Client Validation (2 Way TLS) - Connects to an SSL/TLS-enabled MongoDB Enterprise Server cluster when the client needs to validate the server certificate and the server also validates the client key. This occurs when the cluster is set up to require client certificates.
Certificates and keys can be provided in the following formats:
- JKS (Java Keystore)
- PEM (text-based)
- DER (text-based)
- PKCS #7 / P7B
- PKCS #12 / P12 / PFX
- Private keys inside PEM, DER, or PKCS #12 encoded as PKCS#1 or PKCS#8
If the files are in PEM or DER plain text format, you can provide the text in the stage properties. The certificate should begin and end with text such as: -----BEGIN CERTIFICATE----- or -----END PRIVATE KEY-----. Otherwise, you provide a path to the certificate file.
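Before pasting certificate or key text into a stage property, a quick check for PEM-style delimiter lines can help you decide whether you have text or only a file path. The Python sketch below is one way to do that check. The looks_like_pem helper and the sample values are hypothetical, and the check only looks for matching -----BEGIN ...----- and -----END ...----- lines. It does not validate the certificate.

```python
import re

# A PEM block starts and ends with matching BEGIN/END delimiter lines.
PEM_BLOCK = re.compile(
    r"-----BEGIN (?P<label>[A-Z0-9 ]+)-----\s.*?\s-----END (?P=label)-----",
    re.DOTALL,
)


def looks_like_pem(text: str) -> bool:
    """Return True if the text contains a delimited PEM-style block."""
    return PEM_BLOCK.search(text.strip()) is not None


# Placeholder body, not a real certificate.
sample_text = "-----BEGIN CERTIFICATE-----\nMIIBplaceholder\n-----END CERTIFICATE-----"
sample_path = "/etc/ssl/certs/ca.pem"

print(looks_like_pem(sample_text))
print(looks_like_pem(sample_path))
```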
MongoDB data types
When the MongoDB Atlas CDC source reads from MongoDB, it converts standard MongoDB data types to the following Data Collector data types.
The source can also convert supported BSON types to Data Collector data types. For more information, see Reading BSON types.
| Standard MongoDB Type | Data Collector Type |
|---|---|
| Array | List |
| Binary | Byte Array |
| Boolean | Boolean |
| Date | Date |
| Double | Double |
| Int32 | Integer |
| Int64 | Long |
| JavaScript | String |
| Object | List-Map |
| String | String |
| Timestamp | Datetime |
Reading BSON types
When reading from MongoDB, the MongoDB Atlas CDC source converts standard MongoDB data types to Data Collector data types as described in MongoDB data types.
The source converts supported BSON data types to Data Collector data types as well. When converting BSON data types, the source adds a field attribute named bsonType to the converted field.
Some supported BSON data types encode additional information with the data. Where this occurs, the information is included as additional attributes for the field. For example, a BsonTimestamp can encode an ordinal value along with the date and time. When the source reads the data, it converts the field to a Datetime field with an ordinal field attribute set to the ordinal value encoded with the data.
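The BsonTimestamp example can be pictured with a small Python sketch. A BSON timestamp carries a seconds-since-epoch value and an ordinal, so the sketch converts the seconds to a datetime and keeps the ordinal as a field attribute. The Field class and convert_bson_timestamp are hypothetical stand-ins for a Data Collector field, the sample values are made up, and storing the attribute value as a string is an assumption. The bsonType attribute is left out of the sketch.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class Field:
    """Hypothetical stand-in for a record field with attributes."""
    value: object
    attributes: dict = field(default_factory=dict)


def convert_bson_timestamp(seconds: int, ordinal: int) -> Field:
    """Convert a (seconds, ordinal) pair into a datetime field with an ordinal attribute."""
    return Field(
        value=datetime.fromtimestamp(seconds, tz=timezone.utc),
        # Assumption: attribute values are stored as strings.
        attributes={"ordinal": str(ordinal)},
    )


converted = convert_bson_timestamp(1_700_000_000, 3)
print(converted.value.isoformat(), converted.attributes)
```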
| BSON Data Type | Data Collector Type | Field Attributes and Values |
|---|---|---|
| Binary | Byte Array | bsonType: Binary |
| BsonDbPointer | Map field with subfields | bsonType: Bson_Db_Pointer |
| BsonRegularExpression | String | |
| BsonTimestamp | Datetime | |
| Code | String | bsonType: Code |
| CodeWithScope | String | bsonType: Code_With_Scope |
| DBRef | Map field with subfields | bsonType: Db_Ref |
| Decimal128 | Decimal | bsonType: Decimal128 |
| Null | String with null value | bsonType: Null |
| ObjectId | String containing the 24-character hexadecimal value of the Object Id | |
| Symbol | String | bsonType: Symbol |
| Undefined | String with null value | bsonType: Undefined |