Using Hadoop utilities

You can use utility methods and functions for the Python and R languages.

For Python, you can do the tasks that are described in the Python tasks section.

For the R language, use the utility methods that are described in the R language utility methods section in an RStudio environment.

R language utility methods

To use the R language utility methods, run:

library(HadoopLibUtilsR)

listDSXHILivyEndpoints

This utility lists all Livy endpoints defined by Hadoop registrations.

  #' @return An array of Livy endpoints

Example:

HadoopLibUtilsR::listDSXHILivyEndpoints()

getLivyConfig

This utility is useful if you need to modify additional configurations before connecting to Livy.

  #' @param ignore_ssl Optional. Controls SSL hostname verification. TRUE to skip SSL verification, FALSE to verify.
  #' @param cainfo Optional. CA certificate, required if ignore_ssl is FALSE.
  #' @return A Livy configuration object that can be used with a Spark context

Example:

conf <- HadoopLibUtilsR::getLivyConfig()
conf$spark.executor.cores <- 1
conf$spark.executor.instances <- 2
conf$spark.executor.memory <- '1024m'
conf$spark.driver.memory <- '2048m'

createLivySparkContext

This utility creates a Livy Spark context to a remote Hadoop system.

  #' @param livy_endpoint: Required. The Livy endpoint to connect to.
  #' @param ignore_ssl: Optional. Controls SSL hostname verification. TRUE to skip SSL verification, FALSE to verify.
  #' @param cainfo: Optional. CA certificate, required if ignore_ssl is FALSE.
  #' @param config: Optional. If a configuration object is provided, it is used as is; the ignore_ssl and cainfo settings are not applied in addition.
  #' @return A Spark context

Example:

sc <- HadoopLibUtilsR::createLivySparkContext(livy_endpoint="https://myhadoopsystem/livy2/v1")

or

sc <- HadoopLibUtilsR::createLivySparkContext(livy_endpoint=livy_endpoint, config = conf)

disconnectLivySparkContext

This utility disconnects the Spark context.

  #' @param sc Required. The Spark context to disconnect.

Example:

HadoopLibUtilsR::disconnectLivySparkContext(sc)

uploadHDFSFile

This utility uploads a local file to a remote HDFS using webHDFS.

  #' @param webhdfsurl: Required. The webHDFS URL for the remote system.
  #' @param source: Required. Local path indicating the location of the resource to be uploaded. This path must be accessible from within the RStudio file system.
  #' @param target: Required. Full webHDFS path (example: `/user/user1/datasets/myfile.csv`) to which the local resource is uploaded.
  #' @param overwrite: Optional. Defaults to TRUE. Set TRUE to overwrite the content of the target, FALSE to not.
  #' @param debug: Optional. Defaults to FALSE. Set TRUE to get additional error messages when an error occurs.

Example:

HadoopLibUtilsR::uploadHDFSFile(webhdfsurl="https://myhadoopsystem/webhdfs/v1", source="localfile.csv", target="/user/user1/hadoop/uploaded.csv", overwrite=TRUE, debug=FALSE)

downloadHDFSFile

Use this utility to download a file from a remote HDFS to a local system using webHDFS.

  #' @param webhdfsurl: Required. The webHDFS URL for the remote system.
  #' @param source: Required. Full webHDFS path (example: `/user/user1/datasets/myfile.csv`) to the resource to download.
  #' @param target: Required. Local path indicating where the resource is downloaded to.
  #' @param overwrite: Optional. Defaults to TRUE. Set TRUE to overwrite the content of the target, FALSE to not.
  #' @param debug: Optional. Defaults to FALSE. Set TRUE to get additional error messages when an error occurs.

Example:

HadoopLibUtilsR::downloadHDFSFile(webhdfsurl="https://myhadoopsystem/webhdfs/v1", source="/user/user1/hadoop/todownload.csv", target="localcopy.csv")

Python tasks

Manage models on HDFS with Hadoop integration utility methods

With the available Hadoop integration utility methods, you can manage models on HDFS, including building models on Hadoop (where the data resides) and pulling models in for saving (without needing to pull all of the data). For usage details, see the sample notebooks.

hadoop_lib_utils

Use the following utility methods in a Watson Studio session and notebook.

def get_hdfs_model_info(webhdfsurl, model_name, version=-1, source_hdfs_dir=None):
    """
    Return basic metadata about the serialized model (on HDFS) for the
    specified model name, if a serialized model can be found.

    :param webhdfsurl: Web HDFS URL for a remote Hadoop system on which to
           operate.

    :param model_name: Name of an **HDFS** model on which to operate. This
           name is evaluated w.r.t. the source HDFS dir (see below) and can
           therefore be a relative path.

    :param version: (Optional) Remote model version on which to operate. If
           not specified, defaults to latest version.

    :param source_hdfs_dir: (Optional) HDFS directory in which to operate.
           If not specified, defaults to `/user/<user>/.dsxhi/models/`.

    :returns: A metadata object (JSON / dict) holding basic information about
           the serialized model object, if it was serialized successfully.
    """

def load_model_from_hdfs(webhdfsurl, model_name,
    version=-1, source_hdfs_dir=None, model_load_func=None):
    """
    Find the serialized model (on HDFS) for the specified model name and, if
    it exists, read the model into memory.

    :param webhdfsurl: Web HDFS URL for a remote Hadoop system on which to
           operate.

    :param model_name: Name of an **HDFS** model on which to operate. This
           name is evaluated w.r.t. the source HDFS dir (see below) and can
           therefore be a relative path.

    :param version: (Optional) Remote model version on which to operate. If
           not specified, defaults to latest version.

    :param source_hdfs_dir: (Optional) HDFS directory in which to operate.
           If not specified, defaults to `/user/<user>/.dsxhi/models/`.

    :param model_load_func: (Optional) User-specified function to use for
           loading the model into memory. If specified the function must
           be defined to accept a single argument, **`staging_path`**,
           which is a temporary local path from which the model can be loaded.
           If not specified, a default model load function will be used based
           on the model type stored in the HDFS model metadata.

    :returns: An in-memory object representing the model that was read and
           deserialized from a path on HDFS.
    """ 

hi_core_utils

Use the following utility methods in a remote Livy session.

def install_packages(yaml=None, yaml_files=[], target_dir=None, force_gcc=False):
    """
    Use "conda-env" to install one or more conda yaml files into a temp conda
    environment, and then add the "site-packages" from that environment into
    the running python's active search path.

    :param yaml: Optional yaml content in the form of a string literal, to be
           installed into the temp env. If this parameter is specified along
           with yaml_files, this yaml content will be installed *before* the
           specified yaml files.

    :param yaml_files: Optional array of file paths (local to the file system
           of the running python process) to be installed into the temp env,
           one after the other. The files will be installed in the order in
           which they are received.  If this parameter is specified along with
           the "yaml" parameter, these yaml files will be installed *after* the
           specified yaml string literal.

    :param target_dir: Optional target directory (local to the file system of
           the running python process) into which to install the temp env.
           If not specified a temporary directory with a unique name will be
           autogenerated for the temp env.

    :param force_gcc: Optionally force installation of "gcc" and "gxx" packages
           before installing the specified yaml files. If this is not specified
           or is specified as False, this function will search the specified
           yamls for a pip install directive ("- pip:") and will *only* install
           the "gcc" and "gxx" packages if at least one of the specified yamls
           has that directive.
    """
def get_hdfs_model_info(model_name, version=-1, source_hdfs_dir=None):
    """
    Return basic metadata about the serialized model (on HDFS) for the
    specified model name, if a serialized model can be found.

    :param model_name: Name of an **HDFS** model on which to operate. This
           name is evaluated w.r.t. the source HDFS dir (see below) and can
           therefore be a relative path.

    :param version: (Optional) Remote model version on which to operate. If
           not specified, defaults to latest version.

    :param source_hdfs_dir: (Optional) HDFS directory in which to operate.
           If not specified, defaults to `/user/<user>/.dsxhi/models/`.

    :returns: A metadata object (JSON / dict) holding basic information about
           the serialized model object, if it was serialized successfully.
    """
    return hi_util.get_hdfs_model_info(model_name, version, source_hdfs_dir)
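
# Example (a minimal sketch): "my_model" is a placeholder model name, looked
# up under the default /user/<user>/.dsxhi/models/ directory.
#
#     info = hi_core_utils.get_hdfs_model_info("my_model")
#     print(info)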

def load_model_from_hdfs(model_name,
    version=-1, source_hdfs_dir=None, model_load_func=None):
    """
    Find the serialized model (on HDFS) for the specified model name and, if
    it exists, read the model into memory.

    :param model_name: Name of an **HDFS** model on which to operate. This
           name is evaluated w.r.t. the source HDFS dir (see below) and can
           therefore be a relative path.

    :param version: (Optional) Remote model version on which to operate. If
           not specified, defaults to latest version.

    :param source_hdfs_dir: (Optional) HDFS directory in which to operate.
           If not specified, defaults to `/user/<user>/.dsxhi/models/`.

    :param model_load_func: (Optional) User-specified function to use for
           loading the model into memory. If specified the function must
           be defined to accept two arguments: **`hdfs_path`** and
           **`staging_path`**.  The arguments point to the serialized model
           on HDFS and also on the local file system of the Hadoop "driver"
           node, respectively.  The user-given function can use whichever of
           those paths is appropriate for its operation / model type. If not
           specified, a default model load function will be used based on the
           model type stored in the HDFS model metadata.

    :returns: An in-memory object representing the model that was read and
           deserialized from a path on HDFS.
    """

def write_model_to_hdfs(model, model_name, target_hdfs_dir=None,
    with_new_ver=True, model_write_func=None):
    """
    Serialize the specified model to a location on HDFS and correlate that
    serialized model with the specified model name.

    :param model: In-memory model on which to operate.
    :param model_name: Name of an **HDFS** model on which to operate. This
           name is evaluated w.r.t. the target HDFS dir (see below) and can
           therefore be a relative path.

    :param target_hdfs_dir: (Optional) HDFS directory in which to operate.
           If not specified, defaults to `/user/<user>/.dsxhi/models/`.

    :param with_new_ver: (Optional) Whether or not to write the model with
           a new version number. Defaults to `True`.

    :param model_write_func: (Optional) User-specified function to use for
           writing the model to disk. If specified the function must be
           defined to accept three arguments: **`model`**, **`hdfs_path`**
           and **`staging_path`**.  The first argument points to the in-memory
           object to be written.  The 2nd and 3rd arguments hold a remote
           (HDFS) target path and a local (on the Hadoop "driver" node) path,
           respectively.  The user-given function can use whichever **one** of
           those paths is appropriate for its operation / model type.  (If
           both paths are used, the `staging_path` will take precedence; the
           `hdfs_path` will be ignored.)  If not specified, a default model
           write function will be used based on the python type of the given
           in-memory model.

    :returns: A metadata object (JSON / dict) holding basic information about
           the serialized model object, if it was serialized successfully.
    """

def run_command(command, sleep_after=None, echo_output=True):
    """
    Execute a specified command using Popen, wait for the command to complete,
    then optionally echo the command output (via "print()") before returning it.

    :param command: System command to execute within the Yarn container of
           the Hadoop "driver" node.

    :param sleep_after: (Optional) Amount of time, in seconds, to wait after
           executing the command. Defaults to `None` (i.e. don't wait).

    :param echo_output: (Optional) Whether or not to echo (via "print") the
           output of the command, if there is any.  Defaults to `True`.

    :returns: The output from the executed command, as a string, or None if
           there was no output (or if the output was an empty string).
    """

The following example is for JEG notebooks running on an environment created from a pushed image.

# run this cell only if sc is not initialized or seems to be undefined
from pyspark.sql import SparkSession
from pyspark import SparkContext
spark = SparkSession.builder.getOrCreate()
sc = SparkContext.getOrCreate()
import os

# uncomment and run the following line if the import for the package fails
# del os.environ["HI_WORKING_PKG_INSTALL_DIR"]

# hi_utils_lib = os.getenv("HI_UTILS_PATH", "")
hi_utils_lib = "/user/dsxhi/environments/pythonAddons/hi_core_utils.zip"

sc.addPyFile("hdfs://{}".format(hi_utils_lib))
import hi_core_utils
# installing conda package: fiona
hi_core_utils.install_packages('''
  name: test-it
  channels:
    - defaults
  dependencies:
    - fiona''')
import fiona

The following example is for Livy notebooks.

%%spark -s $session_name
import os
del os.environ["HI_WORKING_PKG_INSTALL_DIR"]
hi_utils_lib = os.getenv("HI_UTILS_PATH", "")
sc.addPyFile("hdfs://{}".format(hi_utils_lib))
import hi_core_utils
hi_core_utils.install_packages('''
  name: test-it
  channels:
    - defaults
  dependencies:
    - fiona''')

Transfer files with an HDP cluster

You can transfer files between the HDP cluster and the Watson Studio cluster by using Python utility functions from a Python environment within your notebook.

Python example

Restriction: The WebHDFS URL must always end in a forward slash, for example, https://9.87.654.321:50070/gateway/dsx/webhdfs/v1/.

To download files from the HDP cluster to Watson Studio:

hadoop_lib_utils.download_hdfs_file("https://9.87.654.321:50070/gateway/dsx/webhdfs/v1/",
                                    "/user/user1/sample_07.csv",
                                    "/user-home/1001/DSX_Projects/sample_07.txt")

To upload files from Watson Studio to the HDP cluster:

hadoop_lib_utils.upload_hdfs_file("https://9.87.654.321:50070/gateway/dsx/webhdfs/v1/",
                                  "/user-home/1001/DSX_Projects/sample_07.txt",
                                  "/user/user1/sample_07.csv")

Parent topic: Apache Hadoop