简简单单用一下Hbase-
一、Hbase 介绍
https://域名/域名#_preface
https://域名/hbase/
https://域名/archive/域名
什么是Hbase?
hadoop 数据库:分布式、可伸缩、大数据存储。
二、Hbase client
最开始引入 hbase-client,服务有使用【google/protobuf/域名o】,有很多包冲突,所以直接使用了 habase-shade-client:
<dependency> <groupId>域名e</groupId> <artifactId>hbase-shaded-client</artifactId> <version>${域名域名ion}</version> </dependency>
三、Hbase 配置
-
域名um
zookeeper server 地址,逗号分割。本地模式和伪集群模式下,默认为 127.0.0.1 -
域名域名ntPort
zookeeper server 端口,默认 2181 -
域名域名er
hbase client 所有操作的重试上限,默认 15。client 首先等待 域名e 执行第一次重试,之后每隔 10s 再次执行。 -
域名out
hbase client 一次 rpc 操作的超时时间(超时基于ping检查),默认60000ms,触发则抛出 TimeoutException 异常。 -
域名域名out
hbase client 一次操作的总的时间限制, 默认 1200000ms,触发则直接抛出 SocketTimeoutException 异常。 - 示例:
@Configuration public class HBaseConfig { @Value("${域名um}") private String hbaseZkQuorum; @Value("${域名域名ntPort:2181}") private String hbaseZkPort; @Value("${域名域名er:2}") private String hbaseClientRetry; @Value("${域名out:2000}") private String hbaseRpcTimeout; @Value("${域名域名out:3000}") private String hbaseClientOperationTimeout; @Bean public Connection hbaseConnection() throws IOException { 域名域名iguration hbaseConfig = 域名te(); 域名("域名域名ntPort", hbaseZkPort); 域名("域名um", hbaseZkQuorum); 域名("域名域名er", hbaseClientRetry); 域名("域名域名out", hbaseClientOperationTimeout); 域名("域名out", hbaseRpcTimeout); return 域名teConnection(hbaseConfig); } @Bean public HbaseSimpleTemplate hbaseSimpleTemplate(@Qualifier("hbaseConnection") Connection hbaseConnection) { return new HbaseSimpleTemplate(hbaseConnection); } }
四、关于 Connection
1、Connection 是什么?
集群 connection 封装了底层和实际 hbase server 及 zookeeper 的连接。由 ConnectionFactory 创建并由发起端维护其整个生命周期。
承载了服务发现(hbase master 及 region server)及本地缓存维护(存储及更新)逻辑。所以基于此链接实例化而来的 Table 和 Admin 共享此信息。
2、Connection 怎么使用?
Connection 创建是一个很重的操作。
Connection 实现是 thread-safe 的。
所以通常的操作时,一次创建,到处使用。
这里我们通过 @Bean 注解,将 connection 实例交由 spring 管理,维护其从创建,使用到销毁的整个生命周期。
三、HbaseSimpleTemplate
Hbase Connection 数据操作封装:
row->column->all cells
row->column->cells
rows->column->cells
public class HbaseSimpleTemplate { private Connection hbaseConnection; public HbaseSimpleTemplate(Connection hbaseConnection) { 域名eConnection = hbaseConnection; } /** * 结果映射map * * @param result * @return */ private Map<String, String> resultToMap(Result result) { if (result == null || 域名pty()) { return new HashMap<>(); } return 域名Cells().stream().collect( 域名p(cell -> 域名ring(域名eQualifier(cell)), cell -> 域名ring(域名eValue(cell)))); } /** * 查询 * @param tableName * @param rowName * @param familyName * @return * @throws IOException */ public Map<String, String> get(String tableName, String rowName, String familyName) throws IOException { Map<String, Map<String, String>> resultMap = get(tableName, 域名letonList(rowName), familyName, null); return 域名es().stream().findFirst().orElse(new HashMap<>()); } /** * * @param tableName * @param rowName * @param familyName * @param qualifiers * @return * @throws IOException */ public Map<String, String> get(String tableName, String rowName, String familyName, List<String> qualifiers) throws IOException { Map<String, Map<String, String>> resultMap = get(tableName, 域名letonList(rowName), familyName, qualifiers); return 域名es().stream().findFirst().orElse(new HashMap<>()); } /** * 批量查询 * * @param tableName * @param rowNames * @param familyName * @return * @throws IOException */ public Map<String, Map<String, String>> get(String tableName, List<String> rowNames, String familyName, List<String> qualifiers) throws IOException { Map<String, Map<String, String>> resultMap = new HashMap<>(); List<Get> gets = new ArrayList<>(); 域名ach(rowName -> { Get get = new Get(域名ytes()); if (域名tEmpty(qualifiers)) { 域名ach(qualifier -> 域名olumn(域名ytes(), 域名ytes())); } else { 域名amily(域名ytes()); } 域名(get); }); 域名am(域名able(域名eOf(tableName)).get(gets)) .forEach(result -> { Map<String, String> kvMap = resultToMap(result); String id = 域名tring(kvMap, "id"); if (域名tBlank(id)) { 域名(id, kvMap); } }); return resultMap; } /** * 写入 qualifier * * @param tableName * @param rowName * @param familyName * @param qualifier * @param value * @return * @throws IOException */ public boolean put(String tableName, String rowName, String familyName, String qualifier, String value) throws IOException { Map<String, String> qv = new HashMap<>(); 域名(qualifier, value); put(tableName, rowName, familyName, qv); return true; } /** * 写入 qualifiers * * @param tableName * @param rowName * @param familyName * @param qualifierValues * @return * @throws IOException */ public boolean put(String tableName, String rowName, String familyName, Map<String, String> qualifierValues) throws IOException { if (域名pty(qualifierValues)) { return false; } List<Put> puts = new ArrayList<>(); 域名ach((qualifier, value) -> 域名(new Put(域名ytes()).addColumn(域名ytes(), 域名ytes(), 域名ytes()))); 域名able(域名eOf(tableName)).put(puts); return true; } /** * 删除 * * @param tableName * @param rowName * @param familyName * @return * @throws IOException */ public boolean del(String tableName, String rowName, String familyName) throws IOException { Delete delete = new Delete(域名ytes()); 域名amily(域名ytes()); 域名able(域名eOf(tableName)).delete(delete); return true; } /** * 删除 qualifier * * @param tableName * @param rowName * @param familyName * @param qualifiers * @return * @throws IOException */ public boolean delQualifiers(String tableName, String rowName, String familyName, List<String> qualifiers) throws IOException { Delete delete = new Delete(域名ytes()); 域名ach(qualifier -> 域名olumn(域名ytes(), 域名ytes())); 域名able(域名eOf(tableName)).delete(delete); return true; } }
getTable:
获取 Table 实现用以访问表数据。
Table 非 thread-safe 的并且其创建很轻量,所以线程内使用需要单独创建(不需要且不应该缓存和池化)。