HBase基础操作,包括表的增删改查过滤等
package com.snglw.basic; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.coprocessor.AggregationClient; import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.util.Bytes; import com.snglw.util.ConnectInit; public class BasicOperator { ConnectInit ci = new ConnectInit(); Configuration conf = ci.getConfiguration(); /*建表与删表*/ public void createTable(){ String tableName = "user"; HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName)); HColumnDescriptor hcd = new HColumnDescriptor("info"); htd.addFamily(hcd); HBaseAdmin admin = null; try { admin = new HBaseAdmin(conf); if (admin.tableExists(tableName)){ admin.disableTable(tableName); admin.deleteTable(tableName); }else{ admin.createTable(htd); System.out.println("Table Created!"); } }catch(IOException e){ e.printStackTrace(); } finally{ if(admin != null){ try{ admin.close(); }catch(IOException e){ e.printStackTrace(); } } } } /*修改表(注意:修改表或者列时,只有表的Enabled属性为false时才能生效)*/ public void modifyTable(){ //指定表名 String tableName = "user"; //指定列族名 byte[] familyName = Bytes.toBytes("education"); HBaseAdmin admin = null; try{ //实例化HBaseAdmin对象 admin = new HBaseAdmin(conf); //获取表描述信息对象 HTableDescriptor htd = admin.getTableDescriptor(Bytes.toBytes(tableName)); //修改前,判断表是否有指定列族 if(!htd.hasFamily(familyName)){ //创建列描述信息 HColumnDescriptor hcd = new HColumnDescriptor(familyName); htd.addFamily(hcd); //修改表前,需要disable表,使其下线 admin.disableTable(tableName); //提交modifyTable请求 admin.modifyTable(tableName, htd); //修改完成之后,使表上线 admin.enableTable(tableName); } }catch(IOException e){ e.printStackTrace(); }finally{ if(admin != null){ try { admin.close(); } catch (IOException e) { e.printStackTrace(); } } } } /*插入数据*/ public void put(){ //指定表名 String tableName = "user"; //指定列族名 byte[] familyName = Bytes.toBytes("info"); //指定列名 byte[][] qualifiers = {Bytes.toBytes("name"),Bytes.toBytes("gender"), Bytes.toBytes("age"),Bytes.toBytes("address")}; HTable table = null; try{ //实例化一个HTable对象 table = new HTable(conf,tableName); Listputs = new ArrayList (); //实例化一个Put对象 Put put = new Put(Bytes.toBytes("012005000201")); put.addImmutable(familyName, qualifiers[0],Bytes.toBytes("张三")); put.addImmutable(familyName, qualifiers[1],Bytes.toBytes("男")); put.addImmutable(familyName, qualifiers[2],Bytes.toBytes(new Long(19))); put.addImmutable(familyName, qualifiers[3],Bytes.toBytes("广东省深圳市")); puts.add(put); put = new Put(Bytes.toBytes("012005000202")); put.addImmutable(familyName, qualifiers[0],Bytes.toBytes("李")); put.addImmutable(familyName, qualifiers[1],Bytes.toBytes("女")); put.addImmutable(familyName, qualifiers[2],Bytes.toBytes(new Long(23))); put.addImmutable(familyName, qualifiers[3],Bytes.toBytes("山西省大同市")); puts.add(put); put = new Put(Bytes.toBytes("012005000203")); put.addImmutable(familyName, qualifiers[0],Bytes.toBytes("王")); put.addImmutable(familyName, qualifiers[1],Bytes.toBytes("男")); put.addImmutable(familyName, qualifiers[2],Bytes.toBytes(new Long(26))); put.addImmutable(familyName, qualifiers[3],Bytes.toBytes("浙江省宁波市")); puts.add(put); //提交put数据请求 table.put(puts); }catch(IOException e){ e.printStackTrace(); }finally{ if(table != null){ try{ //关闭HTable对象 table.close(); }catch(IOException e){ e.printStackTrace(); } } } } /*删除数据*/ public void delete(){ String tableName = "user"; //指定rowKey值,即编号为012005000201 byte[] rowKey = Bytes.toBytes("012005000201"); HTable table = null; try{ table = new HTable(conf,tableName); Delete delete = new Delete(rowKey); //提交一次delete数据请求 table.delete(delete); }catch(IOException e){ e.printStackTrace(); }finally{ if(table != null){ try { table.close(); } catch (IOException e) { e.printStackTrace(); } } } } /*使用Get读取数据*/ public void get(){ String tableName = "user"; //指定列族名 byte[] familyName = Bytes.toBytes("info"); //指定列名 byte[][] qualifier = {Bytes.toBytes("name"),Bytes.toBytes("address")}; //指定rowKey值 byte[] rowKey = Bytes.toBytes("012005000202"); HTable table = null; try{ table = new HTable(conf,tableName); //实例化get对象 Get get = new Get(rowKey); //设置列族和列名 get.addColumn(familyName,qualifier[0]); get.addColumn(familyName,qualifier[1]); //提交请求 Result result = table.get(get); for(Cell cell:result.rawCells()){ System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)) + ":" + Bytes.toString(CellUtil.cloneQualifier(cell)) + ":" + Bytes.toString(CellUtil.cloneValue(cell))); } }catch(IOException e){ e.printStackTrace(); }finally{ if(table != null){ try { table.close(); } catch (IOException e) { e.printStackTrace(); } } } } /*使用scan读取数据*/ public void scan(){ String tableName = "webPage"; HTable table = null; try{ table = new HTable(conf,tableName); Scan scan = new Scan(); scan.addColumn(Bytes.toBytes("webPageInfo"),Bytes.toBytes("doc")); //设置缓存大小 scan.setCaching(5000); scan.setBatch(2); //实例化一个ResultScanner对象 ResultScanner rScanner = null; //提交请求 rScanner = table.getScanner(scan); for(Result r = rScanner.next();r != null;r = rScanner.next()){ for(Cell cell:r.rawCells()){ System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)) + ":" + Bytes.toString(CellUtil.cloneQualifier(cell)) + ":" + Bytes.toString(CellUtil.cloneValue(cell))); } } }catch(IOException e){ e.printStackTrace(); }finally{ if(table != null){ try { table.close(); } catch (IOException e) { e.printStackTrace(); } } } } /*使用过滤器*/ public void singleColumnValueFilter(){ String tableName = "user"; HTable table = null; try{ table = new HTable(conf,tableName); //实例化一个Scan对象 Scan scan = new Scan(); scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name")); //设置过滤条件 SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("info"),Bytes.toBytes("name"), CompareOp.EQUAL,Bytes.toBytes("王")); scan.setFilter(filter); //实例化ResultScanner对象 ResultScanner rScanner = null; rScanner = table.getScanner(scan); for(Result r = rScanner.next();r != null;r = rScanner.next()){ for(Cell cell:r.rawCells()){ System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)) + ":" + Bytes.toString(CellUtil.cloneQualifier(cell)) + ":" + Bytes.toString(CellUtil.cloneValue(cell))); } } }catch(IOException e){ e.printStackTrace(); }finally{ if(table != null){ try { table.close(); } catch (IOException e) { e.printStackTrace(); } } } } /*使用FilterList过滤器*/ public void filterList(){ String tableName = "user"; HTable table = null; try{ table = new HTable(conf,tableName); //实例化一个Scan对象 Scan scan = new Scan(); scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name")); //实例化FilterList对象,里各个filter的是"and"关系 FilterList list = new FilterList(Operator.MUST_PASS_ALL); //设置过滤条件(age>20的数据) list.addFilter(new SingleColumnValueFilter(Bytes.toBytes("info"),Bytes.toBytes("age"), CompareOp.GREATER_OR_EQUAL,Bytes.toBytes(new Long(20)))); //获取age<=29的数据 list.addFilter(new SingleColumnValueFilter(Bytes.toBytes("info"),Bytes.toBytes("age"), CompareOp.GREATER_OR_EQUAL,Bytes.toBytes(new Long(29)))); scan.setFilter(list); //实例化ResultScanner对象 ResultScanner rScanner = null; rScanner = table.getScanner(scan); for(Result r = rScanner.next();r != null;r = rScanner.next()){ for(Cell cell:r.rawCells()){ System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)) + ":" + Bytes.toString(CellUtil.cloneQualifier(cell)) + ":" + Bytes.toString(CellUtil.cloneValue(cell))); } } }catch(IOException e){ e.printStackTrace(); }finally{ if(table != null){ try { table.close(); } catch (IOException e) { e.printStackTrace(); } } } } /*聚合函数Aggregate*/ public void aggregate(){ //指定表名 byte[] tableName = Bytes.toBytes("user"); //指定列族名 byte[] family = Bytes.toBytes("info"); AggregationClient aggregationClient = new AggregationClient(conf); //实例化scan对象 Scan scan = new Scan(); scan.addFamily(family); scan.addColumn(family,Bytes.toBytes("age")); try{ //获取行数 long rowCount = aggregationClient.rowCount(TableName.valueOf(tableName),null,scan); System.out.println("row count is "+rowCount); //获取最大值 long max = aggregationClient.max(TableName.valueOf(tableName),new LongColumnInterpreter(),scan); System.out.println("max number is "+max); //获取最小值 long min = aggregationClient.min(TableName.valueOf(tableName),new LongColumnInterpreter(),scan); System.out.println("min number is "+min); }catch(Throwable e){ e.printStackTrace(); } } public static void main(String[] args){ BasicOperator bo = new BasicOperator(); bo.aggregate(); } }
当前标题:HBase基础操作,包括表的增删改查过滤等
地址分享:http://pwwzsj.com/article/igcppc.html