streamsx.hbase package

HBASE integration for IBM Streams

For details of implementing applications in Python for IBM Streams and the Streaming Analytics service running on IBM Cloud, see:

This package exposes SPL operators in the com.ibm.streamsx.hbase toolkit as Python methods.

Overview

Provides functions to access HBASE.

HBASE configuration file

The host name and port of the Hadoop server are specified with the environment variable HADOOP_HOST_PORT.

For example:

export HADOOP_HOST_PORT=hdp264.fyre.ibm.com:8020

The package creates an HBase configuration file (hbase-site.xml) from a template and replaces the Hadoop server name and port in it with the values from the environment variable HADOOP_HOST_PORT.
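The substitution step described above can be pictured with the following sketch. The template text, the `$host_port` placeholder, and the `render_hbase_site` helper are hypothetical: the package's real template is not shown in this documentation and may set different properties.

```python
from string import Template

# Hypothetical template: the package's real hbase-site.xml template is not
# shown in this documentation and may contain other properties.
HBASE_SITE_TEMPLATE = Template("""<?xml version="1.0"?>
<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://$host_port/apps/hbase/data</value>
  </property>
</configuration>
""")


def render_hbase_site(host_port: str) -> str:
    """Return hbase-site.xml text with the HADOOP_HOST_PORT value substituted."""
    host, sep, port = host_port.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(f"expected HOST:PORT, got {host_port!r}")
    return HBASE_SITE_TEMPLATE.substitute(host_port=host_port)
```

Validating the HOST:PORT shape before substituting means a malformed variable fails early instead of producing a configuration file that only fails at connection time.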

Alternatively, specify the location of the HBase configuration file hbase-site.xml with the environment variable HBASE_SITE_XML.

For example:

export HBASE_SITE_XML=/usr/hdp/current/hbase-client/conf/hbase-site.xml
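A minimal sketch of how an application might decide which of the two environment variables applies. The precedence (HBASE_SITE_XML before HADOOP_HOST_PORT) and the `resolve_hbase_config` helper are assumptions for illustration, not documented behavior of the package.

```python
import os
from typing import Mapping, Optional, Tuple


def resolve_hbase_config(env: Optional[Mapping[str, str]] = None) -> Tuple[str, str]:
    """Return ('site_xml', path) or ('host_port', 'HOST:PORT').

    Assumption: an explicit hbase-site.xml location wins over HADOOP_HOST_PORT.
    """
    env = os.environ if env is None else env
    site_xml = env.get("HBASE_SITE_XML")
    if site_xml:
        return ("site_xml", site_xml)
    host_port = env.get("HADOOP_HOST_PORT")
    if host_port:
        host, sep, port = host_port.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"HADOOP_HOST_PORT must be HOST:PORT, got {host_port!r}")
        return ("host_port", host_port)
    raise RuntimeError("set HBASE_SITE_XML or HADOOP_HOST_PORT")
```

Passing an explicit mapping instead of reading os.environ directly keeps the helper easy to test.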

Sample

A simple Streams application that scans an HBASE table and prints the scanned rows:

from streamsx.topology.topology import Topology
from streamsx.topology.context import submit, ConfigParams
import streamsx.hbase as hbase

topo = Topology('hbase_scan_sample')

# Scan table 'sample': one version per cell, wait 2 seconds before the first read
scanned_rows = hbase.scan(topo, table_name='sample', max_versions=1, init_delay=2)
scanned_rows.print()

# Submission configuration: skip SSL certificate verification
cfg = {}
cfg[ConfigParams.SSL_VERIFY] = False
submit('DISTRIBUTED', topo, cfg)

class streamsx.hbase.HBaseGet(tableName, rowAttrName, connection=None, schema=CommonSchema.String, **options)

HBaseGet gets tuples from an HBase table.

Example: get tuples from the HBase table ‘streamsSample_lotr’:

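from streamsx.topology.topology import Topology
from streamsx.topology.schema import StreamSchema
import streamsx.hbase as hbase

topo = Topology('hbase_get_sample')
# Create a query stream. _create_stream_for_get is a helper that is not part of
# this package; it must produce tuples with the attributes who, colF and colQ.
inputStream = _create_stream_for_get(topo)

output_schema = StreamSchema('tuple<rstring who, rstring colF, rstring colQ, rstring value, int32 numResults>')

# Take column family and qualifier from the input tuple; return all versions
options = {
  'columnFamilyAttrName' : 'colF',
  'columnQualifierAttrName' : 'colQ',
  'outAttrName' : 'value',
  'outputCountAttr' : 'numResults',
  'maxVersions' : 0
}

get_rows = inputStream.map(hbase.HBaseGet(tableName='streamsSample_lotr', rowAttrName='who', schema=output_schema, **options))
get_rows.print()
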
hbaseSite

The hbaseSite specifies the path of the hbase-site.xml file.

Type

dict|str

tableName

The name of the HBase table.

Type

str

schema

Output schema, defaults to CommonSchema.String

Type

StreamSchema

options

Additional optional parameters, passed as keyword arguments.

Type

kwargs

property authKeytab

The optional parameter authKeytab specifies the file that contains the encrypted keys for the user that is specified by the authPrincipal parameter. The operator uses this keytab file to authenticate the user. The keytab file is generated by the administrator. You must specify this parameter to use Kerberos authentication.

Type

str

property authPrincipal

The optional parameter authPrincipal specifies the Kerberos principal that you use for authentication. This value is set to the principal that is created for the IBM Streams instance owner. You must specify this parameter if you want to use Kerberos authentication.

Type

str

property columnFamilyAttrName

Name of the attribute on the input tuple containing the columnFamily. Cannot be used with staticColumnFamily.

Type

str

property columnQualifierAttrName

Name of the attribute on the input tuple containing the columnQualifier. Cannot be used with staticColumnQualifier.

Type

str

property hbaseSite

The hbaseSite parameter specifies the path of the hbase-site.xml file. This is the recommended way to specify the HBASE configuration.

Type

str

property maxVersions

The optional parameter maxVersions specifies the maximum number of versions that the operator returns. It defaults to a value of one. A value of 0 indicates that the operator gets all versions.

Type

int

property minTimestamp

The optional parameter minTimestamp specifies the minimum timestamp that is used for queries. The operator does not return any entries with a timestamp older than this value. Unless you specify the maxVersions parameter, the operator returns only one entry in this time range.

Type

int
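The interplay of maxVersions and minTimestamp can be modeled in plain Python. This is an illustrative in-memory model with a hypothetical `select_versions` helper, not the operator's code; it assumes versions are returned newest first, which is how HBase orders the versions of a cell.

```python
from typing import List, Optional, Tuple

Version = Tuple[int, str]  # (timestamp in ms, cell value)


def select_versions(versions: List[Version], max_versions: int = 1,
                    min_timestamp: Optional[int] = None) -> List[Version]:
    """Model of the documented rules:
    - entries older than min_timestamp are dropped;
    - max_versions limits the result (default 1, 0 means all versions).
    """
    if max_versions < 0:
        raise ValueError("max_versions must be >= 0")
    kept = [v for v in versions if min_timestamp is None or v[0] >= min_timestamp]
    kept.sort(key=lambda v: v[0], reverse=True)  # newest first
    return kept if max_versions == 0 else kept[:max_versions]
```

With the default of one version, only the newest entry at or after minTimestamp survives, matching the note that the operator returns a single entry in the time range unless maxVersions is set.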

property outAttrName

The optional parameter outAttrName specifies the name of the attribute of the output port in which the operator puts the retrieval results. The data type for the attribute depends on whether you specified a columnFamily or columnQualifier.

Type

str

property outputCountAttr

The optional parameter outputCountAttr specifies the name of the attribute of the output port where the operator puts a count of the values it populated.

Type

str

populate(topology, stream, schema, name, **options)

Populate the topology with this composite map transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

transformed_stream = input_stream.map(myMapComposite)
Parameters
  • topology – Topology containing the composite map.

  • stream – Stream to be transformed.

  • schema – Schema passed into map.

  • name – Name passed into map.

  • **options – Future options passed to map.

Returns

Single stream representing the transformation of stream.

Return type

Stream

property rowAttrName

Name of the attribute on the input tuple containing the row. It is required.

Type

str

property staticColumnFamily

If this parameter is specified, it will be used as the columnFamily for all operations. (Compare to columnFamilyAttrName.) For HBASEScan, it can have cardinality greater than one.

Type

str

property staticColumnQualifier

If this parameter is specified, it will be used as the columnQualifier for all tuples. HBASEScan allows it to be specified multiple times.

Type

str

property tableName

Name of the HBASE table. It is an optional parameter, but one of the parameters ‘tableName’ or ‘tableNameAttribute’ must be set in the operator. Cannot be used with ‘tableNameAttribute’. If the table does not exist, the operator throws an exception.

Type

str

property tableNameAttribute

Name of the attribute on the input tuple containing the tableName. Use this parameter to pass the table name to the operator via the input port. Cannot be used with the parameter ‘tableName’. This is suitable when all tables have the same schema.

Type

str

property vmArg

The optional parameter vmArg specifies additional JVM arguments that are required by the specific invocation of the operator.

Type

str

class streamsx.hbase.HBasePut(tableName, rowAttrName, valueAttrName, connection=None, schema=CommonSchema.String, **options)

HBasePut puts the incoming tuples into an HBase table.

Example: put tuples into the HBase table ‘streamsSample_lotr’:

from streamsx.topology.schema import StreamSchema
import streamsx.hbase as hbase

# inputStream must carry the attributes who, infoType, requestedDetail and value
output_schema = StreamSchema('tuple<rstring who, rstring infoType, rstring requestedDetail, rstring value, int32 numResults>')

HBasePutParameters = {
  'columnFamilyAttrName' : 'infoType',
  'columnQualifierAttrName' : 'requestedDetail'
}

# valueAttrName is a required argument of HBasePut
put_rows = inputStream.map(hbase.HBasePut(tableName='streamsSample_lotr', rowAttrName='who', valueAttrName='value', schema=output_schema, **HBasePutParameters))

put_rows.print()

hbaseSite

The hbaseSite specifies the path of the hbase-site.xml file.

Type

dict|str

schema

Output schema, defaults to CommonSchema.String

Type

StreamSchema

options

Additional optional parameters, passed as keyword arguments.

Type

kwargs

property Timestamp

This parameter specifies the timestamp in milliseconds (INT64). The timestamp allows for versioning of the cells. Every time HBase performs a PUT on a table, it sets the timestamp. By default this is the current time in milliseconds, but you can set your own timestamp with this parameter. Cannot be used with TimestampAttrName.

Type

int
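The Timestamp value is an INT64 count of milliseconds. A minimal sketch of deriving one from a Python datetime, assuming the value is measured from the Unix epoch (as HBase's default current-time timestamps are); `to_epoch_millis` is a hypothetical helper, and passing the result through the options dict mirrors the keyword-argument pattern of the examples in this document.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(dt: datetime) -> int:
    """Convert a timezone-aware datetime to milliseconds since the Unix epoch."""
    if dt.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime to avoid local-time ambiguity")
    # timedelta // timedelta is exact integer division (no float rounding)
    return (dt - EPOCH) // timedelta(milliseconds=1)


# Hypothetical usage: a fixed timestamp for every Put issued by HBasePut
put_options = {'Timestamp': to_epoch_millis(datetime(2019, 8, 29, tzinfo=timezone.utc))}
```

Integer division of timedeltas avoids the rounding that `int(dt.timestamp() * 1000)` can introduce for far-off dates.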

property TimestampAttrName

Name of the attribute on the input tuple containing the timestamp in milliseconds. Cannot be used with Timestamp.

Type

str

property authKeytab

The optional parameter authKeytab specifies the file that contains the encrypted keys for the user that is specified by the authPrincipal parameter. The operator uses this keytab file to authenticate the user. The keytab file is generated by the administrator. You must specify this parameter to use Kerberos authentication.

Type

str

property authPrincipal

The optional parameter authPrincipal specifies the Kerberos principal that you use for authentication. This value is set to the principal that is created for the IBM Streams instance owner. You must specify this parameter if you want to use Kerberos authentication.

Type

str

property batchSize

This parameter has been deprecated as of Streams 4.2.0. The enableBuffer parameter should be used instead.

Type

int

property checkAttrName

Name of the attribute specifying the tuple to check for before applying the Put or Delete. The type of the attribute is a tuple with the attributes columnFamily and columnQualifier, or a tuple with the attributes columnFamily, columnQualifier, and value. In the first case, the Put or Delete is allowed to proceed only when there is no entry for the row, columnFamily, columnQualifier combination. When the type of the attribute given by checkAttrName contains an attribute value, the Put or Delete operation only succeeds when the entry specified by the row, columnFamily, and columnQualifier has the given value.

Type

str
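The two check modes described above can be modeled with a dict standing in for the table. This is an illustrative model only: `check_and_put` and the dict layout are hypothetical, and the returned flag plays the role of the value the operator reports through successAttr.

```python
from typing import Dict, Tuple

Key = Tuple[str, str, str]  # (row, columnFamily, columnQualifier)


def check_and_put(table: Dict[Key, str], row: str, family: str, qualifier: str,
                  value: str, check: Dict[str, str]) -> bool:
    """Apply a Put only if the check passes; return the success flag.

    check has columnFamily and columnQualifier, and optionally value:
    - without 'value': succeed only if no entry exists for that cell;
    - with 'value': succeed only if the entry currently holds that value.
    """
    key = (row, check["columnFamily"], check["columnQualifier"])
    if "value" in check:
        passed = table.get(key) == check["value"]
    else:
        passed = key not in table
    if passed:
        table[(row, family, qualifier)] = value
    return passed
```

The first mode acts like "insert if absent", the second like a compare-and-set on the checked cell.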

property columnFamilyAttrName

Name of the attribute on the input tuple containing the columnFamily. Cannot be used with staticColumnFamily.

Type

str

property columnQualifierAttrName

Name of the attribute on the input tuple containing the columnQualifier. Cannot be used with staticColumnQualifier.

Type

str

property enableBuffer

When set to true, this parameter can improve the performance of the operator because tuples received by the operator will not be immediately forwarded to the HBase server.

Type

bool

property hbaseSite

The hbaseSite parameter specifies the path of the hbase-site.xml file. This is the recommended way to specify the HBASE configuration.

Type

str

populate(topology, stream, schema, name, **options)

Populate the topology with this composite map transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

transformed_stream = input_stream.map(myMapComposite)
Parameters
  • topology – Topology containing the composite map.

  • stream – Stream to be transformed.

  • schema – Schema passed into map.

  • name – Name passed into map.

  • **options – Future options passed to map.

Returns

Single stream representing the transformation of stream.

Return type

Stream

property rowAttrName

Name of the attribute on the input tuple containing the row. It is required.

Type

str

property staticColumnFamily

If this parameter is specified, it will be used as the columnFamily for all operations. (Compare to columnFamilyAttrName.) For HBASEScan, it can have cardinality greater than one.

Type

str

property staticColumnQualifier

If this parameter is specified, it will be used as the columnQualifier for all tuples. HBASEScan allows it to be specified multiple times.

Type

str

property successAttr

Attribute on the output port to be set to true if the check passes and the action is successful.

Type

str

property tableName

Name of the HBASE table. It is an optional parameter, but one of the parameters ‘tableName’ or ‘tableNameAttribute’ must be set in the operator. Cannot be used with ‘tableNameAttribute’. If the table does not exist, the operator throws an exception.

Type

str

property tableNameAttribute

Name of the attribute on the input tuple containing the tableName. Use this parameter to pass the table name to the operator via the input port. Cannot be used with the parameter ‘tableName’. This is suitable when all tables have the same schema.

Type

str

property valueAttrName

This parameter specifies the name of the attribute that contains the value that is put into the table. It is required.

Type

str

property vmArg

The optional parameter vmArg specifies additional JVM arguments that are required by the specific invocation of the operator.

Type

str

class streamsx.hbase.HBaseScan(tableName, connection=None, schema=CommonSchema.String, name=None, **options)

The HBaseScan operator scans an HBase table. Like the FileSource operator, it has an optional input port. If no input port is specified, the operator scans the table according to the parameters that you specify and then sends the final punctuation.

Example: scan the HBase table ‘streamsSample_lotr’ and print the rows:

from streamsx.topology.topology import Topology
from streamsx.topology.schema import StreamSchema
import streamsx.hbase as hbase

topo = Topology('hbase_scan_sample')

# Same attributes as the output schema of the scan() function
output_schema = StreamSchema('tuple<rstring row, int32 numResults, rstring columnFamily, rstring columnQualifier, rstring value>')

HBaseScanParameters = {
  'outAttrName' : 'value',
  'outputCountAttr' : 'numResults',
  'maxVersions' : 1
}

# HBaseScan is a source composite, so it is added with topo.source() rather than map()
scanned_rows = topo.source(hbase.HBaseScan(tableName='streamsSample_lotr', schema=output_schema, **HBaseScanParameters))

scanned_rows.print()

hbaseSite

The hbaseSite specifies the path of the hbase-site.xml file.

Type

dict|str

schema

Output schema, defaults to CommonSchema.String

Type

StreamSchema

options

Additional optional parameters, passed as keyword arguments.

Type

kwargs

property authKeytab

The optional parameter authKeytab specifies the file that contains the encrypted keys for the user that is specified by the authPrincipal parameter. The operator uses this keytab file to authenticate the user. The keytab file is generated by the administrator. You must specify this parameter to use Kerberos authentication.

Type

str

property authPrincipal

The optional parameter authPrincipal specifies the Kerberos principal that you use for authentication. This value is set to the principal that is created for the IBM Streams instance owner. You must specify this parameter if you want to use Kerberos authentication.

Type

str

property channel

If this operator is part of a parallel region, it shares the work of scanning with other operators in the region. To do this, set this parameter value by calling getChannel(). This parameter is required if the maximum number of channels has a value other than zero.

Type

int

property endRow

This parameter specifies the row to use to stop the scan. The row that you specify is excluded from the scan.

Type

str

property hbaseSite

The hbaseSite parameter specifies the path of the hbase-site.xml file. This is the recommended way to specify the HBASE configuration.

Type

str

property initDelay

The parameter initDelay specifies the time to wait in seconds before the HBASEScan operator reads the first row. The default value is 0.

Type

float

property maxChannels

If this operator is part of a parallel region, set this parameter value by calling getMaxChannels(). If the operator is in a parallel region, then the regions to be scanned are divided among the other copies of this operator in the other channels. If this parameter is set, you must also set the channel parameter.

Type

int
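The documentation states that the regions are divided among the channels but not how. The sketch below assumes a simple round-robin split by region index; `regions_for_channel` is hypothetical and the real operator may assign regions differently.

```python
from typing import List, Sequence


def regions_for_channel(region_start_keys: Sequence[str], channel: int,
                        max_channels: int) -> List[str]:
    """Round-robin split (an assumption): region i goes to channel i % max_channels."""
    if max_channels <= 0:
        # Not in a parallel region: one operator scans every region.
        return list(region_start_keys)
    if not 0 <= channel < max_channels:
        raise ValueError("channel must be in [0, max_channels)")
    return [key for i, key in enumerate(region_start_keys) if i % max_channels == channel]
```

Whatever the real rule is, the property worth preserving is the one checked here: every region is scanned by exactly one channel.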

property maxThreads

Maximum number of threads to use to scan the table. Defaults to one.

Type

int

property maxVersions

This parameter specifies the maximum number of versions that the operator returns. It defaults to a value of one. A value of 0 indicates that the operator gets all versions.

Type

int

property minTimestamp

This parameter specifies the minimum timestamp that is used for queries. The operator does not return any entries with a timestamp older than this value. Unless you specify the maxVersions parameter, the operator returns only one entry in this time range.

Type

int

property outAttrName

This parameter specifies the name of the attribute in which to put the value. It defaults to the attribute name value. If the attribute has a tuple data type, its attribute names are used as columnQualifiers. If multiple column families are included in the scan and they share columnQualifiers, there is no way of knowing which columnFamily was used to populate a tuple attribute.

Type

str
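To make the tuple-typed outAttrName behavior concrete, the model below treats each scanned cell as (family, qualifier, value) and lets the output tuple's attribute names select qualifiers. It is illustrative only: `fill_tuple` is hypothetical, and "last cell wins" for qualifiers shared by two families is an assumption that simply shows why the source family cannot be recovered.

```python
from typing import Dict, Iterable, Sequence, Tuple

Cell = Tuple[str, str, str]  # (columnFamily, columnQualifier, value)


def fill_tuple(cells: Iterable[Cell], attr_names: Sequence[str]) -> Tuple[Dict[str, str], int]:
    """Populate tuple attributes whose names match column qualifiers.

    Returns the populated attributes and their count (the number that
    outputCountAttr reports for tuple results). The column family is
    discarded, so two families sharing a qualifier cannot be told apart.
    """
    out: Dict[str, str] = {}
    for _family, qualifier, value in cells:
        if qualifier in attr_names:
            out[qualifier] = value  # a later family silently overwrites an earlier one
    return out, len(out)
```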

property outputCountAttr

This parameter specifies the output attribute in which to put the number of results that are found. When the result is a tuple, this parameter value is the number of attributes that were populated in that tuple.

Type

str

populate(topology, stream, **options)

Populate the topology with this composite source. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

topo = Topology()
source_stream = topo.source(mySourceComposite)
Parameters
  • topology – Topology containing the source.

  • name – Name passed into source.

  • **options – Future options passed to source.

Returns

Single stream representing the source.

Return type

Stream

property rowPrefix

This parameter specifies that the scan returns only rows that have this prefix.

Type

str

property startRow

This parameter specifies the row to use to start the scan. The row that you specify is included in the scan.

Type

str
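HBase sorts row keys lexicographically by their bytes, so startRow (included) and endRow (excluded) describe a half-open range. The model below applies the documented rules to an in-memory list of row keys; `rows_in_scan` is hypothetical, and combining rowPrefix with the range as an intersection is an assumption.

```python
from typing import Iterable, List, Optional


def rows_in_scan(row_keys: Iterable[bytes], start_row: Optional[bytes] = None,
                 end_row: Optional[bytes] = None,
                 row_prefix: Optional[bytes] = None) -> List[bytes]:
    """Return rows in HBase order that fall in [start_row, end_row) and match row_prefix."""
    selected = []
    for key in sorted(row_keys):  # bytes compare lexicographically, like HBase row keys
        if start_row is not None and key < start_row:
            continue  # before the inclusive start
        if end_row is not None and key >= end_row:
            continue  # the end row itself is excluded
        if row_prefix is not None and not key.startswith(row_prefix):
            continue
        selected.append(key)
    return selected
```

Because the order is byte-wise, a startRow such as b"c" begins at the first key that sorts at or after it, even if no row named "c" exists.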

property staticColumnFamily

If this parameter is specified, it will be used as the columnFamily for all operations. (Compare to columnFamilyAttrName.) For HBASEScan, it can have cardinality greater than one.

Type

str

property staticColumnQualifier

If this parameter is specified, it will be used as the columnQualifier for all tuples. HBASEScan allows it to be specified multiple times.

Type

str

property tableName

Name of the HBASE table. It is an optional parameter, but one of the parameters ‘tableName’ or ‘tableNameAttribute’ must be set in the operator. Cannot be used with ‘tableNameAttribute’. If the table does not exist, the operator throws an exception.

Type

str

property tableNameAttribute

Name of the attribute on the input tuple containing the tableName. Use this parameter to pass the table name to the operator via the input port. Cannot be used with the parameter ‘tableName’. This is suitable when all tables have the same schema.

Type

str

property triggerCount

This parameter specifies the number of rows to process before triggering a drain. This parameter is valid only in an operator-driven consistent region.

Type

int

property vmArg

The optional parameter vmArg specifies additional JVM arguments that are required by the specific invocation of the operator.

Type

str

streamsx.hbase.download_toolkit(url=None, target_dir=None)

Downloads the latest HBase toolkit from GitHub.

Example for updating the HBase toolkit for your topology with the latest toolkit from GitHub:

import streamsx.hbase as hbase
import streamsx.spl.toolkit
from streamsx.topology.topology import Topology

topology = Topology()
# download the HBase toolkit from GitHub
hbase_toolkit_location = hbase.download_toolkit()
# add the toolkit to the topology
streamsx.spl.toolkit.add_toolkit(topology, hbase_toolkit_location)

Example for updating the topology with a specific version of the HBase toolkit using a URL:

import streamsx.hbase as hbase
import streamsx.spl.toolkit
from streamsx.topology.topology import Topology

topology = Topology()
url380 = 'https://github.com/IBMStreams/streamsx.hbase/releases/download/v3.8.0/streamsx.hbase.toolkits-3.8.0-20190829-1529.tgz'
hbase_toolkit_location = hbase.download_toolkit(url=url380)
streamsx.spl.toolkit.add_toolkit(topology, hbase_toolkit_location)

Parameters
  • url (str) – Link to toolkit archive (*.tgz) to be downloaded. Use this parameter to download a specific version of the toolkit.

  • target_dir (str) – the directory where the toolkit is unpacked to. If a relative path is given, the path is appended to the system temporary directory, for example to /tmp on Unix/Linux systems. If target_dir is None a location relative to the system temporary directory is chosen.
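The target_dir rule can be expressed with the standard library. A minimal sketch with a hypothetical `resolve_target_dir` helper; for target_dir=None the documentation only promises a location relative to the system temporary directory, so using tempfile.mkdtemp() for that case is an assumption.

```python
import os
import tempfile
from typing import Optional


def resolve_target_dir(target_dir: Optional[str] = None) -> str:
    """Return the directory a toolkit archive would be unpacked to."""
    if target_dir is None:
        # Assumption: a fresh directory under the system temporary directory.
        return tempfile.mkdtemp(prefix="streamsx.hbase-")
    if os.path.isabs(target_dir):
        return target_dir
    # Relative paths are appended to the system temporary directory, e.g. /tmp.
    return os.path.join(tempfile.gettempdir(), target_dir)
```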

Returns

the location of the downloaded HBase toolkit

Return type

str

Note

This function requires an outgoing Internet connection.

New in version 1.3.

streamsx.hbase.scan(topology, table_name, max_versions=None, init_delay=None, connection=None, name=None)

Scans an HBASE table and delivers the number of results, rows, and values in the output stream.

The output stream uses a structured StreamSchema.

Parameters
  • topology (Topology) – Topology to contain the returned stream.

  • table_name (str) – The name of the HBase table.

  • max_versions (int32) – specifies the maximum number of versions that the operator returns. It defaults to a value of one. A value of 0 indicates that the operator gets all versions.

  • init_delay (int|float|datetime.timedelta) – The time to wait in seconds before the operator reads the first row of the table. If not set, the default value is 0.

  • connection (dict|filename|string) – Specify the connection to HBASE either with a filename of a HBase configuration file or as string in format “HOST:PORT” or as dict containing the properties ‘host’ and ‘port’. If not specified the environment variables HADOOP_HOST_PORT or HBASE_SITE_XML are used.

  • name (str) – Source name in the Streams context, defaults to a generated name.
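The connection parameter accepts three forms. The sketch below shows one way such an argument could be classified (a dict with 'host' and 'port', a string naming an existing file, otherwise a "HOST:PORT" string); `normalize_connection` and this classification order are assumptions, not the package's implementation.

```python
import os
from typing import Tuple, Union


def normalize_connection(connection: Union[dict, str]) -> Tuple[str, str]:
    """Classify a connection argument as ('site_xml', path) or ('host_port', 'HOST:PORT')."""
    if isinstance(connection, dict):
        # dict form: {'host': ..., 'port': ...}
        return ("host_port", f"{connection['host']}:{connection['port']}")
    if os.path.isfile(connection):
        # an existing file is taken to be an hbase-site.xml
        return ("site_xml", connection)
    host, sep, port = connection.rpartition(":")
    if sep and host and port.isdigit():
        return ("host_port", connection)
    raise ValueError(f"not an existing file or HOST:PORT: {connection!r}")
```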

Returns

Output stream containing the row, numResults, and values. It has a structured stream schema:

HBASEScanOutputSchema = StreamSchema('tuple<rstring row, int32 numResults, rstring columnFamily, rstring columnQualifier, rstring value>')

Return type

Stream
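A callable like the following could be passed to a stream operation, for example `scanned_rows.map(format_cell).print()`, to render each scanned cell. It assumes that tuples of a structured schema reach Python callables as dicts keyed by attribute name (the default for StreamSchema streams in streamsx.topology); `format_cell` is a hypothetical helper.

```python
from typing import Mapping


def format_cell(tup: Mapping[str, object]) -> str:
    """Render one HBASEScanOutputSchema tuple as 'row family:qualifier = value'."""
    return (f"{tup['row']} {tup['columnFamily']}:{tup['columnQualifier']}"
            f" = {tup['value']} (numResults={tup['numResults']})")
```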

streamsx.hbase.get(stream, table_name, row_attr_name, connection=None, name=None)

Gets tuples from an HBASE table and delivers the number of results, rows, and values in the output stream.

Parameters
  • stream – The input stream.

  • table_name – The name of the HBase table.

  • row_attr_name (str) – Name of the attribute on the input tuple containing the row.

  • connection (dict|filename|string) – Specify the connection to HBASE either with a filename of a HBase configuration file or as string in format “HOST:PORT” or as dict containing the properties ‘host’ and ‘port’. If not specified the environment variables HADOOP_HOST_PORT or HBASE_SITE_XML are used.

  • name (str) – Operator name in the Streams context, defaults to a generated name.

Returns

Output stream containing the row, numResults, and values. It has a structured stream schema:

HBASEGetOutputSchema = StreamSchema('tuple<rstring row, int32 numResults, rstring value, rstring infoType, rstring requestedDetail>')

Return type

Stream

streamsx.hbase.put(stream, table_name, connection=None, name=None)

Puts each row, delivered as a tuple on the input stream, into an HBASE table.

The output stream uses a structured StreamSchema.

Parameters
  • stream – The input stream.

  • table_name – The name of the HBase table.

  • connection (dict|filename|string) – Specify the connection to HBASE either with a filename of a HBase configuration file or as string in format “HOST:PORT” or as dict containing the properties ‘host’ and ‘port’. If not specified the environment variables HADOOP_HOST_PORT or HBASE_SITE_XML are used.

  • name (str) – Operator name in the Streams context, defaults to a generated name.

Returns

Output stream containing the result success. It has a structured stream schema:

HBASEScanOutputSchema = StreamSchema('tuple<boolean success>')

Return type

Stream

streamsx.hbase.delete(stream, table_name, connection=None, name=None)

Deletes each row, delivered as a tuple on the input stream, from an HBASE table.

The output stream uses a structured StreamSchema.

Parameters
  • stream – The input stream.

  • table_name – The name of the HBase table.

  • connection (dict|filename|string) – Specify the connection to HBASE either with a filename of a HBase configuration file or as string in format “HOST:PORT” or as dict containing the properties ‘host’ and ‘port’. If not specified the environment variables HADOOP_HOST_PORT or HBASE_SITE_XML are used.

  • name (str) – Operator name in the Streams context, defaults to a generated name.

Returns

Output stream containing the result success. It has a structured stream schema:

HBASEScanOutputSchema = StreamSchema('tuple<boolean success>')

Return type

Stream
