示例
Configuration可以设置的参数
为了能够建立一个HBase Client端到HBase Server端的连接,需要设置如下几个参数。
- hbase.zookeeper.quorum: ZooKeeper的IP。多个ZooKeeper节点的话,中间用“,”隔开。
- hbase.zookeeper.property.clientPort: Zookeeper的端口。
通过HBaseConfiguration.create()创建的Configuration实例,会自动加载如下配置文件中的配置项:
- core-default.xml
- core-site.xml
- hbase-default.xml
- hbase-site.xml
因此,这4个配置文件,应该要放置在“Source Folder”下面(将一个文件夹设置为Source Folder的方法:如果在工程下面建立了一个resource的文件夹,那么,可以在该文件夹上右键鼠标,依次选择“Build Path”->“Use as Source Folder”即可,可参考下图)
下面是客户端可配置的一些参数集合。
在通常情况下,这些值都不建议修改。
参数名 |
参数解释 |
---|---|
hbase.client.pause |
每次异常或者其它情况下重试等待相关的时间参数(实际等待时间将根据该值与已重试次数计算得出)。 |
hbase.client.retries.number |
异常或者其它情况下的重试次数。 |
hbase.client.retries.longer.multiplier |
与重试次数有关。 |
hbase.client.rpc.maxattempts |
RPC请求不可达时的重试次数。 |
hbase.regionserver.lease.period |
与Scanner超时时间有关(单位ms)。 |
hbase.client.write.buffer |
在启用AutoFlush的情况下,该值不起作用。如果未启用AotoFlush的话,HBase Client端会首先缓存写入的数据,达到设定的大小后才向HBase集群下发一次写入操作。 |
hbase.client.scanner.caching |
Scan时一次next请求获取的行数。 |
hbase.client.keyvalue.maxsize |
一条keyvalue数据的最大值。 |
hbase.htable.threads.max |
HTable实例中与数据操作有关的最大线程数。 |
hbase.client.prefetch.limit |
客户端在写数据或者读取数据时,需要首先获取对应的Region所在的地址。客户端可以预缓存一些Region地址,这个参数就是与缓存的数目有关的配置。 |
正确设置参数的方法:
hbaseConfig = HBaseConfiguration.create();
//如下参数,如果在配置文件中已经存在,则无须再配置
hbaseConfig.set("hbase.zookeeper.quorum", "10.5.100.1,10.5.100.2,10.5.100.3");
hbaseConfig.set("hbase.zookeeper.property.clientPort", "2181");
HTablePool在多线程写入操作中的应用
- 有多个写数据线程时,可以采用HTablePool。现在先简单介绍下该类的使用方法和注意点:
- 多个写数据的线程之间,应共享同一个HTablePool实例。
实例化HTablePool的时候,应要指定最大的HTableInterface实例个数maxSize,即需要通过如下构造函数实例化该类:
public HTablePool(final Configuration config, final int maxSize)
关于maxSize的值,可以根据写数据的线程数Threads以及涉及到的用户表个数Tables来定,理论上,不应该超过(Threads*Tables)。
- 客户端线程通过HTablePool#getTable(tableName)的方法,获取一个表名为tableName的HTableInterface实例。
- 同一个HTableInterface实例,在同一个时刻只能给一个线程使用。
- 如果HTableInterface使用完了,需要调用HTablePool#putTable(HTableInterface table)方法将它放回去。
/** * 写数据失败后需要一定的重试次数,每一次重试的等待时间,需要根据已经重试的次数而定. */ private static final int[] RETRIES_WAITTIME = {1, 1, 1, 2, 2, 4, 4, 8, 16, 32}; /** * 限定的重试次数 */ private static final int RETRIES = 10; /** * 失败后等待的基本时间单位 */ private static final int PAUSE_UNIT = 1000; private static Configuration hadoopConfig; private static HTablePool tablePool; private static String[] tables; /** * <初始化HTablePool> * <功能详细描述> * @param config * @see [类、类#方法、类#成员] */ public static void initTablePool() { DemoConfig config = DemoConfig.getInstance(); if (hadoopConfig == null) { hadoopConfig = HBaseConfiguration.create(); hadoopConfig.set("hbase.zookeeper.quorum", config.getZookeepers()); hadoopConfig.set("hbase.zookeeper.property.clientPort", config.getZookeeperPort()); } if (tablePool == null) { tablePool = new HTablePool(hadoopConfig, config.getTablePoolMaxSize()); tables = config.getTables().split(","); } } public void run() { // 初始化HTablePool.因为这是多线程间共享的一个实例, 仅被实例化一次. initTablePool(); for (;;) { Map<String, Object> data = DataStorage.takeList(); String tableName = tables[(Integer)data.get("table")]; List<Put> list = (List)data.get("list"); // 以Row为Key,保存List中所有的Put.该集合仅使用于写入失败时查找失败的数据记录. // 因为从Server端仅返回了失败的数据记录的Row值. Map<byte[], Put> rowPutMap = null; // 如果失败了(哪怕是部分数据失败),需要重试.每一次重试,都仅提交失败的数据条目 INNER_LOOP : for (int retry = 0; retry < RETRIES; retry++) { // 从HTablePool中获取一个HTableInterface实例.用完后需要放回去. HTableInterface table = tablePool.getTable(tableName); try { table.put(list); // 如果执行到这里,说明成功了 . break INNER_LOOP; } catch (IOException e) { // 如果是RetriesExhaustedWithDetailsException类型的异常, // 说明这些数据中有部分是写入失败的这通常都是因为HBase集群 // 的进程异常引起,当然有时也会因为有大量的Region正在被转移, // 导致尝试一定的次数后失败. // 如果非RetriesExhaustedWithDetailsException异常,则需要将 // list中的所有数据都要重新插入. if (e instanceof RetriesExhaustedWithDetailsException) { RetriesExhaustedWithDetailsException ree = (RetriesExhaustedWithDetailsException)e; int failures = ree.getNumExceptions(); System.out.println("本次插入失败了[" + failures + "]条数据."); // 第一次失败且重试时,实例化该Map. if (rowPutMap == null) { rowPutMap = new HashMap<byte[], Put>(failures); for (int m = 0; m < list.size(); m++) { Put put = list.get(m); rowPutMap.put(put.getRow(), put); } } //先Clear掉原数据,然后将失败的数据添加进来 list.clear(); for (int m = 0; m < failures; m++) { list.add(rowPutMap.get(ree.getRow(m))); } } } finally { // 用完之后,再将该实例放回去 tablePool.putTable(table); } // 如果异常了,就暂时等待一段时间.该等待应该在将HTableInterface实例放回去之后 try { sleep(getWaitTime(retry)); } catch (InterruptedException e1) { System.out.println("Interruped"); } } } }
Put实例的创建
HBase是一个面向列的数据库,一行数据,可能对应多个列族,而一个列族又可以对应多个列。通常,写入数据的时候,需要指定要写入的列(含列族名称和列名称):
如果要往HBase表中写入一行数据,需要首先构建一个Put实例。Put中包含了数据的Key值和相应的Value值,Value值可以有多个(即可以有多列值)。
有一点需要注意:在往Put实例中add一条KeyValue数据时,传入的family,qualifier,value都是字节数组。在将一个字符串转换为字节数组时,需要使用Bytes.toBytes方法,不要使用String.toBytes方法,因为后者无法保证编码,尤其是在Key或Value中出现中文字符的时候,就会出现问题。
代码示例:
//列族的名称为privateInfo private final static byte[] FAMILY_PRIVATE = Bytes.toBytes("privateInfo"); //列族privateInfo中总共有两个列"name"&"address" private final static byte[] COLUMN_NAME = Bytes.toBytes("name"); private final static byte[] COLUMN_ADDR = Bytes.toBytes("address"); /** * <创建一个Put实例> * <在该方法中,将会创建一个具有1个列族,2列数据的Put> * @param rowKey Key值 * @param name 人名 * @param address 地址 * @return * @see [类、类#方法、类#成员] */ public Put createPut(String rowKey, String name, String address) { Put put = new Put(Bytes.toBytes(rowKey)); put.add(FAMILY_PRIVATE, COLUMN_NAME, Bytes.toBytes(name)); put.add(FAMILY_PRIVATE, COLUMN_ADDR, Bytes.toBytes(address)); return put; }
HBaseAdmin实例的创建以及常用方法
代码示例:
private Configuration demoConf = null; private HBaseAdmin hbaseAdmin = null; /** * <构造函数> * 需要将已经实例化好的Configuration实例传递进来 */ public HBaseAdminDemo(Configuration conf) { this.demoConf = conf; try { // 实例化HBaseAdmin hbaseAdmin = new HBaseAdmin(this.demoConf); } catch (MasterNotRunningException e) { e.printStackTrace(); } catch (ZooKeeperConnectionException e) { e.printStackTrace(); } } /** * <一些方法使用示例> * <更多的方法,请参考HBase接口文档> * @throws IOException * @throws ZooKeeperConnectionException * @throws MasterNotRunningException * @see [类、类#方法、类#成员] */ public void demo() throws MasterNotRunningException, ZooKeeperConnectionException, IOException { byte[] regionName = Bytes.toBytes("mrtest,jjj,1315449869513.fc41d70b84e9f6e91f9f01affdb06703."); byte[] encodeName = Bytes.toBytes("fc41d70b84e9f6e91f9f01affdb06703"); // 重新分配一个Reigon. hbaseAdmin.unassign(regionName, false); // 主动触发Balance. hbaseAdmin.balancer(); // 移动一个Region,第2个参数,是RegionServer的HostName+StartCode,例如: // host187.example.com,60020,1289493121758.如果将该参数设置为null,则会随机移动该Region hbaseAdmin.move(encodeName, null); // 判断一个表是否存在 hbaseAdmin.tableExists("tableName"); // 判断一个表是否被激活 hbaseAdmin.isTableEnabled("tableName"); } /** * <快速创建一个表的方法> * <首先创建一个HTableDescriptor实例,它里面包含了即将要创建的HTable的描述信息,同时,需要创建相应的列族。列族关联的实例是HColumnDescriptor。在本示例中,创建的列族名称为“columnName”> * @param tableName 表名 * @return * @see [类、类#方法、类#成员] */ public boolean createTable(String tableName) { try { if (hbaseAdmin.tableExists(tableName)) { return false; } HTableDescriptor tableDesc = new HTableDescriptor(tableName); HColumnDescriptor fieldADesc = new HColumnDescriptor("columnName".getBytes()); fieldADesc.setBlocksize(640 * 1024); tableDesc.addFamily(fieldADesc); hbaseAdmin.createTable(tableDesc); } catch (Exception e) { e.printStackTrace(); return false; } return true; }