streamsx.hbase package

HBASE integration for IBM Streams

For details of implementing applications in Python for IBM Streams and the Streaming Analytics service running on IBM Cloud, see the streamsx package documentation.

This package exposes SPL operators in the com.ibm.streamsx.hbase toolkit as Python methods.

Overview

Provides functions to access HBASE.

HBASE configuration file

The host name and the port of the Hadoop server have to be specified with the environment variable HADOOP_HOST_PORT.

For example:

export HADOOP_HOST_PORT=hdp264.fyre.ibm.com:8020

The package creates an HBase configuration file (hbase-site.xml) from a template and replaces the Hadoop server name and port with the values from the environment variable HADOOP_HOST_PORT.

Alternatively, specify the location of an existing HBase configuration file hbase-site.xml with the environment variable HBASE_SITE_XML.

For example:

export HBASE_SITE_XML=/usr/hdp/current/hbase-client/conf/hbase-site.xml
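
The same settings can also be made from Python before the topology is built; a minimal sketch, assuming the host name, port, and file path shown above are placeholders for your own environment:

import os

# Point the package at the Hadoop/HBase server (placeholder host and port).
os.environ['HADOOP_HOST_PORT'] = 'hdp264.fyre.ibm.com:8020'
# Or reference an existing HBase configuration file instead:
# os.environ['HBASE_SITE_XML'] = '/usr/hdp/current/hbase-client/conf/hbase-site.xml'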

Sample

A simple Streams application that scans an HBase table and prints the scanned rows:

from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema, StreamSchema
from streamsx.topology.context import submit, ConfigParams
import streamsx.hbase as hbase

topo = Topology('hbase_scan_sample')

scanned_rows = hbase.scan(topo, table_name='sample', max_versions=1, init_delay=2)
scanned_rows.print()

cfg = {}
cfg[ConfigParams.SSL_VERIFY] = False
submit('DISTRIBUTED', topo, cfg)
class streamsx.hbase.HBaseGet(tableName, rowAttrName, connection=None, schema=CommonSchema.String, **options)

HBaseGet gets tuples from an HBase table.

Example, gets tuples from HBase table ‘streamsSample_lotr’:

import streamsx.hbase as hbase
from streamsx.topology.schema import StreamSchema

# create a query stream
inputStream = _create_stream_for_get(topo)

output_schema = StreamSchema('tuple<rstring who, rstring colF, rstring colQ, rstring value, int32 numResults>')

options = {
  'columnFamilyAttrName' : 'colF',
  'columnQualifierAttrName' : 'colQ',
  'outAttrName' : 'value',
  'outputCountAttr' : 'numResults',
  'maxVersions' : 0
}

get_rows = inputStream.map(hbase.HBaseGet(tableName=_get_table_name(), rowAttrName='who', schema=output_schema, **options))
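
For completeness, a hypothetical sketch of the query stream used above; _create_stream_for_get, _get_table_name, and the attribute values are illustrative placeholders, not part of the package:

from streamsx.topology.schema import StreamSchema

query_schema = StreamSchema('tuple<rstring who, rstring colF, rstring colQ>')

def _create_stream_for_get(topo):
    # Emit one lookup request per row key; the values here are placeholders.
    queries = [{'who': 'gandalf', 'colF': 'appearance', 'colQ': 'beard'}]
    return topo.source(queries).map(lambda t: t, schema=query_schema)
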
hbaseSite

The hbaseSite specifies the path of the hbase-site.xml file.

Type:dict|str
tableName

The name of the HBase table.

Type:str
schema

Output schema, defaults to CommonSchema.String

Type:StreamSchema
options

The additional optional parameters as variable keyword arguments.

Type:kwargs
authKeytab

The optional parameter authKeytab specifies the file that contains the encrypted keys for the user that is specified by the authPrincipal parameter. The operator uses this keytab file to authenticate the user. The keytab file is generated by the administrator. You must specify this parameter to use Kerberos authentication.

Type:str
authPrincipal

The optional parameter authPrincipal specifies the Kerberos principal that you use for authentication. This value is set to the principal that is created for the IBM Streams instance owner. You must specify this parameter if you want to use Kerberos authentication.

Type:str
columnFamilyAttrName

Name of the attribute on the input tuple containing the columnFamily. Cannot be used with staticColumnFamily.

Type:str
columnQualifierAttrName

Name of the attribute on the input tuple containing the columnQualifier. Cannot be used with staticColumnQualifier.

Type:str
hbaseSite

The hbaseSite parameter specifies the path of hbase-site.xml file. This is the recommended way to specify the HBASE configuration.

Type:str
maxVersions

The optional parameter maxVersions specifies the maximum number of versions that the operator returns. It defaults to a value of one. A value of 0 indicates that the operator gets all versions.

Type:int
minTimestamp

The optional parameter minTimestamp specifies the minimum timestamp that is used for queries. The operator does not return any entries with a timestamp older than this value. Unless you specify the maxVersions parameter, the operator returns only one entry in this time range.

Type:int
outAttrName

The optional parameter outAttrName specifies the name of the attribute of the output port in which the operator puts the retrieval results. The data type for the attribute depends on whether you specified a columnFamily or columnQualifier.

Type:str
outputCountAttr

The optional parameter outputCountAttr specifies the name of the attribute of the output port in which the operator puts a count of the values it populated.

Type:str
populate(topology, stream, schema, name, **options)

Populate the topology with this composite map transformation.

Parameters:
  • topology – Topology containing the composite map.
  • stream – Stream to be transformed.
  • schema – Schema passed into map.
  • name – Name passed into map.
  • **options – Future options passed to map.
Returns:

Single stream representing the transformation of stream.

Return type:

Stream

rowAttrName

Name of the attribute on the input tuple containing the row. It is required.

Type:str
staticColumnFamily

If this parameter is specified, it will be used as the columnFamily for all operations. (Compare to columnFamilyAttrName.) For HBASEScan, it can have cardinality greater than one.

Type:str
staticColumnQualifier

If this parameter is specified, it will be used as the columnQualifier for all tuples. HBASEScan allows it to be specified multiple times.

Type:str
tableName

Name of the HBase table. Either ‘tableName’ or ‘tableNameAttribute’ must be set; they cannot be used together. If the table does not exist, the operator throws an exception.

Type:str
tableNameAttribute

Name of the attribute on the input tuple containing the tableName. Use this parameter to pass the table name to the operator via the input port. Cannot be used with parameter ‘tableName’. This is suitable for tables with the same schema.

Type:str
vmArg

The optional parameter vmArg specifies additional JVM arguments that are required by the specific invocation of the operator.

Type:str
class streamsx.hbase.HBasePut(tableName, rowAttrName, valueAttrName, connection=None, schema=CommonSchema.String, **options)

HBasePut puts the incoming tuples into an HBase table.

Example, puts tuples into HBase table ‘streamsSample_lotr’:

import streamsx.hbase as hbase
from streamsx.topology.schema import StreamSchema

output_schema = StreamSchema('tuple<rstring who, rstring infoType, rstring requestedDetail, rstring value, int32 numResults>')

HBasePutParameters = {
  'columnFamilyAttrName' : 'infoType',
  'columnQualifierAttrName' : 'requestedDetail',
}

put_rows = inputStream.map(hbase.HBasePut(tableName='streamsSample_lotr', rowAttrName='who', valueAttrName='value', schema=output_schema, **HBasePutParameters))

put_rows.print()
hbaseSite

The hbaseSite specifies the path of the hbase-site.xml file.

Type:dict|str
schema

Output schema, defaults to CommonSchema.String

Type:StreamSchema
options

The additional optional parameters as variable keyword arguments.

Type:kwargs
Timestamp

This parameter specifies the timestamp in milliseconds (INT64). The timestamp allows for versioning of the cells. Every time HBase makes a PUT on a table, it sets the timestamp. By default this is the current time in milliseconds, but you can set your own timestamp with this parameter. Cannot be used with TimestampAttrName.

Type:int
TimestampAttrName

Name of the attribute on the input tuple containing the timestamp in milliseconds. Cannot be used with Timestamp.

Type:str
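
A minimal sketch of setting an explicit cell timestamp through the keyword options of HBasePut; the table, attribute names, and column values are assumptions, and the option keys mirror the operator parameters described here:

import time
import streamsx.hbase as hbase

put_options = {
    'staticColumnFamily': 'appearance',    # assumed column family
    'staticColumnQualifier': 'beard',      # assumed column qualifier
    'Timestamp': int(time.time() * 1000),  # cell version in milliseconds
}
put_rows = inputStream.map(hbase.HBasePut(tableName='streamsSample_lotr',
                                          rowAttrName='who',
                                          valueAttrName='value',
                                          **put_options))
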
authKeytab

The optional parameter authKeytab specifies the file that contains the encrypted keys for the user that is specified by the authPrincipal parameter. The operator uses this keytab file to authenticate the user. The keytab file is generated by the administrator. You must specify this parameter to use Kerberos authentication.

Type:str
authPrincipal

The optional parameter authPrincipal specifies the Kerberos principal that you use for authentication. This value is set to the principal that is created for the IBM Streams instance owner. You must specify this parameter if you want to use Kerberos authentication.

Type:str
batchSize

This parameter has been deprecated as of Streams 4.2.0. The enableBuffer parameter should be used instead.

Type:int
checkAttrName

Name of the attribute specifying the tuple to check for before applying the Put or Delete. The type of the attribute is a tuple with attributes columnFamily and columnQualifier, or a tuple with attributes columnFamily, columnQualifier, and value. In the first case, the Put or Delete proceeds only when there is no entry for the row, columnFamily, columnQualifier combination. When the type of the attribute given by checkAttrName contains an attribute value, the Put or Delete succeeds only when the entry specified by the row, columnFamily, and columnQualifier has the given value.

Type:str
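
As an illustration of the checked Put described above, a hypothetical input schema; the attribute names are assumptions, only the nested tuple layout follows the description:

from streamsx.topology.schema import StreamSchema

# The 'check' attribute carries the columnFamily/columnQualifier (and optionally
# the expected value) that must match before the Put or Delete is applied.
checked_put_schema = StreamSchema(
    'tuple<rstring who, rstring value, '
    'tuple<rstring columnFamily, rstring columnQualifier, rstring value> check>')
# Pass the attribute name to the operator via the options: {'checkAttrName': 'check'}
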
columnFamilyAttrName

Name of the attribute on the input tuple containing the columnFamily. Cannot be used with staticColumnFamily.

Type:str
columnQualifierAttrName

Name of the attribute on the input tuple containing the columnQualifier. Cannot be used with staticColumnQualifier.

Type:str
enableBuffer

When set to true, this parameter can improve the performance of the operator because tuples received by the operator will not be immediately forwarded to the HBase server.

Type:bool
hbaseSite

The hbaseSite parameter specifies the path of hbase-site.xml file. This is the recommended way to specify the HBASE configuration.

Type:str
populate(topology, stream, schema, name, **options)

Populate the topology with this composite map transformation.

Parameters:
  • topology – Topology containing the composite map.
  • stream – Stream to be transformed.
  • schema – Schema passed into map.
  • name – Name passed into map.
  • **options – Future options passed to map.
Returns:

Single stream representing the transformation of stream.

Return type:

Stream

rowAttrName

Name of the attribute on the input tuple containing the row. It is required.

Type:str
staticColumnFamily

If this parameter is specified, it will be used as the columnFamily for all operations. (Compare to columnFamilyAttrName.) For HBASEScan, it can have cardinality greater than one.

Type:str
staticColumnQualifier

If this parameter is specified, it will be used as the columnQualifier for all tuples. HBASEScan allows it to be specified multiple times.

Type:str
successAttr

Attribute on the output port to be set to true if the check passes and the action is successful.

Type:str
tableName

Name of the HBase table. Either ‘tableName’ or ‘tableNameAttribute’ must be set; they cannot be used together. If the table does not exist, the operator throws an exception.

Type:str
tableNameAttribute

Name of the attribute on the input tuple containing the tableName. Use this parameter to pass the table name to the operator via the input port. Cannot be used with parameter ‘tableName’. This is suitable for tables with the same schema.

Type:str
valueAttrName

This parameter specifies the name of the attribute that contains the value that is put into the table. It is required.

Type:str
vmArg

The optional parameter vmArg specifies additional JVM arguments that are required by the specific invocation of the operator.

Type:str
class streamsx.hbase.HBaseScan(tableName, connection=None, schema=CommonSchema.String, name=None, **options)

The HBaseScan operator scans an HBase table. Like the FileSource operator, it has an optional input port. If no input port is specified, then the operator scans the table according to the parameters that you specify, and sends the final punctuation.

Example, scans the HBase table ‘streamsSample_lotr’:

import streamsx.hbase as hbase
from streamsx.topology.schema import StreamSchema

output_schema = StreamSchema('tuple<rstring who, rstring infoType, rstring requestedDetail, rstring value, int32 numResults>')

HBaseScanParameters = {
  'staticColumnFamily' : 'infoType',
  'staticColumnQualifier' : 'requestedDetail',
  'outAttrName' : 'value',
}

scanned_rows = topo.source(hbase.HBaseScan(tableName='streamsSample_lotr', schema=output_schema, **HBaseScanParameters))

scanned_rows.print()
hbaseSite

The hbaseSite specifies the path of the hbase-site.xml file.

Type:dict|str
schema

Output schema, defaults to CommonSchema.String

Type:StreamSchema
options

The additional optional parameters as variable keyword arguments.

Type:kwargs
authKeytab

The optional parameter authKeytab specifies the file that contains the encrypted keys for the user that is specified by the authPrincipal parameter. The operator uses this keytab file to authenticate the user. The keytab file is generated by the administrator. You must specify this parameter to use Kerberos authentication.

Type:str
authPrincipal

The optional parameter authPrincipal specifies the Kerberos principal that you use for authentication. This value is set to the principal that is created for the IBM Streams instance owner. You must specify this parameter if you want to use Kerberos authentication.

Type:str
channel

If this operator is part of a parallel region, it shares the work of scanning with other operators in the region. To do this, set this parameter value by calling getChannel(). This parameter is required if the maximum number of channels has a value other than zero.

Type:int
endRow

This parameter specifies the row to use to stop the scan. The row that you specify is excluded from the scan.

Type:str
hbaseSite

The hbaseSite parameter specifies the path of hbase-site.xml file. This is the recommended way to specify the HBASE configuration.

Type:str
initDelay

The parameter initDelay specifies the time to wait in seconds before the operator HBASEScan reads the first row. The default value is 0.

Type:float
maxChannels

If this operator is part of a parallel region, set this parameter value by calling getMaxChannels(). If the operator is in a parallel region, then the regions to be scanned are divided among the other copies of this operator in the other channels. If this parameter is set, you must also set the channel parameter.

Type:int
maxThreads

Maximum number of threads to use to scan the table. Defaults to one.

Type:int
maxVersions

This parameter specifies the maximum number of versions that the operator returns. It defaults to a value of one. A value of 0 indicates that the operator gets all versions.

Type:int
minTimestamp

This parameter specifies the minimum timestamp that is used for queries. The operator does not return any entries with a timestamp older than this value. Unless you specify the maxVersions parameter, the operator returns only one entry in this time range.

Type:int
outAttrName

This parameter specifies the name of the attribute in which to put the value. It defaults to value. If the attribute is a tuple data type, the attribute names are used as columnQualifiers. If multiple families are included in the scan and they have the same columnQualifiers, there is no way of knowing which columnFamily was used to populate a tuple attribute.

Type:str
outputCountAttr

This parameter specifies the output attribute in which to put the number of results that are found. When the result is a tuple, this parameter value is the number of attributes that were populated in that tuple.

Type:str
populate(topology, name, **options)

Populate the topology with this composite source.

Parameters:
  • topology – Topology containing the source.
  • name – Name passed into source.
  • **options – Future options passed to source.
Returns:

Single stream representing the source.

Return type:

Stream

rowPrefix

This parameter specifies that the scan returns only rows that have this prefix.

Type:str
startRow

This parameter specifies the row to use to start the scan. The row that you specify is included in the scan.

Type:str
staticColumnFamily

If this parameter is specified, it will be used as the columnFamily for all operations. (Compare to columnFamilyAttrName.) For HBASEScan, it can have cardinality greater than one.

Type:str
staticColumnQualifier

If this parameter is specified, it will be used as the columnQualifier for all tuples. HBASEScan allows it to be specified multiple times.

Type:str
tableName

Name of the HBase table. Either ‘tableName’ or ‘tableNameAttribute’ must be set; they cannot be used together. If the table does not exist, the operator throws an exception.

Type:str
tableNameAttribute

Name of the attribute on the input tuple containing the tableName. Use this parameter to pass the table name to the operator via the input port. Cannot be used with parameter ‘tableName’. This is suitable for tables with the same schema.

Type:str
triggerCount

This parameter specifies the number of rows to process before triggering a drain. This parameter is valid only in an operator-driven consistent region.

Type:int
vmArg

The optional parameter vmArg specifies additional JVM arguments that are required by the specific invocation of the operator.

Type:str
streamsx.hbase.download_toolkit(url=None, target_dir=None)

Downloads the latest HBase toolkit from GitHub.

Example for updating the HBase toolkit for your topology with the latest toolkit from GitHub:

import streamsx.hbase as hbase
# download Hbase toolkit from GitHub
hbase_toolkit_location = hbase.download_toolkit()
# add the toolkit to topology
streamsx.spl.toolkit.add_toolkit(topology, hbase_toolkit_location)

Example for updating the topology with a specific version of the HBase toolkit using a URL:

import streamsx.hbase as hbase
url380 = 'https://github.com/IBMStreams/streamsx.hbase/releases/download/v3.8.0/streamsx.hbase.toolkits-3.8.0-20190829-1529.tgz'
hbase_toolkit_location = hbase.download_toolkit(url=url380)
streamsx.spl.toolkit.add_toolkit(topology, hbase_toolkit_location)
Parameters:
  • url (str) – Link to toolkit archive (*.tgz) to be downloaded. Use this parameter to download a specific version of the toolkit.
  • target_dir (str) – the directory where the toolkit is unpacked to. If a relative path is given, the path is appended to the system temporary directory, for example to /tmp on Unix/Linux systems. If target_dir is None a location relative to the system temporary directory is chosen.
Returns:

the location of the downloaded HBase toolkit

Return type:

str

Note

This function requires an outgoing Internet connection.

New in version 1.3.

streamsx.hbase.scan(topology, table_name, max_versions=None, init_delay=None, connection=None, name=None)

Scans an HBase table and delivers the number of results, rows, and values in the output stream.

The output stream has a structured schema (StreamSchema); see Returns.

Parameters:
  • topology (Topology) – Topology to contain the returned stream.
  • table_name (str) – The name of the HBase table.
  • max_versions (int32) – specifies the maximum number of versions that the operator returns. It defaults to a value of one. A value of 0 indicates that the operator gets all versions.
  • init_delay (int|float|datetime.timedelta) – The time to wait in seconds before the operator reads the first row. If not set, then the default value is 0.
  • connection (dict|filename|string) – Specify the connection to HBASE either with a filename of a HBase configuration file or as string in format “HOST:PORT” or as dict containing the properties ‘host’ and ‘port’. If not specified the environment variables HADOOP_HOST_PORT or HBASE_SITE_XML are used.
  • name (str) – Source name in the Streams context, defaults to a generated name.
Returns:

Output Stream containing the row, numResults, and values. It has a structured streams schema:

HBASEScanOutputSchema = StreamSchema('tuple<rstring row, int32 numResults, rstring columnFamily, rstring columnQualifier, rstring value>')

Return type:

StreamSchema
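
A usage sketch that consumes the structured scan output; the attribute names follow the schema above, while the topology and table name are placeholders:

from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
import streamsx.hbase as hbase

topo = Topology('hbase_scan_usage')
scanned = hbase.scan(topo, table_name='sample', max_versions=1)
# Structured tuples arrive in Python callables as dicts keyed by attribute name.
lines = scanned.map(lambda t: '{}={}'.format(t['row'], t['value']),
                    schema=CommonSchema.String)
lines.print()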

streamsx.hbase.get(stream, table_name, row_attr_name, connection=None, name=None)

Gets tuples from an HBase table and delivers the number of results, rows, and values in the output stream.

Parameters:
  • stream – the input stream.
  • table_name (str) – The name of the HBase table.
  • row_attr_name (str) – Name of the attribute on the input tuple containing the row key. It is required.
  • connection (dict|filename|string) – Specify the connection to HBASE either with a filename of a HBase configuration file or as string in format “HOST:PORT” or as dict containing the properties ‘host’ and ‘port’. If not specified the environment variables HADOOP_HOST_PORT or HBASE_SITE_XML are used.
  • name (str) – Operator name in the Streams context, defaults to a generated name.
Returns:

Output Stream containing the row, numResults, and values. It has a structured streams schema:

HBASEGetOutputSchema = StreamSchema('tuple<rstring row, int32 numResults, rstring value, rstring infoType, rstring requestedDetail>')

Return type:

StreamSchema
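
A minimal sketch of driving get() with a query stream; the query schema, row key, and table name are assumptions for illustration:

from streamsx.topology.topology import Topology
from streamsx.topology.schema import StreamSchema
import streamsx.hbase as hbase

topo = Topology('hbase_get_usage')
query_schema = StreamSchema('tuple<rstring who>')
# One lookup request per tuple; the 'who' attribute holds the row key.
queries = topo.source([{'who': 'gandalf'}]).map(lambda t: t, schema=query_schema)
results = hbase.get(queries, table_name='streamsSample_lotr', row_attr_name='who')
results.print()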

streamsx.hbase.put(stream, table_name, connection=None, name=None)

Puts a row, delivered in the stream as a tuple, into an HBase table.

The output stream has a structured schema (StreamSchema); see Returns.

Parameters:
  • stream – the input stream.
  • table_name (str) – The name of the HBase table.
  • connection (dict|filename|string) – Specify the connection to HBASE either with a filename of a HBase configuration file or as string in format “HOST:PORT” or as dict containing the properties ‘host’ and ‘port’. If not specified the environment variables HADOOP_HOST_PORT or HBASE_SITE_XML are used.
  • name (str) – Operator name in the Streams context, defaults to a generated name.
Returns:

Output Stream containing the result success. It has a structured streams schema:

StreamSchema('tuple<boolean success>')

Return type:

StreamSchema

streamsx.hbase.delete(stream, table_name, connection=None, name=None)

Deletes a row, delivered in the stream as a tuple, from an HBase table.

The output stream has a structured schema (StreamSchema); see Returns.

Parameters:
  • stream – the input stream.
  • table_name (str) – The name of the HBase table.
  • connection (dict|filename|string) – Specify the connection to HBASE either with a filename of a HBase configuration file or as string in format “HOST:PORT” or as dict containing the properties ‘host’ and ‘port’. If not specified the environment variables HADOOP_HOST_PORT or HBASE_SITE_XML are used.
  • name (str) – Operator name in the Streams context, defaults to a generated name.
Returns:

Output Stream containing the result success. It has a structured streams schema:

StreamSchema('tuple<boolean success>')

Return type:

StreamSchema
