hbase分页查询实现

Hbase本身是没有分页查询的,我在网上找了很多资料来实现一个分页功能,在这里做了一下记录,分享给大家,有什么不足之处,请尽管指出。废话不多说,看代码。

创新互联公司是一家专注于网站设计、成都做网站与策划设计,关岭网站建设哪家好?创新互联公司做网站,专注于网站建设10年,网设计领域的专业建站公司;建站业务涵盖:关岭等地区。关岭做网站价格咨询:028-86922220

import java.io.IOException;

import java.util.LinkedHashMap;

import java.util.LinkedList;

import java.util.List;

import java.util.Map;

 

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.client.Get;

import org.apache.hadoop.hbase.client.HTableInterface;

import org.apache.hadoop.hbase.client.HTablePool;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.client.ResultScanner;

import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;

import org.apache.hadoop.hbase.filter.Filter;

import org.apache.hadoop.hbase.filter.FilterList;

import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;

import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;

import org.apache.hadoop.hbase.util.Bytes;

 

publicclass HBaseUtils {

   privatestatic Configurationconfig =null;

   privatestatic HTablePooltp =null;

   static {

       //加载集群配置

       config = HBaseConfiguration.create();

       config.set("hbase.zookeeper.quorum","xx.xx.xx");

       config.set("hbase.zookeeper.property.clientPort","2181");

       //创建表池(可伟略提高查询性能,具体说明请百度或官方API)

       tp =new HTablePool(config, 10);

    }

 

   /*

     *获取hbase的表

     */

   publicstatic HTableInterface getTable(StringtableName) {

 

       if (StringUtils.isEmpty(tableName))

           returnnull;

 

       returntp.getTable(getBytes(tableName));

    }

 

   /*转换byte数组 */

   publicstaticbyte[] getBytes(String str) {

       if (str ==null)

            str="";

 

       return Bytes.toBytes(str);

    }

 

   /**

     *查询数据

     *@param tableKey表标识

     *@param queryKey查询标识

     *@param startRow开始行

     *@param paramsMap参数集合

     *@return结果集

     */

   publicstatic TBData getDataMap(StringtableName, String startRow,

            StringstopRow, Integer currentPage, Integer pageSize)

           throws IOException {

        List>mapList =null;

        mapList =new LinkedList>();

 

        ResultScanner scanner =null;

       //为分页创建的封装类对象,下面有给出具体属性

        TBData tbData =null;

       try {

           //获取最大返回结果数量

           if (pageSize ==null || pageSize == 0L)

                pageSize = 100;

 

           if (currentPage ==null || currentPage == 0)

                currentPage = 1;

 

           //计算起始页和结束页

            IntegerfirstPage = (currentPage - 1) * pageSize;

 

            IntegerendPage = firstPage + pageSize;

 

           //从表池中取出HBASE表对象

            HTableInterfacetable = getTable(tableName);

           //获取筛选对象

            Scanscan = getScan(startRow, stopRow);

           //给筛选对象放入过滤器(true标识分页,具体方法在下面)

            scan.setFilter(packageFilters(true));

           //缓存1000条数据

            scan.setCaching(1000);

            scan.setCacheBlocks(false);

            scanner= table.getScanner(scan);

           int i = 0;

            List<byte[]> rowList =new LinkedList<byte[]>();

           //遍历扫描器对象, 并将需要查询出来的数据row key取出

           for (Result result : scanner) {

                String row = toStr(result.getRow());

               if (i >= firstPage && i< endPage) {

                    rowList.add(getBytes(row));

                }

                i++;

            }

 

           //获取取出的row key的GET对象

            ListgetList = getList(rowList);

            Result[]results = table.get(getList);

           //遍历结果

           for (Result result : results) {

                Map<byte[],byte[]> fmap = packFamilyMap(result);

                Map rmap = packRowMap(fmap);

                mapList.add(rmap);

            }

 

           //封装分页对象

            tbData=new TBData();

            tbData.setCurrentPage(currentPage);

            tbData.setPageSize(pageSize);

            tbData.setTotalCount(i);

            tbData.setTotalPage(getTotalPage(pageSize, i));

            tbData.setResultList(mapList);

        }catch (IOException e) {

            e.printStackTrace();

        }finally {

            closeScanner(scanner);

        }

 

       return tbData;

    }

 

   privatestaticint getTotalPage(int pageSize,int totalCount) {

       int n = totalCount / pageSize;

       if (totalCount % pageSize == 0) {

           return n;

        }else {

           return ((int) n) + 1;

        }

    }

 

   //获取扫描器对象

   privatestatic Scan getScan(String startRow,String stopRow) {

        Scan scan =new Scan();

        scan.setStartRow(getBytes(startRow));

        scan.setStopRow(getBytes(stopRow));

 

       return scan;

    }

 

   /**

     *封装查询条件

     */

   privatestatic FilterList packageFilters(boolean isPage) {

        FilterList filterList =null;

       // MUST_PASS_ALL(条件 AND) MUST_PASS_ONE(条件OR)

        filterList =new FilterList(FilterList.Operator.MUST_PASS_ALL);

        Filter filter1 =null;

        Filter filter2 =null;

        filter1 = newFilter(getBytes("family1"), getBytes("column1"),

                CompareOp.EQUAL, getBytes("condition1"));

        filter2 = newFilter(getBytes("family2"), getBytes("column1"),

                CompareOp.LESS, getBytes("condition2"));

        filterList.addFilter(filter1);

        filterList.addFilter(filter2);

       if (isPage) {

            filterList.addFilter(new FirstKeyOnlyFilter());

        }

       return filterList;

    }

 

   privatestatic Filter newFilter(byte[] f,byte[] c, CompareOp op,byte[] v) {

       returnnew SingleColumnValueFilter(f, c, op,v);

    }

 

   privatestaticvoid closeScanner(ResultScannerscanner) {

       if (scanner !=null)

            scanner.close();

    }

 

   /**

     *封装每行数据

     */

   privatestatic MappackRowMap(Map<byte[],byte[]> dataMap) {

        Map map =new LinkedHashMap();

 

       for (byte[] key : dataMap.keySet()) {

 

           byte[] value = dataMap.get(key);

 

            map.put(toStr(key), toStr(value));

 

        }

       return map;

    }

 

   /*根据ROW KEY集合获取GET对象集合 */

   privatestatic List getList(List<byte[]> rowList) {

        List list =new LinkedList();

       for (byte[] row : rowList) {

            Getget =new Get(row);

 

            get.addColumn(getBytes("family1"), getBytes("column1"));

            get.addColumn(getBytes("family1"), getBytes("column2"));

            get.addColumn(getBytes("family2"), getBytes("column1"));

            list.add(get);

        }

       return list;

    }

 

   /**

     *封装配置的所有字段列族

     */

   privatestatic Map<byte[],byte[]> packFamilyMap(Result result){

        Map<byte[],byte[]> dataMap =null;

        dataMap =new LinkedHashMap<byte[],byte[]>();

        dataMap.putAll(result.getFamilyMap(getBytes("family1")));

        dataMap.putAll(result.getFamilyMap(getBytes("family2")));

       return dataMap;

    }

 

   privatestatic String toStr(byte[] bt) {

       return Bytes.toString(bt);

    }

 

   publicstaticvoid main(String[] args)throws IOException {

       //拿出row key的起始行和结束行

       // #<0<9<:

        String startRow ="aaaa#";

        String stopRow ="aaaa:";

       int currentPage = 1;

       int pageSize = 20;

       //执行hbase查询

        getDataMap("table", startRow, stopRow, currentPage,pageSize);

 

    }

}

 

class TBData {

   private IntegercurrentPage;

   private IntegerpageSize;

   private IntegertotalCount;

   private IntegertotalPage;

   private List>resultList;

 

   public Integer getCurrentPage() {

       returncurrentPage;

    }

 

   publicvoid setCurrentPage(IntegercurrentPage) {

       this.currentPage = currentPage;

    }

 

   public Integer getPageSize() {

       returnpageSize;

    }

 

   publicvoid setPageSize(Integer pageSize) {

       this.pageSize = pageSize;

    }

 

   public Integer getTotalCount() {

       returntotalCount;

    }

 

   publicvoid setTotalCount(Integer totalCount){

       this.totalCount = totalCount;

    }

 

   public Integer getTotalPage() {

       returntotalPage;

    }

 

   publicvoid setTotalPage(Integer totalPage) {

       this.totalPage = totalPage;

    }

 

   public List> getResultList() {

       returnresultList;

    }

 

   publicvoidsetResultList(List> resultList) {

       this.resultList = resultList;

    }

}


新闻名称:hbase分页查询实现
文章链接:http://pwwzsj.com/article/geigig.html