Updated on 2022-06-01 GMT+08:00

Java Sample Code

Function Description

In Spark applications, you can use HBase APIs to create a table, insert data into it, and read data from it.

Sample Code

The following code snippets are examples. For the complete code, see SparkOnHbaseJavaExample.

Example: Creating an HBase table

public class TableCreation {
    /**
     * Drops (if present) and recreates the HBase table {@code shb1} with a single
     * column family {@code info}.
     *
     * <p>The HBase connection settings are derived from the Spark context's Hadoop
     * configuration, so hbase-site.xml must be on the classpath.
     *
     * @param args unused
     * @throws IOException if connecting to HBase or any admin operation fails
     */
    public static void main (String[] args) throws IOException {

        SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        try {
            Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

            // Declare the table layout: table "shb1" with column family "info".
            TableName userTable = TableName.valueOf("shb1");
            HTableDescriptor tableDescr = new HTableDescriptor(userTable);
            // Encode explicitly as UTF-8 (what HBase's Bytes.toBytes uses) rather than
            // relying on the JVM's platform default charset.
            tableDescr.addFamily(new HColumnDescriptor(
                    "info".getBytes(java.nio.charset.StandardCharsets.UTF_8)));

            // try-with-resources guarantees the Admin and the Connection are closed even
            // if one of the admin operations below throws.
            try (Connection connection = ConnectionFactory.createConnection(hbConf);
                 Admin admin = connection.getAdmin()) {
                System.out.println("Creating table shb1. ");
                if (admin.tableExists(userTable)) {
                    // disableTable fails on a table that is already disabled, so only
                    // disable it when it is currently enabled.
                    if (admin.isTableEnabled(userTable)) {
                        admin.disableTable(userTable);
                    }
                    admin.deleteTable(userTable);
                }
                admin.createTable(tableDescr);
            }
        } finally {
            // Always release the Spark context, even when the HBase work failed.
            jsc.stop();
        }
        System.out.println("Done!");
    }
}

Example: Inserting data into the HBase table

public class TableInputData {
    /**
     * Loads comma-separated text into the HBase table {@code shb1}.
     *
     * <p>Each input line must contain at least four comma-separated fields. The first four
     * fields are written to columns {@code info:c11} .. {@code info:c14}. Row keys are
     * {@code row0}, {@code row1}, ... in the order returned by {@code collect()}.
     *
     * <p>hbase-site.xml must be on the classpath so the HBase connection can be configured.
     *
     * @param args {@code args[0]} is the input path passed to {@code textFile}
     * @throws IOException if connecting to HBase or writing to the table fails
     * @throws IllegalArgumentException if no input path is given
     */
    public static void main (String[] args) throws IOException {
        if (args == null || args.length < 1) {
            throw new IllegalArgumentException("Usage: TableInputData <input path>");
        }

        // Create a configuration parameter to connect to HBase and ensure that hbase-site.xml is in classpath.
        SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

        // Declare table information.
        String tableName = "shb1";
        byte[] familyName = Bytes.toBytes("info");

        // try-with-resources closes the Table and then the Connection. Failures now
        // propagate to the caller instead of being printed and ignored, so a failed load
        // no longer exits as if it had succeeded.
        try (Connection connection = ConnectionFactory.createConnection(hbConf);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            // collect() brings every parsed line to the driver, so this suits small inputs only.
            List<Tuple4<String, String, String, String>> data = jsc.textFile(args[0]).map(
                    new Function<String, Tuple4<String, String, String, String>>() {
                        @Override
                        public Tuple4<String, String, String, String> call(String s) throws Exception {
                            String[] tokens = s.split(",");
                            // Fail with a descriptive message instead of an opaque
                            // ArrayIndexOutOfBoundsException on malformed lines.
                            if (tokens.length < 4) {
                                throw new IllegalArgumentException(
                                        "Expected at least 4 comma-separated fields but got "
                                                + tokens.length + ": " + s);
                            }
                            return new Tuple4<String, String, String, String>(tokens[0], tokens[1], tokens[2], tokens[3]);
                        }
                    }).collect();

            // Build all mutations first and send them in one batched call instead of one
            // RPC round-trip per row.
            List<Put> puts = new java.util.ArrayList<Put>(data.size());
            int i = 0;
            for (Tuple4<String, String, String, String> line : data) {
                Put put = new Put(Bytes.toBytes("row" + i));
                put.addColumn(familyName, Bytes.toBytes("c11"), Bytes.toBytes(line._1()));
                put.addColumn(familyName, Bytes.toBytes("c12"), Bytes.toBytes(line._2()));
                put.addColumn(familyName, Bytes.toBytes("c13"), Bytes.toBytes(line._3()));
                put.addColumn(familyName, Bytes.toBytes("c14"), Bytes.toBytes(line._4()));
                puts.add(put);
                i++;
            }
            if (!puts.isEmpty()) {
                table.put(puts);
            }
        } finally {
            // Always release the Spark context.
            jsc.stop();
        }
    }
}

Example: Reading HBase table data

public class TableOutputData {
    /**
     * Scans column family {@code info} of the HBase table {@code shb1} through Spark's
     * {@code newAPIHadoopRDD} with {@code TableInputFormat}. Prints every cell's family,
     * qualifier, value and timestamp on the driver.
     *
     * <p>hbase-site.xml must be on the classpath so the HBase connection can be configured.
     *
     * @param args unused
     * @throws IOException if the scan cannot be serialized into the job configuration
     */
    public static void main(String[] args) throws IOException {

        // Configure Kryo serialization on the SparkConf itself rather than through global
        // system properties. The default SparkConf constructor reads spark.* system
        // properties, so the effective configuration is the same.
        SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .set("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        try {
            // Create a configuration parameter to connect to HBase and ensure that hbase-site.xml is in classpath.
            Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

            // Declare information about the table to be queried. TableInputFormat expects
            // the Scan serialized as Base64-encoded protobuf.
            Scan scan = new Scan();
            scan.addFamily(Bytes.toBytes("info"));
            org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
            String scanToString = Base64.encodeBytes(proto.toByteArray());
            hbConf.set(TableInputFormat.INPUT_TABLE, "shb1");
            hbConf.set(TableInputFormat.SCAN, scanToString);

            // Use the Spark API to obtain table data (typed, so no unchecked raw-type access).
            JavaPairRDD<ImmutableBytesWritable, Result> rdd = jsc.newAPIHadoopRDD(
                    hbConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

            // Traverse every row returned to the driver and print its cells.
            for (Tuple2<ImmutableBytesWritable, Result> row : rdd.collect()) {
                List<Cell> cells = row._2().listCells();
                // listCells() returns null for an empty Result; skip such rows instead of
                // throwing a NullPointerException.
                if (cells == null) {
                    continue;
                }
                for (Cell c : cells) {
                    String family = Bytes.toString(CellUtil.cloneFamily(c));
                    String qualifier = Bytes.toString(CellUtil.cloneQualifier(c));
                    String value = Bytes.toString(CellUtil.cloneValue(c));
                    long tm = c.getTimestamp();
                    System.out.println(" Family=" + family + " Qualifier=" + qualifier + " Value=" + value + " TimeStamp=" + tm);
                }
            }
        } finally {
            // Always release the Spark context, even if the scan failed.
            jsc.stop();
        }
    }
}