飙血推荐
  • HTML教程
  • MySQL教程
  • JavaScript基础教程
  • php入门教程
  • JavaScript正则表达式运用
  • Excel函数教程
  • UEditor使用文档
  • AngularJS教程
  • ThinkPHP5.0教程

简简单单用一下Hbase-

时间:2022-05-19  作者:niejunlei  

一、Hbase 介绍

https://域名/域名#_preface

https://域名/hbase/

https://域名/archive/域名

什么是Hbase?

hadoop 数据库:分布式、可伸缩、大数据存储。

二、Hbase client

最开始引入 hbase-client,服务有使用【google/protobuf/域名o】,有很多包冲突,所以直接使用了 habase-shade-client: 

<dependency>
  <groupId>域名e</groupId>
  <artifactId>hbase-shaded-client</artifactId>
  <version>${域名域名ion}</version>
</dependency>

三、Hbase 配置

  • 域名um
    zookeeper server 地址,逗号分割。本地模式和伪集群模式下,默认为 127.0.0.1

  • 域名域名ntPort
    zookeeper server 端口,默认 2181

  • 域名域名er
    hbase client 所有操作的重试上限,默认 15。client 首先等待 域名e 执行第一次重试,之后每隔 10s 再次执行。

  • 域名out
    hbase client 一次 rpc 操作的超时时间(超时基于ping检查),默认60000ms,触发则抛出 TimeoutException 异常。

  • 域名域名out
    hbase client 一次操作的总的时间限制, 默认 1200000ms,触发则直接抛出 SocketTimeoutException 异常。

  • 示例:
    @Configuration
    public class HBaseConfig {
        @Value("${域名um}")
        private String hbaseZkQuorum;
        @Value("${域名域名ntPort:2181}")
        private String hbaseZkPort;
        @Value("${域名域名er:2}")
        private String hbaseClientRetry;
        @Value("${域名out:2000}")
        private String hbaseRpcTimeout;
        @Value("${域名域名out:3000}")
        private String hbaseClientOperationTimeout;
        @Bean
        public Connection hbaseConnection() throws IOException {
            域名域名iguration hbaseConfig = 域名te();
            域名("域名域名ntPort", hbaseZkPort);
            域名("域名um", hbaseZkQuorum);
            域名("域名域名er", hbaseClientRetry);
            域名("域名域名out", hbaseClientOperationTimeout);
            域名("域名out", hbaseRpcTimeout);
            return 域名teConnection(hbaseConfig);
        }
        @Bean
        public HbaseSimpleTemplate hbaseSimpleTemplate(@Qualifier("hbaseConnection") Connection hbaseConnection) {
            return new HbaseSimpleTemplate(hbaseConnection);
        }
    }

四、关于 Connection

1、Connection 是什么?

集群 connection 封装了底层和实际 hbase server 及 zookeeper 的连接。由 ConnectionFactory 创建并由发起端维护其整个生命周期。

承载了服务发现(hbase master 及 region server)及本地缓存维护(存储及更新)逻辑。所以基于此链接实例化而来的 Table 和 Admin 共享此信息。

2、Connection 怎么使用?

Connection 创建是一个很重的操作。

Connection 实现是 thread-safe 的。

所以通常的操作时,一次创建,到处使用。

这里我们通过 @Bean 注解,将 connection 实例交由 spring 管理,维护其从创建,使用到销毁的整个生命周期。

三、HbaseSimpleTemplate

Hbase Connection 数据操作封装:

row->column->all cells

row->column->cells

rows->column->cells

