streamsx.hbase package
HBase integration for IBM Streams
For details of implementing applications in Python for IBM Streams and the Streaming Analytics service running on IBM Cloud, see:
This package exposes SPL operators in the com.ibm.streamsx.hbase toolkit as Python classes and functions.
Overview
Provides functions and classes to access HBase.
HBase configuration file
The host name and the port of the Hadoop server can be specified with the environment variable HADOOP_HOST_PORT.
For example:
export HADOOP_HOST_PORT=hdp264.fyre.ibm.com:8020
The package creates an HBase configuration file (hbase-site.xml) from a template and replaces the Hadoop server name and port with the values from the environment variable HADOOP_HOST_PORT.
Alternatively, specify the location of an existing HBase configuration file hbase-site.xml with the environment variable HBASE_SITE_XML.
For example:
export HBASE_SITE_XML=/usr/hdp/current/hbase-client/conf/hbase-site.xml
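The template substitution described above can be pictured with a minimal plain-Python sketch. The template content (a single hbase.rootdir property pointing at hdfs://HOST:PORT) and the helper name write_hbase_site are assumptions for illustration only; the template that streamsx.hbase actually ships is not shown in this document.

```python
import os
import string
import tempfile

# Hypothetical template: the real template shipped with streamsx.hbase is not
# shown here; a single hbase.rootdir property is used only for illustration.
HBASE_SITE_TEMPLATE = string.Template("""<?xml version="1.0"?>
<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://$host:$port/apps/hbase/data</value>
  </property>
</configuration>
""")


def write_hbase_site(host_port, target_dir=None):
    """Fill the template with HOST:PORT and write hbase-site.xml (sketch)."""
    host, sep, port = host_port.rpartition(':')
    if not sep or not host or not port.isdigit():
        raise ValueError('expected HOST:PORT, got %r' % host_port)
    target_dir = target_dir or tempfile.mkdtemp()
    path = os.path.join(target_dir, 'hbase-site.xml')
    with open(path, 'w') as f:
        f.write(HBASE_SITE_TEMPLATE.substitute(host=host, port=port))
    return path


# Same value as the HADOOP_HOST_PORT example above
print(write_hbase_site('hdp264.fyre.ibm.com:8020'))
```

Splitting on the last colon keeps the parsing simple; a malformed value fails early instead of producing an unusable configuration file.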
Sample
A simple Streams application that scans an HBase table and prints the scanned rows:
from streamsx.topology.topology import Topology
from streamsx.topology.context import submit, ConfigParams
import streamsx.hbase as hbase

topo = Topology('hbase_scan_sample')
# scan the table 'sample', return one version per cell, wait 2 seconds before reading the first row
scanned_rows = hbase.scan(topo, table_name='sample', max_versions=1, init_delay=2)
scanned_rows.print()

# disable SSL certificate verification when submitting to the Streams instance
cfg = {}
cfg[ConfigParams.SSL_VERIFY] = False
submit('DISTRIBUTED', topo, cfg)
-
class streamsx.hbase.HBaseGet(tableName, rowAttrName, connection=None, schema=CommonSchema.String, **options)
HBaseGet gets tuples from an HBase table.
Example: get tuples from the HBase table ‘streamsSample_lotr’:
import streamsx.hbase as hbase
from streamsx.topology.schema import StreamSchema
# _create_stream_for_get() and _get_table_name() are helpers of the sample application, not part of streamsx.hbase
inputStream = _create_stream_for_get(topo)  # create a query stream
output_schema = StreamSchema('tuple<rstring who, rstring colF, rstring colQ, rstring value, int32 numResults>')
options = {
    'columnFamilyAttrName': 'colF',
    'columnQualifierAttrName': 'colQ',
    'outAttrName': 'value',
    'outputCountAttr': 'numResults',
    'maxVersions': 0  # 0 returns all versions
}
get_rows = inputStream.map(hbase.HBaseGet(tableName=_get_table_name(), rowAttrName='who', schema=output_schema, **options))
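Several HBaseGet parameters are mutually exclusive, according to the property descriptions that follow. A hypothetical pre-flight check like the one below, written in plain Python and not part of streamsx.hbase, can catch such conflicts in an options dict before the topology is submitted. It assumes tableName is merged into the same dict for the check, although the example passes it as a constructor argument.

```python
# Option pairs that the HBaseGet property descriptions mark as mutually exclusive
EXCLUSIVE_PAIRS = [
    ('columnFamilyAttrName', 'staticColumnFamily'),
    ('columnQualifierAttrName', 'staticColumnQualifier'),
    ('tableName', 'tableNameAttribute'),
]


def check_get_options(options):
    """Return a list of conflicts in an HBaseGet options dict (hypothetical helper)."""
    problems = []
    for first, second in EXCLUSIVE_PAIRS:
        if first in options and second in options:
            problems.append('%s cannot be used with %s' % (first, second))
    if 'tableName' not in options and 'tableNameAttribute' not in options:
        problems.append('one of tableName or tableNameAttribute must be set')
    return problems


# Options from the example above, with tableName merged in for the check
options = {
    'tableName': 'streamsSample_lotr',
    'columnFamilyAttrName': 'colF',
    'columnQualifierAttrName': 'colQ',
    'outAttrName': 'value',
    'outputCountAttr': 'numResults',
    'maxVersions': 0,
}
print(check_get_options(options))  # an empty list means no conflicts
```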
-
hbaseSite
The hbaseSite attribute specifies the path of the hbase-site.xml file.
- Type
dict|str
-
tableName
The name of the HBase table.
- Type
str
-
schema
Output schema, defaults to CommonSchema.String.
- Type
StreamSchema
-
options
Additional optional parameters as variable keyword arguments.
- Type
kwargs
-
property
authKeytab
The optional parameter authKeytab specifies the file that contains the encrypted keys for the user that is specified by the authPrincipal parameter. The operator uses this keytab file to authenticate the user. The keytab file is generated by the administrator. You must specify this parameter to use Kerberos authentication.
- Type
str
-
property
authPrincipal
The optional parameter authPrincipal specifies the Kerberos principal that you use for authentication. This value is set to the principal that is created for the IBM Streams instance owner. You must specify this parameter if you want to use Kerberos authentication.
- Type
str
-
property
columnFamilyAttrName
Name of the attribute on the input tuple containing the columnFamily. Cannot be used with staticColumnFamily.
- Type
str
-
property
columnQualifierAttrName
Name of the attribute on the input tuple containing the columnQualifier. Cannot be used with staticColumnQualifier.
- Type
str
-
property
hbaseSite
The hbaseSite parameter specifies the path of the hbase-site.xml file. This is the recommended way to specify the HBase configuration.
- Type
str
-
property
maxVersions
The optional parameter maxVersions specifies the maximum number of versions that the operator returns. It defaults to 1. A value of 0 indicates that the operator gets all versions.
- Type
int
-
property
minTimestamp
The optional parameter minTimestamp specifies the minimum timestamp that is used for queries. The operator does not return any entries with a timestamp older than this value. Unless you specify the maxVersions parameter, the operator returns only one entry in this time range.
- Type
int
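minTimestamp is an integer. Assuming it uses milliseconds since the Unix epoch, like HBase cell timestamps and the Timestamp parameter of HBasePut, a value such as "one hour ago" can be computed with the standard library as sketched below. The helper name to_hbase_millis and the one-hour window are illustrative.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_hbase_millis(dt):
    """Convert a timezone-aware datetime to integer milliseconds since the epoch."""
    if dt.tzinfo is None:
        raise ValueError('use a timezone-aware datetime')
    # timedelta // timedelta yields an exact int, avoiding float rounding
    return (dt - EPOCH) // timedelta(milliseconds=1)


# Only consider cells written during the last hour (illustrative window)
one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1)
options = {'minTimestamp': to_hbase_millis(one_hour_ago), 'maxVersions': 0}
print(options)
```

Combining minTimestamp with maxVersions set to 0 asks for every version in the window rather than only the newest one.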
-
property
outAttrName
The optional parameter outAttrName specifies the name of the attribute of the output port in which the operator puts the retrieval results. The data type for the attribute depends on whether you specified a columnFamily or columnQualifier.
- Type
str
-
property
outputCountAttr
The optional parameter outputCountAttr specifies the name of the attribute of the output port where the operator puts a count of the values it populated.
- Type
str
-
populate(topology, stream, schema, name, **options)
Populate the topology with this composite map transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:
transformed_stream = input_stream.map(myMapComposite)
- Parameters
topology – Topology containing the composite map.
stream – Stream to be transformed.
schema – Schema passed into map.
name – Name passed into map.
**options – Future options passed to map.
- Returns
Single stream representing the transformation of stream.
- Return type
Stream
-
property
rowAttrName
Name of the attribute on the input tuple containing the row. It is required.
- Type
str
-
property
staticColumnFamily
If this parameter is specified, it is used as the columnFamily for all operations. (Compare to columnFamilyAttrName.) For HBASEScan, it can have cardinality greater than one.
- Type
str
-
property
staticColumnQualifier
If this parameter is specified, it is used as the columnQualifier for all tuples. HBASEScan allows it to be specified multiple times.
- Type
str
-
property
tableName
Name of the HBase table. It is an optional parameter, but one of the parameters ‘tableName’ or ‘tableNameAttribute’ must be set in the operator. Cannot be used with ‘tableNameAttribute’. If the table does not exist, the operator throws an exception.
- Type
str
-
property
tableNameAttribute
Name of the attribute on the input tuple containing the tableName. Use this parameter to pass the table name to the operator via the input port. Cannot be used with the parameter ‘tableName’. This is suitable for tables with the same schema.
- Type
str
-
property
vmArg
The optional parameter vmArg specifies additional JVM arguments that are required by the specific invocation of the operator.
- Type
str
-
-
class streamsx.hbase.HBasePut(tableName, rowAttrName, valueAttrName, connection=None, schema=CommonSchema.String, **options)
HBasePut puts the incoming tuples into an HBase table.
Example: put tuples into the HBase table ‘streamsSample_lotr’:
import streamsx.hbase as hbase
from streamsx.topology.schema import StreamSchema
# inputStream carries tuples with the attributes who, infoType, requestedDetail and value
output_schema = StreamSchema('tuple<rstring who, rstring infoType, rstring requestedDetail, rstring value, int32 numResults>')
put_options = {
    'columnFamilyAttrName': 'infoType',
    'columnQualifierAttrName': 'requestedDetail'
}
put_rows = inputStream.map(hbase.HBasePut(tableName='streamsSample_lotr', rowAttrName='who', valueAttrName='value', schema=output_schema, **put_options))
put_rows.print()
-
hbaseSite
The hbaseSite attribute specifies the path of the hbase-site.xml file.
- Type
dict|str
-
schema
Output schema, defaults to CommonSchema.String.
- Type
StreamSchema
-
options
Additional optional parameters as variable keyword arguments.
- Type
kwargs
-
property
Timestamp
This parameter specifies the timestamp in milliseconds (INT64). The timestamp allows for versioning of the cells. Every time HBase performs a Put on a table, it sets the timestamp. By default, this is the current time in milliseconds, but you can set your own timestamp with this parameter. Cannot be used with TimestampAttrName.
- Type
int
-
property
TimestampAttrName
Name of the attribute on the input tuple containing the timestamp in milliseconds. Cannot be used with Timestamp.
- Type
str
-
property
authKeytab
The optional parameter authKeytab specifies the file that contains the encrypted keys for the user that is specified by the authPrincipal parameter. The operator uses this keytab file to authenticate the user. The keytab file is generated by the administrator. You must specify this parameter to use Kerberos authentication.
- Type
str
-
property
authPrincipal
The optional parameter authPrincipal specifies the Kerberos principal that you use for authentication. This value is set to the principal that is created for the IBM Streams instance owner. You must specify this parameter if you want to use Kerberos authentication.
- Type
str
-
property
batchSize
This parameter has been deprecated as of Streams 4.2.0. Use the enableBuffer parameter instead.
- Type
int
-
property
checkAttrName
Name of the attribute specifying the tuple to check for before applying the Put or Delete. The type of the attribute is a tuple with the attributes columnFamily and columnQualifier, or a tuple with the attributes columnFamily, columnQualifier, and value. In the first case, the Put or Delete proceeds only when there is no entry for the row, columnFamily, and columnQualifier combination. When the type of the attribute given by checkAttrName contains a value attribute, the Put or Delete succeeds only when the entry specified by the row, columnFamily, and columnQualifier has the given value.
- Type
str
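The two check forms can be modeled with an in-memory dict keyed by (row, columnFamily, columnQualifier). This plain-Python sketch is not the operator's implementation; check_and_put is a hypothetical name, and its returned flag corresponds to what the successAttr attribute reports.

```python
def check_and_put(table, row, family, qualifier, value, check):
    """Model of a conditional Put; returns True when the Put was applied.

    table maps (row, columnFamily, columnQualifier) to a cell value.
    check has 'columnFamily' and 'columnQualifier', and optionally 'value'.
    """
    key = (row, check['columnFamily'], check['columnQualifier'])
    if 'value' in check:
        # Second form: the checked cell must exist and hold the given value
        passed = key in table and table[key] == check['value']
    else:
        # First form: the checked cell must not exist yet
        passed = key not in table
    if passed:
        table[(row, family, qualifier)] = value
    return passed


table = {}
race = {'columnFamily': 'info', 'columnQualifier': 'race'}
print(check_and_put(table, 'frodo', 'info', 'race', 'hobbit', race))  # cell absent, Put applied
print(check_and_put(table, 'frodo', 'info', 'race', 'elf', race))     # cell exists, Put refused
```

The first form behaves like "insert if absent"; the second behaves like a compare-and-set on a single cell.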
-
property
columnFamilyAttrName
Name of the attribute on the input tuple containing the columnFamily. Cannot be used with staticColumnFamily.
- Type
str
-
property
columnQualifierAttrName
Name of the attribute on the input tuple containing the columnQualifier. Cannot be used with staticColumnQualifier.
- Type
str
-
property
enableBuffer
When set to true, this parameter can improve the performance of the operator because tuples received by the operator are not immediately forwarded to the HBase server.
- Type
bool
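Conceptually, enableBuffer trades latency for throughput: writes are collected on the client and sent in batches instead of one request per tuple. The class below is a minimal plain-Python sketch of that idea; BufferedWriter, the flush_size threshold and the send_batch callback are hypothetical, since this document does not say when the operator flushes its buffer.

```python
class BufferedWriter:
    """Collects puts on the client and sends them in batches (conceptual sketch)."""

    def __init__(self, send_batch, flush_size=100):
        self.send_batch = send_batch  # callable that receives a list of (row, value) pairs
        self.flush_size = flush_size  # hypothetical threshold, not an operator parameter
        self.pending = []

    def put(self, row, value):
        self.pending.append((row, value))
        if len(self.pending) >= self.flush_size:
            self.flush()

    def flush(self):
        # Pending puts are not visible in the table until they are sent here
        if self.pending:
            self.send_batch(self.pending)
            self.pending = []


batches = []
writer = BufferedWriter(batches.append, flush_size=2)
for row, value in [('frodo', 'hobbit'), ('gandalf', 'wizard'), ('gimli', 'dwarf')]:
    writer.put(row, value)
writer.flush()  # send the remaining put
print(batches)
```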
-
property
hbaseSite
The hbaseSite parameter specifies the path of the hbase-site.xml file. This is the recommended way to specify the HBase configuration.
- Type
str
-
populate(topology, stream, schema, name, **options)
Populate the topology with this composite map transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:
transformed_stream = input_stream.map(myMapComposite)
- Parameters
topology – Topology containing the composite map.
stream – Stream to be transformed.
schema – Schema passed into map.
name – Name passed into map.
**options – Future options passed to map.
- Returns
Single stream representing the transformation of stream.
- Return type
Stream
-
property
rowAttrName
Name of the attribute on the input tuple containing the row. It is required.
- Type
str
-
property
staticColumnFamily
If this parameter is specified, it is used as the columnFamily for all operations. (Compare to columnFamilyAttrName.) For HBASEScan, it can have cardinality greater than one.
- Type
str
-
property
staticColumnQualifier
If this parameter is specified, it is used as the columnQualifier for all tuples. HBASEScan allows it to be specified multiple times.
- Type
str
-
property
successAttr
Attribute on the output port to be set to true if the check passes and the action is successful.
- Type
str
-
property
tableName
Name of the HBase table. It is an optional parameter, but one of the parameters ‘tableName’ or ‘tableNameAttribute’ must be set in the operator. Cannot be used with ‘tableNameAttribute’. If the table does not exist, the operator throws an exception.
- Type
str
-
property
tableNameAttribute
Name of the attribute on the input tuple containing the tableName. Use this parameter to pass the table name to the operator via the input port. Cannot be used with the parameter ‘tableName’. This is suitable for tables with the same schema.
- Type
str
-
property
valueAttrName
This parameter specifies the name of the attribute that contains the value that is put into the table. It is required.
- Type
str
-
property
vmArg
The optional parameter vmArg specifies additional JVM arguments that are required by the specific invocation of the operator.
- Type
str
-
-
class streamsx.hbase.HBaseScan(tableName, connection=None, schema=CommonSchema.String, name=None, **options)
HBaseScan scans an HBase table. Like the FileSource operator, it has an optional input port. If no input port is specified, the operator scans the table according to the parameters that you specify and then sends the final punctuation.
Example: scan the HBase table ‘streamsSample_lotr’:
from streamsx.topology.topology import Topology
from streamsx.topology.schema import StreamSchema
import streamsx.hbase as hbase
topo = Topology('hbase_scan_sample')
output_schema = StreamSchema('tuple<rstring row, int32 numResults, rstring columnFamily, rstring columnQualifier, rstring value>')
scan_options = {
    'outAttrName': 'value',           # attribute that receives the cell value
    'outputCountAttr': 'numResults',  # attribute that receives the number of results
    'maxVersions': 1
}
# HBaseScan is a source composite, so it is added with topo.source() rather than map()
scanned_rows = topo.source(hbase.HBaseScan(tableName='streamsSample_lotr', schema=output_schema, **scan_options))
scanned_rows.print()
-
hbaseSite
The hbaseSite attribute specifies the path of the hbase-site.xml file.
- Type
dict|str
-
schema
Output schema, defaults to CommonSchema.String.
- Type
StreamSchema
-
options
Additional optional parameters as variable keyword arguments.
- Type
kwargs
-
property
authKeytab
The optional parameter authKeytab specifies the file that contains the encrypted keys for the user that is specified by the authPrincipal parameter. The operator uses this keytab file to authenticate the user. The keytab file is generated by the administrator. You must specify this parameter to use Kerberos authentication.
- Type
str
-
property
authPrincipal
The optional parameter authPrincipal specifies the Kerberos principal that you use for authentication. This value is set to the principal that is created for the IBM Streams instance owner. You must specify this parameter if you want to use Kerberos authentication.
- Type
str
-
property
channel
If this operator is part of a parallel region, it shares the work of scanning with other operators in the region. To do this, set this parameter value by calling getChannel(). This parameter is required if the maximum number of channels has a value other than zero.
- Type
int
-
property
endRow
This parameter specifies the row to use to stop the scan. The row that you specify is excluded from the scan.
- Type
str
-
property
hbaseSite
The hbaseSite parameter specifies the path of the hbase-site.xml file. This is the recommended way to specify the HBase configuration.
- Type
str
-
property
initDelay
The parameter initDelay specifies the time to wait in seconds before the operator HBASEScan reads the first row. The default value is 0.
- Type
float
-
property
maxChannels
If this operator is part of a parallel region, set this parameter value by calling getMaxChannels(). If the operator is in a parallel region, the regions to be scanned are divided among this operator and its copies in the other channels. If this parameter is set, you must also set the channel parameter.
- Type
int
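The description above states only that the regions are divided among the channels. A round-robin assignment is one simple way to do this, sketched below in plain Python; regions_for_channel is a hypothetical helper, and whether the operator uses round-robin or another partitioning is an assumption here.

```python
def regions_for_channel(regions, channel, max_channels):
    """Return the regions one channel scans, dealt out round-robin (assumed scheme)."""
    if max_channels <= 0:
        return list(regions)  # not in a parallel region: this copy scans everything
    if not 0 <= channel < max_channels:
        raise ValueError('channel must be in the range [0, max_channels)')
    return [region for i, region in enumerate(regions) if i % max_channels == channel]


regions = ['r0', 'r1', 'r2', 'r3', 'r4']
for channel in range(2):
    print(channel, regions_for_channel(regions, channel, 2))
```

Whatever scheme is used, the key property is that every region is assigned to exactly one channel, so the parallel copies together cover the table without duplicates.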
-
property
maxThreads
Maximum number of threads to use to scan the table. Defaults to 1.
- Type
int
-
property
maxVersions
This parameter specifies the maximum number of versions that the operator returns. It defaults to 1. A value of 0 indicates that the operator gets all versions.
- Type
int
-
property
minTimestamp
This parameter specifies the minimum timestamp that is used for queries. The operator does not return any entries with a timestamp older than this value. Unless you specify the maxVersions parameter, the operator returns only one entry in this time range.
- Type
int
-
property
outAttrName
This parameter specifies the name of the attribute in which to put the value. It defaults to ‘value’. If the attribute is a tuple data type, the attribute names are used as columnQualifiers. If multiple families are included in the scan and they have the same columnQualifiers, there is no way of knowing which columnFamily was used to populate a tuple attribute.
- Type
str
-
property
outputCountAttr
This parameter specifies the output attribute in which to put the number of results that are found. When the result is a tuple, this parameter value is the number of attributes that were populated in that tuple.
- Type
str
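The outAttrName and outputCountAttr descriptions above can be illustrated together: when the output attribute is a tuple, cells are matched to tuple attributes by columnQualifier only, so two column families that share a qualifier collide, and the count is the number of attributes that were filled. In this plain-Python sketch (fill_tuple is a hypothetical name) the last matching cell wins; the operator's actual choice in that case is not specified.

```python
def fill_tuple(cells, attr_names):
    """Fill tuple attributes from one row's cells, matching on columnQualifier only.

    cells: (columnFamily, columnQualifier, value) triples for a single row.
    Returns the populated attributes and their count (the outputCountAttr value).
    """
    values = {}
    for _family, qualifier, value in cells:
        if qualifier in attr_names:
            # The family is ignored, so a later family overwrites an earlier one
            values[qualifier] = value
    return values, len(values)


cells = [('info', 'race', 'hobbit'), ('info', 'home', 'Shire'), ('alias', 'race', 'halfling')]
values, count = fill_tuple(cells, ['race', 'home', 'age'])
print(values, count)
```

The attribute 'age' has no matching cell, so it is not counted; restricting the scan to one family with staticColumnFamily avoids the qualifier collision.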
-
populate(topology, stream, **options)
Populate the topology with this composite source. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:
topo = Topology()
source_stream = topo.source(mySourceComposite)
- Parameters
topology – Topology containing the source.
name – Name passed into source.
**options – Future options passed to source.
- Returns
Single stream representing the source.
- Return type
Stream
-
property
rowPrefix
This parameter specifies that the scan returns only rows that have this prefix.
- Type
str
-
property
startRow
This parameter specifies the row to use to start the scan. The row that you specify is included in the scan.
- Type
str
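The startRow, endRow, and rowPrefix semantics can be checked against a sorted list of row keys. HBase orders rows lexicographically by their bytes, which matches Python string ordering for ASCII keys. This plain-Python sketch (select_rows is a hypothetical name) applies all three filters; whether the operator accepts rowPrefix together with startRow or endRow is not stated here.

```python
def select_rows(sorted_rows, start_row=None, end_row=None, row_prefix=None):
    """Return the row keys a scan would deliver, in order.

    start_row is inclusive, end_row is exclusive, row_prefix keeps matching keys.
    """
    selected = []
    for row in sorted_rows:
        if start_row is not None and row < start_row:
            continue
        if end_row is not None and row >= end_row:
            break  # keys are sorted, so no later key can qualify
        if row_prefix is not None and not row.startswith(row_prefix):
            continue
        selected.append(row)
    return selected


rows = sorted(['frodo', 'gandalf', 'aragorn', 'gimli', 'bilbo', 'legolas', 'boromir'])
print(select_rows(rows, start_row='bilbo', end_row='gandalf'))
print(select_rows(rows, row_prefix='g'))
```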
-
property
staticColumnFamily
If this parameter is specified, it is used as the columnFamily for all operations. (Compare to columnFamilyAttrName.) For HBASEScan, it can have cardinality greater than one.
- Type
str
-
property
staticColumnQualifier
If this parameter is specified, it is used as the columnQualifier for all tuples. HBASEScan allows it to be specified multiple times.
- Type
str
-
property
tableName
Name of the HBase table. It is an optional parameter, but one of the parameters ‘tableName’ or ‘tableNameAttribute’ must be set in the operator. Cannot be used with ‘tableNameAttribute’. If the table does not exist, the operator throws an exception.
- Type
str
-
property
tableNameAttribute
Name of the attribute on the input tuple containing the tableName. Use this parameter to pass the table name to the operator via the input port. Cannot be used with the parameter ‘tableName’. This is suitable for tables with the same schema.
- Type
str
-
property
triggerCount
This parameter specifies the number of rows to process before triggering a drain. This parameter is valid only in an operator-driven consistent region.
- Type
int
-
property
vmArg
The optional parameter vmArg specifies additional JVM arguments that are required by the specific invocation of the operator.
- Type
str
-
-
streamsx.hbase.download_toolkit(url=None, target_dir=None)
Downloads the latest HBase toolkit from GitHub.
Example for updating the HBase toolkit for your topology with the latest toolkit from GitHub:
import streamsx.hbase as hbase
import streamsx.spl.toolkit
# download the HBase toolkit from GitHub
hbase_toolkit_location = hbase.download_toolkit()
# add the toolkit to the topology (topology is your Topology instance)
streamsx.spl.toolkit.add_toolkit(topology, hbase_toolkit_location)
Example for updating the topology with a specific version of the HBase toolkit using a URL:
import streamsx.hbase as hbase
import streamsx.spl.toolkit
url380 = 'https://github.com/IBMStreams/streamsx.hbase/releases/download/v3.8.0/streamsx.hbase.toolkits-3.8.0-20190829-1529.tgz'
hbase_toolkit_location = hbase.download_toolkit(url=url380)
streamsx.spl.toolkit.add_toolkit(topology, hbase_toolkit_location)
- Parameters
url (str) – Link to the toolkit archive (*.tgz) to be downloaded. Use this parameter to download a specific version of the toolkit.
target_dir (str) – The directory where the toolkit is unpacked. If a relative path is given, the path is appended to the system temporary directory, for example to /tmp on Unix/Linux systems. If target_dir is None, a location relative to the system temporary directory is chosen.
- Returns
The location of the downloaded HBase toolkit.
- Return type
str
Note
This function requires an outgoing Internet connection.
New in version 1.3.
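The target_dir rule can be expressed in a few lines of plain Python. The hypothetical resolve_target_dir below mirrors the documented behavior for relative and absolute paths; for target_dir=None it simply creates a fresh temporary directory, which is an assumption, because the document does not say which name the function chooses.

```python
import os
import tempfile


def resolve_target_dir(target_dir=None):
    """Apply the documented target_dir rule (sketch, not the real implementation)."""
    if target_dir is None:
        # Assumption: any fresh directory under the system temp dir stands in here
        return tempfile.mkdtemp()
    if os.path.isabs(target_dir):
        return target_dir
    # Relative paths are appended to the system temporary directory
    return os.path.join(tempfile.gettempdir(), target_dir)


print(resolve_target_dir('hbase_toolkit'))  # for example /tmp/hbase_toolkit on Linux
```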
-
streamsx.hbase.scan(topology, table_name, max_versions=None, init_delay=None, connection=None, name=None)
Scans an HBase table and delivers the number of results, rows, and values in the output stream.
The output stream has a structured StreamSchema.
- Parameters
topology (Topology) – Topology to contain the returned stream.
table_name (str) – The name of the HBase table.
max_versions (int) – Specifies the maximum number of versions that the operator returns. It defaults to 1. A value of 0 indicates that the operator gets all versions.
init_delay (int|float|datetime.timedelta) – The time to wait in seconds before the operator reads the first row. If not set, the default value is 0.
connection (dict|filename|string) – Specify the connection to HBase either with the filename of an HBase configuration file, as a string in the format “HOST:PORT”, or as a dict containing the properties ‘host’ and ‘port’. If not specified, the environment variables HADOOP_HOST_PORT or HBASE_SITE_XML are used.
name (str) – Source name in the Streams context, defaults to a generated name.
- Returns
Output stream containing the row, numResults, and values. It has the structured stream schema:
HBASEScanOutputSchema = StreamSchema('tuple<rstring row, int32 numResults, rstring columnFamily, rstring columnQualifier, rstring value>')
- Return type
Stream
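The connection parameter accepts three forms. The hypothetical helper below, plain Python and not part of streamsx.hbase, shows one way to tell them apart. Two details are assumptions: a string ending in .xml or naming an existing file is treated as an hbase-site.xml path, and HBASE_SITE_XML takes precedence over HADOOP_HOST_PORT when connection is not given.

```python
import os


def describe_connection(connection=None, environ=None):
    """Classify a connection argument (hypothetical helper).

    Returns ('site_xml', path) or ('host_port', (host, port)).
    """
    environ = os.environ if environ is None else environ
    if connection is None:
        # Fall back to the environment variables named in the documentation
        if environ.get('HBASE_SITE_XML'):
            return ('site_xml', environ['HBASE_SITE_XML'])
        connection = environ.get('HADOOP_HOST_PORT')
        if not connection:
            raise ValueError('set HADOOP_HOST_PORT or HBASE_SITE_XML')
    if isinstance(connection, dict):
        return ('host_port', (connection['host'], int(connection['port'])))
    # Assumed heuristic for recognizing a configuration file name
    if connection.endswith('.xml') or os.path.isfile(connection):
        return ('site_xml', connection)
    host, sep, port = connection.rpartition(':')
    if not sep or not host or not port.isdigit():
        raise ValueError('expected HOST:PORT, got %r' % connection)
    return ('host_port', (host, int(port)))


print(describe_connection('hdp264.fyre.ibm.com:8020'))
```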
-
streamsx.hbase.get(stream, table_name, row_attr_name, connection=None, name=None)
Gets tuples from an HBase table and delivers the number of results, rows, and values in the output stream.
- Parameters
stream – The input stream.
table_name – The name of the HBase table.
row_attr_name (str) – The name of the attribute on the input tuple that contains the row.
connection (dict|filename|string) – Specify the connection to HBase either with the filename of an HBase configuration file, as a string in the format “HOST:PORT”, or as a dict containing the properties ‘host’ and ‘port’. If not specified, the environment variables HADOOP_HOST_PORT or HBASE_SITE_XML are used.
name (str) – Operator name in the Streams context, defaults to a generated name.
- Returns
Output stream containing the row, numResults, and values. It has the structured stream schema:
HBASEGetOutputSchema = StreamSchema('tuple<rstring row, int32 numResults, rstring value, rstring infoType, rstring requestedDetail>')
- Return type
Stream
-
streamsx.hbase.put(stream, table_name, connection=None, name=None)
Puts each tuple of the input stream as a row into an HBase table.
The output stream has a structured StreamSchema.
- Parameters
stream – The input stream.
table_name – The name of the HBase table.
connection (dict|filename|string) – Specify the connection to HBase either with the filename of an HBase configuration file, as a string in the format “HOST:PORT”, or as a dict containing the properties ‘host’ and ‘port’. If not specified, the environment variables HADOOP_HOST_PORT or HBASE_SITE_XML are used.
name (str) – Operator name in the Streams context, defaults to a generated name.
- Returns
Output stream containing the result success. It has the structured stream schema:
StreamSchema('tuple<boolean success>')
- Return type
Stream
-
streamsx.hbase.delete(stream, table_name, connection=None, name=None)
Deletes the row described by each tuple of the input stream from an HBase table.
The output stream has a structured StreamSchema.
- Parameters
stream – The input stream.
table_name – The name of the HBase table.
connection (dict|filename|string) – Specify the connection to HBase either with the filename of an HBase configuration file, as a string in the format “HOST:PORT”, or as a dict containing the properties ‘host’ and ‘port’. If not specified, the environment variables HADOOP_HOST_PORT or HBASE_SITE_XML are used.
name (str) – Operator name in the Streams context, defaults to a generated name.
- Returns
Output stream containing the result success. It has the structured stream schema:
StreamSchema('tuple<boolean success>')
- Return type
Stream