Understanding Datasource Consumers
On the other side of the data flow, the API defines the DataSourceConsumer interface. A datasource consumer reads the data from a datasource and processes it, for example, to write its content to an Excel file or send it to an SQL database.
A datasource consumer is simply a function that can process each entity source emitted by a datasource, as in this example of a datasource consumer that collects entity records and associated issues into in-memory collections:
import java.util.ArrayList;

import com.decisionbrain.gene.dataintergation.core.DataSourceConsumer;

public class CollectionConsumer implements DataSourceConsumer<EntityRecord> {

    public final ArrayList<EntityRecord> activities = new ArrayList<>();
    public final ArrayList<EntityRecord> plants = new ArrayList<>();
    public final ArrayList<Issue> issues = new ArrayList<>();

    @Override
    public void accept(Result<EntitySource<EntityRecord>> entitySourceResult) {
        // Issues attached to the entity source itself
        issues.addAll(entitySourceResult.getIssues());
        entitySourceResult.forEach(entitySource ->
            entitySource.read(entityRecordResult -> {
                // Issues attached to each individual record
                issues.addAll(entityRecordResult.getIssues());
                entityRecordResult.forEach(entityRecord -> {
                    switch (entitySource.getEntityName()) {
                        case "Activity":
                            activities.add(entityRecord);
                            break;
                        case "Plant":
                            plants.add(entityRecord);
                            break;
                        default:
                            // Do nothing
                            break;
                    }
                });
            })
        );
    }
}
As you can see, when all you want is to collect the issues and process only the entity records, this is tedious: every Result wrapper has to be unpacked by hand. To avoid this work, make your consumer extend the IssueCollectorConsumer abstract class, which takes care of this processing on your behalf:
import java.util.ArrayList;

import com.decisionbrain.gene.dataintergation.core.DataSourceConsumer;
import com.decisionbrain.gene.dataintergation.core.IssueCollectorConsumer;

public class CollectionConsumer
        extends IssueCollectorConsumer<EntitySource<EntityRecord>>
        implements DataSourceConsumer<EntityRecord> {

    public final ArrayList<EntityRecord> activities = new ArrayList<>();
    public final ArrayList<EntityRecord> plants = new ArrayList<>();

    public CollectionConsumer() {
        // Start with an empty issue list
        super(new ArrayList<>());
    }

    @Override
    public void acceptItem(EntitySource<EntityRecord> entitySource) {
        // Pass a nested collector directly to read() so that record-level
        // issues end up in the same list as source-level issues
        entitySource.read(new IssueCollectorConsumer<EntityRecord>(issues) {
            @Override
            public void acceptItem(EntityRecord entityRecord) {
                switch (entitySource.getEntityName()) {
                    case "Activity":
                        activities.add(entityRecord);
                        break;
                    case "Plant":
                        plants.add(entityRecord);
                        break;
                    default:
                        // Do nothing
                        break;
                }
            }
        });
    }
}
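To make the division of work concrete, here is a minimal, self-contained sketch of how a base class of this kind might operate. It does not use the real API: Result and IssueCollector below are hypothetical stand-ins written for this illustration, issues are plain strings, and the actual IssueCollectorConsumer may be implemented differently. The point is only the pattern: accept() records the issues once and forwards the unwrapped item to acceptItem().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

public class IssueCollectorSketch {

    // Hypothetical stand-in for the Result wrapper: an optional value
    // plus the issues raised while producing it.
    static final class Result<T> {
        private final T value; // null when no value was produced
        private final List<String> issues;

        Result(T value, List<String> issues) {
            this.value = value;
            this.issues = issues;
        }

        List<String> getIssues() {
            return issues;
        }

        void forEach(Consumer<? super T> action) {
            if (value != null) {
                action.accept(value);
            }
        }
    }

    // Hypothetical stand-in for IssueCollectorConsumer: accept() stores the
    // issues, then hands the unwrapped item (if any) to acceptItem().
    abstract static class IssueCollector<T> implements Consumer<Result<T>> {
        protected final List<String> issues;

        IssueCollector(List<String> issues) {
            this.issues = issues;
        }

        @Override
        public final void accept(Result<T> result) {
            issues.addAll(result.getIssues());
            result.forEach(this::acceptItem);
        }

        protected abstract void acceptItem(T item);
    }

    // Feeds three results to a collector and returns [items, issues].
    public static List<List<String>> run() {
        List<String> items = new ArrayList<>();
        List<String> issues = new ArrayList<>();
        IssueCollector<String> collector = new IssueCollector<String>(issues) {
            @Override
            protected void acceptItem(String item) {
                items.add(item); // only unwrapped values reach this method
            }
        };
        collector.accept(new Result<>("Activity A1", Collections.<String>emptyList()));
        collector.accept(new Result<>(null, Collections.singletonList("row 2: missing id")));
        collector.accept(new Result<>("Plant P1", Collections.singletonList("row 3: unknown unit")));
        return Arrays.asList(items, issues);
    }

    public static void main(String[] args) {
        List<List<String>> out = run();
        System.out.println("items:  " + out.get(0));
        System.out.println("issues: " + out.get(1));
    }
}
```

In this sketch the subclass never touches a Result: a result that carries a value and an issue contributes to both lists, while a result with no value contributes only its issue.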