Understanding Datasources
In the data integration API, the DataSource interface is implemented by objects that can be used to read entity data from a source. Such a source can be, for example, an Excel file or an SQL database.
The process of reading data happens at two levels. First, the main datasource emits a series of sub-datasources, each dedicated to one type of entity and implementing the EntitySource interface. Then every instance of EntitySource emits a series of instances of EntityRecord. For example, while reading an Excel file, you will have one instance of DataSource for the whole file, which will emit one instance of EntitySource for each worksheet, which in turn will emit one instance of EntityRecord for each row in that worksheet.
The example below shows how to write a datasource to read data from the database using JPA:
import com.decisionbrain.gene.dataintegration.core.DataSource;
/**
 * Example {@link DataSource} that reads entity data through JPA repositories.
 *
 * <p>It emits two entity sources, one for the "Activity" entity and one for
 * the "Plant" entity; each of them emits one {@code EntityRecord} per element
 * returned by the corresponding repository's {@code findAll()}.
 */
public class JpaDataSource implements DataSource {

    private final ActivityRepository activityRepository;
    private final PlantRepository plantRepository;

    /**
     * Creates a datasource backed by the given repositories.
     *
     * @param activityRepository repository queried by the "Activity" entity source
     * @param plantRepository    repository queried by the "Plant" entity source
     */
    public JpaDataSource(
            ActivityRepository activityRepository,
            PlantRepository plantRepository
    ) {
        this.activityRepository = activityRepository;
        this.plantRepository = plantRepository;
    }

    /**
     * Emits the entity sources of this datasource: first the activity source,
     * then the plant source, each wrapped with {@code Result.some}.
     *
     * @param consumer receiver of the emitted entity sources
     */
    @Override
    public void read(DataSourceConsumer<EntityRecord> consumer) {
        consumer.accept(Result.some(new ActivitySource()));
        consumer.accept(Result.some(new PlantSource()));
    }

    /**
     * Builds a {@code Location} whose only customisation is its textual
     * description, as returned by {@code toString()}.
     *
     * @param description text returned by the location's {@code toString()}
     * @return a new location carrying that description
     */
    private static Location describedLocation(final String description) {
        return new Location() {
            @Override
            public String toString() {
                return description;
            }
        };
    }

    /**
     * Entity source for the "Activity" entity.
     *
     * <p>Declared as a non-static inner class on purpose: it reads the
     * enclosing instance's {@code activityRepository}.
     */
    class ActivitySource implements EntitySource<EntityRecord> {

        /** @return the entity name, {@code "Activity"} */
        @Override
        public String getEntityName() {
            return "Activity";
        }

        /** @return a location described as {@code "Activity repository"} */
        @Override
        public Location getLocation() {
            return describedLocation("Activity repository");
        }

        /**
         * Emits one record per activity returned by
         * {@code activityRepository.findAll()}, with the entries {@code id},
         * {@code startDate}, {@code endDate}, {@code plant.id} and
         * {@code plant.countryCode}.
         *
         * @param consumer receiver of the emitted records
         */
        @Override
        public void read(Consumer<Result<EntityRecord>> consumer) {
            for (Activity activity : activityRepository.findAll()) {
                // NOTE(review): assumes every activity has a non-null plant;
                // a null plant would raise a NullPointerException here -- confirm.
                consumer.accept(Result.some(EntityRecord.of(List.of(
                        EntityRecordEntry.of("id", activity.id),
                        EntityRecordEntry.of("startDate", activity.startDate),
                        EntityRecordEntry.of("endDate", activity.endDate),
                        EntityRecordEntry.of("plant.id", activity.plant.id),
                        EntityRecordEntry.of("plant.countryCode", activity.plant.countryCode)
                ))));
            }
        }
    }

    /**
     * Entity source for the "Plant" entity.
     *
     * <p>Declared as a non-static inner class on purpose: it reads the
     * enclosing instance's {@code plantRepository}.
     */
    class PlantSource implements EntitySource<EntityRecord> {

        /** @return the entity name, {@code "Plant"} */
        @Override
        public String getEntityName() {
            return "Plant";
        }

        /** @return a location described as {@code "Plant repository"} */
        @Override
        public Location getLocation() {
            return describedLocation("Plant repository");
        }

        /**
         * Emits one record per plant returned by
         * {@code plantRepository.findAll()}, with the entries {@code id} and
         * {@code countryCode}.
         *
         * @param consumer receiver of the emitted records
         */
        @Override
        public void read(Consumer<Result<EntityRecord>> consumer) {
            for (Plant plant : plantRepository.findAll()) {
                consumer.accept(Result.some(EntityRecord.of(List.of(
                        EntityRecordEntry.of("id", plant.id),
                        EntityRecordEntry.of("countryCode", plant.countryCode)
                ))));
            }
        }
    }
}