public class HbaseSimpleTemplate {
    private Connection hbaseConnection;
    public HbaseSimpleTemplate(Connection hbaseConnection) {
        域名eConnection = hbaseConnection;
    }
    /**
     * 结果映射map
     *
     * @param result
     * @return
     */
    private Map<String, String> resultToMap(Result result) {
        if (result == null || 域名pty()) {
            return new HashMap<>();
        }
        return 域名Cells().stream().collect(
                域名p(cell -> 域名ring(域名eQualifier(cell)), cell -> 域名ring(域名eValue(cell))));
    }
    /**
     * 查询
     * @param tableName
     * @param rowName
     * @param familyName
     * @return
     * @throws IOException
     */
    public Map<String, String> get(String tableName, String rowName, String familyName) throws IOException {
        Map<String, Map<String, String>> resultMap = get(tableName, 域名letonList(rowName), familyName, null);
        return 域名es().stream().findFirst().orElse(new HashMap<>());
    }
    /**
     *
     * @param tableName
     * @param rowName
     * @param familyName
     * @param qualifiers
     * @return
     * @throws IOException
     */
    public Map<String, String> get(String tableName, String rowName, String familyName, List<String> qualifiers) throws IOException {
        Map<String, Map<String, String>> resultMap = get(tableName, 域名letonList(rowName), familyName, qualifiers);
        return 域名es().stream().findFirst().orElse(new HashMap<>());
    }
    /**
     * 批量查询
     *
     * @param tableName
     * @param rowNames
     * @param familyName
     * @return
     * @throws IOException
     */
    public Map<String, Map<String, String>> get(String tableName, List<String> rowNames, String familyName, List<String> qualifiers) throws IOException {
        Map<String, Map<String, String>> resultMap = new HashMap<>();
        List<Get> gets = new ArrayList<>();
        域名ach(rowName -> {
            Get get = new Get(域名ytes());
            if (域名tEmpty(qualifiers)) {
                域名ach(qualifier -> 域名olumn(域名ytes(), 域名ytes()));
            } else {
                域名amily(域名ytes());
            }
            域名(get);
        });
        域名am(域名able(域名eOf(tableName)).get(gets))
                .forEach(result -> {
                    Map<String, String> kvMap = resultToMap(result);
                    String id = 域名tring(kvMap, "id");
                    if (域名tBlank(id)) {
                        域名(id, kvMap);
                    }
                });
        return resultMap;
    }
    /**
     * 写入 qualifier
     *
     * @param tableName
     * @param rowName
     * @param familyName
     * @param qualifier
     * @param value
     * @return
     * @throws IOException
     */
    public boolean put(String tableName, String rowName, String familyName, String qualifier, String value) throws IOException {
        Map<String, String> qv = new HashMap<>();
        域名(qualifier, value);
        put(tableName, rowName, familyName, qv);
        return true;
    }
    /**
     * 写入 qualifiers
     *
     * @param tableName
     * @param rowName
     * @param familyName
     * @param qualifierValues
     * @return
     * @throws IOException
     */
    public boolean put(String tableName, String rowName, String familyName, Map<String, String> qualifierValues) throws IOException {
        if (域名pty(qualifierValues)) {
            return false;
        }
        List<Put> puts = new ArrayList<>();
        域名ach((qualifier, value) -> 域名(new Put(域名ytes()).addColumn(域名ytes(), 域名ytes(), 域名ytes())));
        域名able(域名eOf(tableName)).put(puts);
        return true;
    }
    /**
     * 删除 
     *
     * @param tableName
     * @param rowName
     * @param familyName
     * @return
     * @throws IOException
     */
    public boolean del(String tableName, String rowName, String familyName) throws IOException {
        Delete delete = new Delete(域名ytes());
        域名amily(域名ytes());
        域名able(域名eOf(tableName)).delete(delete);
        return true;
    }
    /**
     * 删除 qualifier
     *
     * @param tableName
     * @param rowName
     * @param familyName
     * @param qualifiers
     * @return
     * @throws IOException
     */
    public boolean delQualifiers(String tableName, String rowName, String familyName, List<String> qualifiers) throws IOException {
        Delete delete = new Delete(域名ytes());
        域名ach(qualifier -> 域名olumn(域名ytes(), 域名ytes()));
        域名able(域名eOf(tableName)).delete(delete);
        return true;
    }
}

getTable:

获取 Table 实现用以访问表数据。

Table 非 thread-safe 的并且其创建很轻量,所以线程内使用需要单独创建(不需要且不应该缓存和池化)。 

标签:编程
湘ICP备14001474号-3  投诉建议:234161800@qq.com   部分内容来源于网络,如有侵权,请联系删除。