IBM Streams 4.3.0

Examples

These examples use the ObjectStorageScan operator.

a) Sample that takes the bucket name as a submission parameter and uses the cos application configuration with the property cos.creds to specify the IAM credentials:

The endpoint is the public us-geo (cross-region) endpoint, which is the default value of the os-endpoint submission parameter.

composite Main {
    param
        // Bucket name, read from the "os-bucket" submission parameter (no default is given).
        expression<rstring> $bucket: getSubmissionTimeValue("os-bucket");
        // Endpoint host, read from "os-endpoint"; the second argument is the value used when it is not supplied.
        expression<rstring> $endpoint: getSubmissionTimeValue("os-endpoint", "s3-api.us-geo.objectstorage.softlayer.net");
    graph
        // Scan the bucket: directory "/sample", name pattern ".*".
        // No credential parameters are set on this operator.
        stream<rstring name> Scanned = com.ibm.streamsx.objectstorage::ObjectStorageScan() {
            param
                endpoint: $endpoint;
                objectStorageURI: com.ibm.streamsx.objectstorage.s3::getObjectStorageURI($bucket);
                pattern: ".*";
                directory: "/sample";
        }

        // Feed the scanned object names into an ObjectStorageSource operator,
        // which is configured with the same URI and endpoint as the scan.
        stream<rstring line> Data = com.ibm.streamsx.objectstorage::ObjectStorageSource(Scanned) {
            param
                endpoint: $endpoint;
                objectStorageURI: com.ibm.streamsx.objectstorage.s3::getObjectStorageURI($bucket);
        }
}

b) Sample using parameters to specify the IAM credentials:

Set the objectStorageURI parameter (supplied here through the os-uri submission parameter) in either the "cos://<bucket-name>/" or the "s3a://<bucket-name>/" format.

composite Main {
    param
        // IAM credentials, each read from its own submission parameter (no defaults are given).
        expression<rstring> $IAMApiKey: getSubmissionTimeValue("os-iam-api-key");
        expression<rstring> $IAMServiceInstanceId: getSubmissionTimeValue("os-iam-service-instance");
        // Object storage URI, read from the "os-uri" submission parameter.
        expression<rstring> $objectStorageURI: getSubmissionTimeValue("os-uri");
        // Endpoint host, read from "os-endpoint"; the second argument is the value used when it is not supplied.
        expression<rstring> $endpoint: getSubmissionTimeValue("os-endpoint", "s3-api.us-geo.objectstorage.softlayer.net");
    graph
        // Scan the object storage: directory "/sample", name pattern ".*".
        // The IAM credentials are passed directly as operator parameters.
        stream<rstring name> Scanned = com.ibm.streamsx.objectstorage::ObjectStorageScan() {
            param
                objectStorageURI: $objectStorageURI;
                endpoint: $endpoint;
                IAMApiKey: $IAMApiKey;
                IAMServiceInstanceId: $IAMServiceInstanceId;
                pattern: ".*";
                directory: "/sample";
        }

        // Feed the scanned object names into an ObjectStorageSource operator,
        // which is given the same URI, endpoint and IAM credentials as the scan.
        stream<rstring line> Data = com.ibm.streamsx.objectstorage::ObjectStorageSource(Scanned) {
            param
                objectStorageURI: $objectStorageURI;
                endpoint: $endpoint;
                IAMApiKey: $IAMApiKey;
                IAMServiceInstanceId: $IAMServiceInstanceId;
        }
}