hbase orm 中间层 hbasedao
背景
hbase 是分布式的 kv(key value) 存储系统,hbase 提供的针对底层数据的操作也是基于 kv 维度的,使用起来更像是Map的操作方式。但是上层业务应用一般是采用的面向对象的设计,这就导致了使用 hbase 的底层 api 必须要写很多的代码来做KV原始数据到上层业务对象的转化。
使用关系型数据库如 MySQL,也会遇到关系型数据跟对象之间的适配问题,所以出现了很多成熟的做对象关系映射的产品,像 hibernate
和 mybatis
。
hbase 当然也需要一个类似的东西,来做对象到 kv 数据的适配,这样上层应用可以继续专注于上层 面向对象的方式的开发,而不用直接操作kv数据,提高工作效率。
hbase使用场景
hbase 要解决的问题是海量数据的分布式存储,传统数据库如 mysql 解决这个问题也是有办法的,可以通过分库分表的方式做到。 所以,hbase 和 mysql 的竞争点是在需要 mysql 分库分表的情况。下面以一个很常见的场景举例,来看之前数据存储使用mysql的场景,迁移hbase上的过程。
问题描述:用户的基本信息(包括:用户头像,上次登录时间),好友关系(正向 我关注的人,反向关注我的人)
mysql的方式:
用户信息表
列名 | 说明 |
---|---|
id | 用户id |
head_image | 头像 |
last_visit_time | 上次登录时间 |
用户关注表
列名 | 说明 |
---|---|
userId | 用户id |
follow_user | 关注的人 |
用户粉丝表
列名 | 说明 |
---|---|
userId | 用户id |
be_followed_user | 关注我的人 |
之所以用户关系两张表来保存是因为,用户关系数据量比较大的情况下,采用分库分表存储的方案,又要提供正向、反向的查询,所以需要分别以关注者和被关注者为路由字段存储两份。
hbase的方式:
rowKey | 列簇 | 列 |
---|---|---|
userId | info_cf(单version) | head_image |
last_visit_time | ||
relation_cf (多个version) | follow_user | |
be_followed_user |
存入 hbase 的数据的逻辑结构会是: hbase_data.png
通过上面 mysql 表到 hbase 表的迁移过程,可以清楚地看到:mysql 中 和 userId 一对一的用户基本信息迁移到 hbase 可以用无 version 特性的列簇保存;一对多的关注关系迁移到 hbase,可以利用hbase 多 version 的列簇保存。下面介绍的hbasedao中间层就会主要解决mysql迁移到 hbase 的适配,包括单 version(一对一)和多 version(一对多)的情形。
NOTE: hbase 多 version 的列簇,不同column之间没有任何对应关系,所以不要尝试在不同的 column 之间找行的对应关系。
hbasedao介绍
hbasedao 是一个简单地解决kv数据到业务对象适配的中间层,类似关系型数据库orm中间层mybatis。
针对于 hbase 存储结构抽象出来的类结构:
habasedao_realtion.png
使用
封装业务对象成 DO 类,一个 rowKey 对应一个 DO 类的对象,通过指定DO类里的 column,中间层可以做到针对于指定 column 的查询。使用 hbasedao 之后,查询的方式如下:
public UserHBaseDO get(String rowKey) throws HBaseDAOException {
HBaseDO hBaseDO = new HBaseDO();
hBaseDO.setTableName(UserHBaseDO.TABLE_NAME);
hBaseDO.setRowKey(rowKey);
hBaseDO.addColumnFamily(UserHBaseDO.CF_NAME_info_cf)
.putColumn(UserHBaseDO.CL_NAME_head_image)
.putColumn(UserHBaseDO.CL_NAME_last_visit_time);
hBaseDO.addColumnFamily(UserHBaseDO.CF_NAME_relation_cf, 10)
.putColumn(UserHBaseDO.CL_NAME_follow_user)
.putColumn(UserHBaseDO.CL_NAME_be_followed_user);
super.getHbaseDao().get(hBaseDO);
UserHBaseDO userHBaseDO = null;
try {
userHBaseDO = new UserHBaseDO(hBaseDO);
} catch (UnsupportedEncodingException e) {
new HBaseDAOException(e);
}
return userHBaseDO;
}
支持的 API
- 以对象的方式插入数据
- 批量插入数据
- 传入 rowKey,查询对象。可支持针对不同的列簇指定查询的 version 数量,支持指定列簇中数据的时间范围
- 传入多个 rowKey,查询对象的列表
- 删除指定的 rowKey 的一行记录
- 指定开始扫描的 rowKey,开始按行扫描数据,可以限制扫描的结果集大小
public interface HBaseDAO {
public void put(HBaseDO hBaseDO) throws HBaseDAOException;
public void putList(List<HBaseDO> hBaseDOList, String tableName)
throws HBaseDAOException;
public void get(HBaseDO hBaseDO) throws HBaseDAOException;
public void getList(List<HBaseDO> hBaseDOList, String tableName)
throws HBaseDAOException;
public void delete(HBaseDO hBaseDO) throws HBaseDAOException;
public List<HBaseDO> scan(String tableName, String startRow, String endRow,
int maxVersion, int maxResultSize, Map<String, List<String>> columnFamilyMap)
throws HBaseDAOException;
}
使用举例(step by step)
代码工程结构,sample 包里提供了详细的使用举例。可以看到,使用时只需要定义 hbase 表对应的 DO 类,在此类中编写对象和 hbase 里数据的对应关系,在 DAO 层以上就可以提供和 mybatis 类似的接口访问形式。
├── pom.xml
├── src
│ ├── main
│ │ ├── java
│ │ │ └── com
│ │ │ └── taobao
│ │ │ └── hbasedao
│ │ │ ├── ColumnFamilyInfo.java
│ │ │ ├── HBaseCell.java
│ │ │ ├── HBaseClientDaoSupport.java
│ │ │ ├── HBaseColumnFamily.java
│ │ │ ├── HBaseConsole.java
│ │ │ ├── HBaseDAO.java
│ │ │ ├── HBaseDAOException.java
│ │ │ ├── HBaseDAOFactory.java
│ │ │ ├── HBaseDAOImpl.java
│ │ │ ├── HBaseDO.java
│ │ │ ├── HBaseVO.java
│ │ │ ├── MapAble.java
│ │ │ └── sample
│ │ │ ├── dao
│ │ │ │ └── UserHBaseDAO.java
│ │ │ ├── dataobject
│ │ │ │ └── UserHBaseDO.java
│ │ │ └── vo
│ │ │ └── FollowerVO.java
│ │ └── resources
│ └── test
│ ├── java
│ │ └── com
│ │ └── taobao
│ │ └── hbasedao
│ │ └── sample
│ │ └── test
│ │ └── UserHBaseDAOTest.java
│ └── resources
│ └── hbase-dao.xml
步骤
- 引入依赖
<dependency>
<groupId>com.taobao.hbasedao</groupId>
<artifactId>hbasedao</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
- 定义hbae表对应的DO类,相当于使用 mybati s时,定义的 sqlmap,需要给出 hbase 表的 schema 信息,如:列簇名、列名、列簇的最大 version,列簇和列的绑定关系等。
public class UserHBaseDO implements MapAble<UserHBaseDO> {
public static final String TABLE_NAME = "hbasedao_user";
public static final String CF_NAME_info_cf = "info_cf";
public static final int MAX_SIZE_CF_NAME_info_cf = 1;
public static final String CF_NAME_relation_cf = "relation_cf";
public static final int MAX_SIZE_CF_NAME_relation_cf = 1000;
public static final String CL_NAME_head_image = "head_image";
public static final String CL_NAME_last_visit_time = "last_visit_time";
public static final String CL_NAME_follow_user = "follow_user";
public static final String CL_NAME_be_followed_user = "be_followed_user";
private static Map<String, List<String>> columnFamilyMap;
static {
columnFamilyMap = new HashMap<String, List<String>>();
columnFamilyMap.put(CF_NAME_info_cf,
Arrays.asList(new String[] { CL_NAME_head_image, CL_NAME_last_visit_time }));
columnFamilyMap.put(CF_NAME_relation_cf,
Arrays.asList(new String[] { CL_NAME_follow_user, CL_NAME_be_followed_user }));
}
private String rowKey;
private String headImage;
private long lastVisitTime;
private final List<FollowerVO> followVOList = new ArrayList<FollowerVO>();
private final List<FollowerVO> beFollowedVOList = new ArrayList<FollowerVO>();
- 实现 MapAble 接口,MapAble 接口包括两个方法,一个是插入数据时将用户定义的 UserHBaseDO 转化为框架使用的 HBaseDO,一个是查询时将 HBaseDO 里包含的数据转为为用户定义 UserHBaseDO。
@Override
public HBaseDO converDOToHBaseDO() {
HBaseDO hbaseDO = new HBaseDO();
hbaseDO.setRowKey(this.rowKey);
hbaseDO.setTableName(UserHBaseDO.TABLE_NAME);
for (Map.Entry<String, List<String>> columnFamilyEntry :
columnFamilyMap.entrySet()) {
String columnFamilyName = columnFamilyEntry.getKey();
List<String> columnNameList = columnFamilyEntry.getValue();
HBaseColumnFamily columnFamily = new HBaseColumnFamily();
for (String columnName : columnNameList) {
if (CF_NAME_info_cf.equals(columnFamilyName) &&
CL_NAME_head_image.equals(columnName)) {
if (this.headImage != null) {
List<HBaseCell> cellDOList = new ArrayList<HBaseCell>();
HBaseCell cellDO = new HBaseCell();
cellDO.setValue(this.headImage);
cellDOList.add(cellDO);
columnFamily.putColumn(columnName, cellDOList);
}
}
...
}
hbaseDO.getColumnFamilyMap().put(columnFamilyName, columnFamily);
}
return hbaseDO;
}
@Override
public UserHBaseDO convertHBaseDOToDO(HBaseDO hBaseDO) throws UnsupportedEncodingException {
this.rowKey = hBaseDO.getRowKey();
List<KeyValue> results = hBaseDO.getResults();
for (Map.Entry<String, List<String>> columnFamilyEntry :
columnFamilyMap.entrySet()) {
String columnFamilyName = columnFamilyEntry.getKey();
List<String> columnNameList = columnFamilyEntry.getValue();
for (String columnName : columnNameList) {
if (CF_NAME_info_cf.equals(columnFamilyName) &&
CL_NAME_head_image.equals(columnName)) {
for (KeyValue kv : results) {
if (columnFamilyName.equals(new String(kv.getFamily()))
&& columnName.equals(new String(kv.getQualifier()))) {
this.headImage = new String(kv.getValue(), "UTF-8");
}
}
}
...
}
}
return this;
}
- 然后就是定义 DAO 类,主要的逻辑已经在 UserHBaseDO 写了,这里的实现可以足够简单,查询的代码在 hbasedao 介绍里已经给出,插入数据的代码如下:
public void insert(UserHBaseDO userHBaseDO) throws HBaseDAOException {
super.getHbaseDao().put(userHBaseDO.converDOToHBaseDO());
}
- 编写测试用例
@Test
public void test_insert() throws HBaseDAOException {
ApplicationContext context = new ClassPathXmlApplicationContext("hbase-dao.xml");
UserHBaseDAO userHBaseDAO = context.getBean("userHBaseDAO", UserHBaseDAO.class);
UserHBaseDO userHBaseDO = new UserHBaseDO();
userHBaseDO.setRowKey("2222");
userHBaseDO.setHeadImage("icon1.jpg");
userHBaseDO.setLastVisitTime(4820023);
int i = 0;
FollowerVO followerVO1 = new FollowerVO("444_errik");
followerVO1.setTimeStamp(System.currentTimeMillis() + i++);
FollowerVO followerVO2 = new FollowerVO("555_wiie");
followerVO2.setTimeStamp(System.currentTimeMillis() + i++);
FollowerVO followerVO3 = new FollowerVO("666_gate");
followerVO3.setTimeStamp(System.currentTimeMillis() + i++);
userHBaseDO.getFollowVOList().add(followerVO1);
userHBaseDO.getFollowVOList().add(followerVO2);
userHBaseDO.getFollowVOList().add(followerVO3);
FollowerVO followerVO4 = new FollowerVO("999_hena");
userHBaseDO.getBeFollowedVOList().add(followerVO4);
userHBaseDAO.insert(userHBaseDO);
}
@Test
public void test_get() throws HBaseDAOException {
ApplicationContext context = new ClassPathXmlApplicationContext("hbase-dao.xml");
UserHBaseDAO userHBaseDAO = context.getBean("userHBaseDAO", UserHBaseDAO.class);
UserHBaseDO userHBaseDO = userHBaseDAO.get("2222");
System.out.println(userHBaseDO);
}
完整地代码示例请参考 com.taobao.hbasedao.sample
代码工程在:hbasedao