Write to data sources
The following Python 2.7 examples write Pandas dataframes to data sources from a Jupyter notebook.
To first load data from the data sources, see Add data sources and remote data sets or Access data in relational databases.
Install the ibm_db_sa package if it is not already available in your notebook:
!pip install --target /user-home/_global_/python-2.7 -U ibm_db_sa
Python example for Db2 and BIGSQL
import dsx_core_utils, os, io
import pandas as pd
from sqlalchemy import create_engine
#Read csv to pandas
df_data_1 = pd.read_csv('../datasets/userdatap1.csv')
dataSet = dsx_core_utils.get_remote_data_set_info('db2Set')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])
#SQL Alchemy URL
sqla_url = "db2+ibm_db://" + dataSource['user'] + ':' + dataSource['password'] + "@9.30.57.224:50000/SAMPLE"
#Pandas does not support many databases, so we use the recommended sqlalchemy
engine = create_engine(sqla_url, pool_size=10, max_overflow=20)
conn = engine.connect()
#Write to database
df_data_1.to_sql(dataSet['table'], engine, schema=dataSet['schema'], if_exists='replace')
#Read back again to confirm it has been written
query = 'select * from ' + dataSet['schema'] + '.' + dataSet['table']
df_data_1 = pd.read_sql(query, con=conn)
df_data_1.head()
where db2Set is the name of the data set, 9.30.57.224 is the sample IP of a Db2 database server, SAMPLE is the example database name, 50000 is the Db2 non-SSL port number (32051 is usually the BIGSQL port number; replace 50000 with it for BIGSQL), and ../datasets/userdatap1.csv is a sample CSV file used to create a dataframe that is then written to a table.
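For example, a minimal sketch of the BIGSQL variant of the same URL, assuming the usual 32051 port; the bigsql database name here is hypothetical:
#BIGSQL variant: same db2+ibm_db driver, BIGSQL port (database name is hypothetical)
sqla_url = "db2+ibm_db://" + dataSource['user'] + ':' + dataSource['password'] + "@9.30.57.224:32051/bigsql"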
Python example for Db2 and Db2 Warehouse on Cloud (previously known as dashDB)
import dsx_core_utils, os, io
import pandas as pd
import ibm_db_sa
from sqlalchemy import create_engine
import sqlalchemy
#Read csv to pandas
df1 = pd.read_csv('../datasets/userdatap1.csv')
dataSet = dsx_core_utils.get_remote_data_set_info('dashRDS')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])
#SQL Alchemy URL
sqla_url = "db2+ibm_db://" + dataSource['user'] + ':' + dataSource['password'] + "@dashdb-entry-yp-dal09-07.services.dal.bluemix.net:50000/BLUDB"
#Pandas does not support many databases, so we use the recommended sqlalchemy
engine = create_engine(sqla_url, pool_size=10, max_overflow=20)
conn = engine.connect()
#dashDB map types
from sqlalchemy.types import String, Boolean
object_cols = df1.dtypes[df1.dtypes=='object'].index
bool_cols = df1.dtypes[df1.dtypes=='bool'].index
for col in bool_cols:
    df1[col] = df1[col].astype(int)
dashdb_typemap = {col : String(4000) for col in object_cols }
#Write to database
df1.to_sql(name=dataSet['table'], schema=dataSet['schema'], con=engine, if_exists='replace', dtype=dashdb_typemap, index=False)
#Read back again to confirm data has been written
query = 'select * from ' + dataSet['schema'] + '.' + dataSet['table']
df2 = pd.read_sql(query, con=conn)
df2.head()
where dashRDS is the name of the data set, dashdb-entry-yp-dal09-07.services.dal.bluemix.net is the sample hostname of a Db2 Warehouse database server, BLUDB is the example database name, and 50000 is the Db2 non-SSL port number. ../datasets/userdatap1.csv is a sample CSV file used to create a dataframe that is then written to a table.
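Rather than hardcoding the hostname and database name, you can derive them from the data source definition, as the SSL example later in this topic does. A minimal sketch, assuming dataSource['URL'] holds a JDBC URL of the form jdbc:db2://host:port/database:
#Extract "@host:port/database" from the JDBC URL stored with the data source
dbServer = "@" + dataSource['URL'].split('/')[2] + '/' + dataSource['URL'].split('/')[3].split(':')[0]
sqla_url = "db2+ibm_db://" + dataSource['user'] + ':' + dataSource['password'] + dbServer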
Python example for Informix
import dsx_core_utils, jaydebeapi, os, io
import pandas as pd
#Build a sample dataframe with two integer columns
raw_data2 = [(1, 10), (2, 20)]
df2 = pd.DataFrame(raw_data2, columns=['I', 'I2'])
dataSet = dsx_core_utils.get_remote_data_set_info('info')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])
conn = jaydebeapi.connect(dataSource['driver_class'],
    [dataSource['URL'], dataSource['user'], dataSource['password']])
#Write to Informix
tablename = dataSet['table']
curs = conn.cursor()
# Create the table if it does not exist yet
#tablename = 'sampleTable'
#query = 'create table ' + tablename + '(i int, i2 int)'
#curs.execute(query)
# Insert the dataframe rows
for row in df2.itertuples():
    srows = str(row[1:]).strip("()")
    query2 = 'insert into ' + tablename + ' values(' + srows + ')'
    curs.execute(query2)
query3 = "select * from " + tablename
df1 = pd.read_sql(query3, con=conn)
df1.head(5)
where info is the name of the data set and sampleTable is the
name of the table you would like to create.
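Concatenating values into the INSERT statement works for simple numeric data but breaks on strings and quoting; a safer sketch uses parameterized inserts, assuming the JDBC driver supports ? placeholders:
# Bind each row tuple to the ? placeholders in one batch
curs.executemany('insert into ' + tablename + ' values(?, ?)',
    [tuple(row[1:]) for row in df2.itertuples()])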
Python example for Netezza
import dsx_core_utils, jaydebeapi, os, io
import pandas as pd
# Read the data source definition
dataSet = dsx_core_utils.get_remote_data_set_info('netz')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])
conn = jaydebeapi.connect(dataSource['driver_class'],
    [dataSource['URL'], dataSource['user'], dataSource['password']])
# Write to Netezza; df2 is an existing pandas dataframe whose columns match the target table
tablename = dataSet['table']
curs = conn.cursor()
# Create the table if it does not exist yet
#tablename = 'sampleTable'
#query = 'create table ' + dataSet['schema'] + '.' + tablename + '(i int)'
#curs.execute(query)
# Insert the dataframe rows
for row in df2.itertuples():
    srows = str(row[1:]).strip("()")
    query2 = 'insert into ' + tablename + ' values(' + srows + ')'
    curs.execute(query2)
query3 = "select * from " + tablename
df1 = pd.read_sql(query3, con=conn)
df1.head(5)
where netz is the name of the data set and sampleTable is the
table you are writing to.
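Depending on the driver's autocommit setting, the inserts may not be persisted until you commit; a minimal cleanup sketch:
conn.commit()  # persist the inserts if autocommit is off
curs.close()
conn.close()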
Python example for Oracle
To write to Oracle, first copy the Oracle Instant Client RPM into the user Jupyter pod and install it:
kubectl cp -n dsxuser-999 oracle-instantclient12.2-basic-12.2.0.1.0-1.x86_64.rpm <user jupyter pod>:/
rpm -ivh oracle-instantclient12.2-basic-12.2.0.1.0-1.x86_64.rpm
sudo sh -c "echo /opt/oracle/instantclient_12_2 > /etc/ld.so.conf.d/oracle-instantclient.conf"
sudo ldconfig
export LD_LIBRARY_PATH=/opt/oracle/instantclient_12_2:$LD_LIBRARY_PATH
Then in the notebook, set the environment variables:
os.environ["PATH"]=os.environ["PATH"]+
":/usr/lib/oracle/12.2/client64/lib"
os.environ["LD_LIBRARY_PATH"]=os.environ["LD_LIBRARY_PATH"]+
":/usr/lib/oracle/12.2/client64/lib"
os.environ["ORACLE_HOME"]="/usr/lib/oracle/12.2/client64"
To write a pandas dataframe to the Oracle database:
#Oracle
import dsx_core_utils, os, io
import pandas as pd
from sqlalchemy import create_engine
#Read csv to pandas
df_data_1 = pd.read_csv('../datasets/CUST_HISTORY.csv')
df_data_1.head(5)
dataSet = dsx_core_utils.get_remote_data_set_info('oracle-rds')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])
sqla_url = "oracle+cx_oracle://" + dataSource['user'] + ':' + dataSource['password'] + "@9.87.654.321:1521/xe"
#We use recommended sqlalchemy
engine = create_engine(sqla_url, pool_size=10, max_overflow=20)
conn = engine.connect()
#Write to database (make sure the database user has write access; for 'replace' the user also needs drop/create privileges)
#if_exists='replace' drops and recreates the table if it exists; 'append' appends, creating the table if it does not exist
df_data_1.to_sql(dataSet['table'], engine, schema=dataSet['schema'], if_exists='replace')
#Read back again to confirm it has been written
query = 'select * from '+dataSet['schema']+'.'+dataSet['table']
df_data_1 = pd.read_sql(query, con=conn)
df_data_1.head()
where oracle-rds is the name of the data set, 9.87.654.321 is an example IP of the Oracle database host, and xe is the sample SID of the database.
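If your Oracle instance is addressed by a service name instead of a SID, cx_Oracle URLs accept it as a query parameter; a sketch with a hypothetical service name:
sqla_url = "oracle+cx_oracle://" + dataSource['user'] + ':' + dataSource['password'] + "@9.87.654.321:1521/?service_name=mypdb"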
Python example for HDFS
import dsx_core_utils, requests, os, io
import pandas as pd
from sqlalchemy import create_engine
dataSet = dsx_core_utils.get_remote_data_set_info('hdfsset1')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])
filePath = '../datasets/userdatap1.csv'
destFilePath = '/user/user1/userdatap2.csv'
url = (dataSource['URL'][:-1] if (dataSource['URL'].endswith('/')) else dataSource['URL']) + destFilePath + "?op=CREATE"
headers = {"Authorization": os.environ.get('DSX_TOKEN')}
response = requests.request("PUT", url, headers=headers, timeout=10,
    verify=False, allow_redirects=True, data=open(filePath, 'rb').read())
where hdfsset1 is the name of the data set, /user/user1/userdatap2.csv is the destination CSV file name on HDFS, timeout=10 is the number of seconds to wait for the write to complete, and ../datasets/userdatap1.csv is the source CSV file on DSX Local. See the webHDFS documentation for more details on file overwrite.
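For example, the webHDFS CREATE operation accepts an overwrite flag; a sketch that replaces an existing destination file:
url = (dataSource['URL'][:-1] if (dataSource['URL'].endswith('/')) else dataSource['URL']) + destFilePath + "?op=CREATE&overwrite=true"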
Python example for a custom JDBC data source
If you have added support for a specific database vendor, you can write to it using the following example.
import dsx_core_utils, jaydebeapi, os, io
import pandas as pd
#Build a sample dataframe with two integer columns
raw_data2 = [(1, 10), (2, 20)]
df2 = pd.DataFrame(raw_data2, columns=['I', 'I2'])
dataSet = dsx_core_utils.get_remote_data_set_info('custom-dbset')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])
conn = jaydebeapi.connect(dataSource['driver_class'],
    [dataSource['URL'], dataSource['user'], dataSource['password']])
#Write to the custom database
tablename = dataSet['table']
curs = conn.cursor()
# Create the table if it does not exist yet
#tablename = 'sampleTable'
#query = 'create table ' + tablename + '(i int, i2 int)'
#curs.execute(query)
# Insert the dataframe rows
for row in df2.itertuples():
    srows = str(row[1:]).strip("()")
    query2 = 'insert into ' + tablename + ' values(' + srows + ')'
    curs.execute(query2)
query3 = "select * from " + tablename
df1 = pd.read_sql(query3, con=conn)
df1.head(5)
where custom-dbset is the name of the data set and sampleTable
is the name of the table you would like to create.
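If the vendor's JDBC driver jar is not already on the notebook classpath, jaydebeapi can load it directly through its jars argument; a minimal sketch with a hypothetical jar path:
conn = jaydebeapi.connect(dataSource['driver_class'],
    [dataSource['URL'], dataSource['user'], dataSource['password']],
    jars='/user-home/_global_/dbdrivers/vendor-jdbc.jar')  # hypothetical location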
Python example for writing Pandas dataframes over SSL
The following Python 2.7 example writes over SSL where dashsslset is the name of
the remote data set. This code appends ;Security=ssl; to the JDBC URL.
Before running this example, upgrade the ibm-db Python package to 2.0.9 (!pip install --user ibm-db==2.0.9 --upgrade). The SSL certificate must also already be imported.
import dsx_core_utils, os, io
import pandas as pd
import ibm_db_sa
from sqlalchemy import create_engine
import sqlalchemy
dataSet = dsx_core_utils.get_remote_data_set_info('dashsslset')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])
dbServer = "@" + dataSource['URL'].split('/')[2] + '/' + dataSource['URL'].split('/')[3].split(':')[0]
#SQL Alchemy URL
sqla_url= "db2+ibm_db://" + dataSource['user']+ ':' + dataSource['password'] + dbServer + ";Security=ssl;"
print sqla_url
#Pandas does not support many databases, so we use the recommended sqlalchemy
engine = create_engine(sqla_url, pool_size=10, max_overflow=20)
conn = engine.connect()
#dashDB type mapping; df3 is an existing pandas dataframe to write
from sqlalchemy.types import String, Boolean
object_cols = df3.dtypes[df3.dtypes=='object'].index
bool_cols = df3.dtypes[df3.dtypes=='bool'].index
for col in bool_cols:
    df3[col] = df3[col].astype(int)
dashdb_typemap = {col : String(4000) for col in object_cols}
#Write to database
df3.to_sql(name=dataSet['table'], schema=dataSet['schema'], con=engine, if_exists='append', dtype=dashdb_typemap,index=False)
#Read back again to confirm data has been written
query = 'select * from ' + dataSet['schema'] + '.' + dataSet['table']
df4 = pd.read_sql(query, con=conn)
df4.head()