Vert.x(vertx) 连接MySQL、Oracle数据库

Vert.x提供异步访问数据库的API,可能这里有朋友会有疑惑,直接使用我们之前的熟悉的Mybatis或者Hibernate不行吗,可行,但数据库操作是一个耗时操作,使用传统的同步模型,容易阻塞线程,导致整体性能下降,因此我们对于数据库操作,需要使用Vert.x提供的异步API。

Vert.x提供的API层级非常低,可以说是仅仅在原生JDBC基础上封装了一层异步接口。所有的对数据库操作都需要通过编写SQL来完成,参数的封装和结果的获取都需要手动的来实现,对于习惯使用ORM框架的开发者可能会非常的不习惯。

先来通过一个查询数据库的案例来演示如何使用Vert.x提供的异步API

基本操作

1.引入数据库依赖,我们需要引入两个包,一个是vertx-jdbc,另一个是要真正连接数据库的驱动包,这里以MySQL为例

  1. <dependency>
  2. <groupId>io.vertx</groupId>
  3. <artifactId>vertx-jdbc-client</artifactId>
  4. <version>3.6.0</version>
  5. </depend
  6. <dependency>
  7. <groupId>mysql</groupId>
  8. <artifactId>mysql-connector-java</artifactId>
  9. <version>8.0.13</version>
  10. </dependency>

注:2019-09-25 更新,在新的版本中 3.8.1,需要引入sql-common的包,否则会有类找不到。

  1. <dependency>
  2. <groupId>io.vertx</groupId>
  3. <artifactId>vertx-sql-common</artifactId>
  4. <version>3.8.1</version>
  5. </dependency>

2.抽象出一个DbUtils来方便获取数据库客户端,为了简单,直接就将配置写到代码里了

  1. public class JdbcUtils {
  2. // 用于操作数据库的客户端
  3. private JDBCClient dbClient;
  4. public JdbcUtils(Vertx vertx) {
  5. // 构造数据库的连接信息
  6. JsonObject dbConfig = new JsonObject();
  7. dbConfig.put("url", "jdbc:mysql://192.168.40.66:3306/test");
  8. dbConfig.put("driver_class", "com.mysql.jdbc.Driver");
  9. dbConfig.put("user", "xxxx");
  10. dbConfig.put("password", "xxxx");
  11. // 创建客户端
  12. dbClient = JDBCClient.createShared(vertx, dbConfig);
  13. }
  14. // 提供一个公共方法来获取客户端
  15. public JDBCClient getDbClient() {
  16. return dbClient;
  17. }
  18. }

通过上面的工具类,可以快速的获取到客户端,看上面的代码也很简单,通过JsonObect构建一些基本的数据库连接信息,然后通过JDBCClient的createShard方法创建一个JDBCClient实例。

3.进行数据库的操作,以查询年龄大于18岁的用户为例

  1. public class JdbcTestVerticle extends AbstractVerticle {
  2. @Override
  3. public void start() throws Exception {
  4. // 获取到数据库连接的客户端
  5. JDBCClient jdbcClient = new JdbcUtils(vertx).getDbClient();
  6. String sql = "select * from t_user where age > ?";
  7. // 构造参数
  8. JsonArray params = new JsonArray().add(18);
  9. // 执行查询
  10. jdbcClient.queryWithParams(sql, params, qryRes->{
  11. if(qryRes.succeeded()) {
  12. // 获取到查询的结果,Vert.x对ResultSet进行了封装
  13. ResultSet resultSet = qryRes.result();
  14. // 把ResultSet转为List<JsonObject>形式
  15. List<JsonObject> rows = resultSet.getRows();
  16. // 输出结果
  17. System.out.println(rows);
  18. } else {
  19. System.out.println("查询数据库出错!");
  20. }
  21. });
  22. }
  23. public static void main(String[] args) {
  24. Vertx vertx = Vertx.vertx();
  25. vertx.deployVerticle(new JdbcTestVerticle());
  26. }
  27. }

JsonArray是一个数组,SQL中用到的参数可以通过构建一个JsonArray来赋值。

JsonObejct是一个Json对象,类似于阿里的fastjson中提供的JSONObject

这两个对象在Vert.x中非常常用,而且非常的好用,但一定要注意空指针的问题,这是非常让人头疼的。

优化

通过上面的三个步骤,就可成功的对数据库进行操作了,但还有些问题需要优化,比如数据库连接信息放到配置文件中,再比如使用数据库连接池等等。

* 使用配置文件

  1. {
  2. "default":{
  3. "url":"jdbc:mysql://localhost:3306/my_project",
  4. "driver_class":"com.mysql.cj.jdbc.Driver",
  5. "user":"root",
  6. "password":"root"
  7. },
  8. "prod":{
  9. "url":"jdbc:mysql://localhost:3306/my_project",
  10. "driver_class":"com.mysql.cj.jdbc.Driver",
  11. "user":"root",
  12. "password":"root"
  13. }
  14. }

修改DbUtils工具类

  1. public class JdbcUtils {
  2. private JDBCClient dbClient;
  3. private static JsonObject config ;
  4. static {
  5. byte[] buff = new byte[102400];
  6. try {
  7. // 读取配置文件
  8. InputStream ins = new FileInputStream("db.json");
  9. int i = IOUtils.read(ins, buff);
  10. config = new JsonObject(new String(buff, 0, i));
  11. } catch (Exception e) {
  12. System.out.println("读取配置文件失败");
  13. }
  14. }
  15. public JdbcUtils(Vertx vertx, String dsName) {
  16. JsonObject dbConfig = config.getJsonObject(dsName);
  17. if(dbConfig == null) {
  18. throw new RuntimeException("没有找到指定的数据源");
  19. }
  20. dbClient = JDBCClient.createShared(vertx, dbConfig);
  21. }
  22. public JdbcUtils(Vertx vertx) {
  23. this(vertx, "default");
  24. }
  25. public JDBCClient getDbClient() {
  26. return dbClient;
  27. }
  28. }

这样就支持了多个数据源,而且数据库连接配置都放到了配置文件中。

连接池配置

数据连接池默认使用的C3P0,所以可以在db.json中进行配置C3P0连接池的参数就可以了,这里官网的地址为:https://vertx.io/docs/vertx-jdbc-client/java/

具体配置可以参考官网给出的配置,下面是一个简单的截图

遗憾的是,Vert.x给出的数据库连接池的支持并不多,如果我们想要使用比如阿里的Druid连接池,需要自己来实现DataSourceProvider。当然DataSourceProvider的实现并不复杂,但麻烦啊!后面我会给出一个关于druid的DataSourceProvider的实现。

事务

Vert.x从比较低的层面来控制事务,不像Spring一样可以使用声明式事务管理。要想在Vert.x中开启事务,和传统的JDBC管理事务的方式非常类似。首先要获得到连接,然后调用连接的setAutoCommit方法,关闭事务的自动提交,然后再手动的提交和回滚事务。

因为开启事务、提交事务、执行SQL都需要和数据库服务进行通信,因此在Vert.x中都是异步操作,按传统方式实现一个事务代码非常痛苦,看下面的一段开启事务的代码。写了一遍以后,绝对不愿意再写第二遍。

1. 获得连接

  1. // 获得连接
  2. jdbcClient.getConnection(con -> {
  3. if (con.succeeded()) {
  4. System.out.println("获取到数据库连接");
  5. // 获取到的连接对象
  6. SQLConnection connection = con.result();
  7. }
  8. });

2. 设置不自动提交事务

  1. // 开启事务
  2. connection.setAutoCommit(false, (v) -> {
  3. if (v.succeeded()) {
  4. }
  5. });

3.dml操作

  1. // 执行更新操作
  2. connection.update("sql", upRes -> {
  3. if(upRes.succeed()){
  4. }
  5. });

4. 提交事务

  1. // 提交事务
  2. connection.commit(rx -> {
  3. if (rx.succeeded()) {
  4. // 事务提交成功
  5. }
  6. });

 回滚事务

  1. // 回滚事务
  2. connection.rollback(rb -> {
  3. if (rb.succeeded()) {
  4. // 事务回滚成功
  5. }
  6. });

如果你觉得上面的还很简单,看看下面一个完整的例子吧,把这些嵌套在一起,你还觉得简单吗?

  1. package stu.vertx.jdbc;
  2. import io.vertx.core.AbstractVerticle;
  3. import io.vertx.core.Vertx;
  4. import io.vertx.ext.jdbc.JDBCClient;
  5. import io.vertx.ext.sql.SQLConnection;
  6. /**
  7. * 获得数据库连接,执行查询,开启事务,执行更新操作
  8. *
  9. * @author <a href="https://blog.csdn.net/king_kgh>Kingh</a>
  10. * @version 1.0
  11. * @date 2019/4/3 9:19
  12. */
  13. public class GetConnection extends AbstractVerticle {
  14. @Override
  15. public void start() throws Exception {
  16. JDBCClient jdbcClient = new JdbcUtils(vertx).getDbClient();
  17. System.out.println("获取到数据库客户端");
  18. // 获取数据库连接
  19. jdbcClient.getConnection(con -> {
  20. if (con.succeeded()) {
  21. System.out.println("获取到数据库连接");
  22. // 获取到的连接对象
  23. SQLConnection connection = con.result();
  24. // 执行查询操作
  25. connection.query("select * from t1", rs -> {
  26. // 处理查询结果
  27. if (rs.succeeded()) {
  28. System.out.println(rs.result().getRows());
  29. }
  30. });
  31. // 开启事务
  32. connection.setAutoCommit(false, (v) -> {
  33. if (v.succeeded()) {
  34. // 事务开启成功 执行crud操作
  35. connection.update("update t1 set name = '被修改了' where name = '111'", up -> {
  36. if (up.succeeded()) {
  37. // 再来一笔写操作
  38. connection.update("insert into t1 values ('222','222222') ", up2 -> {
  39. if (up2.succeeded()) {
  40. // 提交事务
  41. connection.commit(rx -> {
  42. if (rx.succeeded()) {
  43. // 事务提交成功
  44. }
  45. });
  46. } else {
  47. connection.rollback(rb -> {
  48. if (rb.succeeded()) {
  49. // 事务回滚成功
  50. }
  51. });
  52. }
  53. });
  54. } else {
  55. connection.rollback(rb -> {
  56. if (rb.succeeded()) {
  57. // 事务回滚成功
  58. }
  59. });
  60. }
  61. });
  62. } else {
  63. System.out.println("开启事务失败");
  64. }
  65. });
  66. } else {
  67. System.out.println("获取数据库连接失败");
  68. }
  69. });
  70. }
  71. public static void main(String[] args) {
  72. Vertx.vertx().deployVerticle(new GetConnection());
  73. }
  74. }

RxJava解决多层回调嵌套问题

上面的代码仅仅是做了两个写操作,可以说是非常的痛苦了,一层一层的嵌套,根本没法维护。那么在真实的开发环境中,该如何管理事务呢,这就需要使用rxjava了,能够有效的减少多层嵌套带来的问题。使用rxjava首先是需要引入rxjava的依赖

  1. <dependency>
  2. <groupId>io.vertx</groupId>
  3. <artifactId>vertx-rx-java</artifactId>
  4. <version>3.7.0</version>
  5. </dependency>

完成上面案例的同样代码如下

  1. package stu.vertx.jdbc;
  2. import io.vertx.core.*;
  3. import io.vertx.core.json.JsonArray;
  4. import io.vertx.ext.jdbc.JDBCClient;
  5. import io.vertx.ext.sql.SQLConnection;
  6. import rx.Single;
  7. import java.util.UUID;
  8. /**
  9. * @author <a href="https://blog.csdn.net/king_kgh>Kingh</a>
  10. * @version 1.0
  11. * @date 2019/4/5 14:10
  12. */
  13. public class GetConnectionWithRxJava extends AbstractVerticle {
  14. @Override
  15. public void start() throws Exception {
  16. // 获取JDBC客户端
  17. JDBCClient jdbcClient = new JdbcUtils(vertx).getDbClient();
  18. getConnection(jdbcClient, con -> {
  19. if (con.succeeded()) {
  20. // 获取到与数据库的连接
  21. SQLConnection connection = con.result();
  22. // 开启事务
  23. rxOpenTx(connection)
  24. // 执行写操作
  25. .flatMap(this::rxExecuteUpdate1)
  26. // 执行写操作
  27. .flatMap(this::rxExecuteUpdate2)
  28. .subscribe(ok -> {
  29. // 提交事务
  30. ok.commit(v -> {
  31. });
  32. }, err -> {
  33. // 回滚事务
  34. connection.rollback(v -> {
  35. });
  36. });
  37. }
  38. });
  39. }
  40. public Single<SQLConnection> rxOpenTx(SQLConnection connection) {
  41. return Single.create(new io.vertx.rx.java.SingleOnSubscribeAdapter<>(fut -> openTx(connection, fut)));
  42. }
  43. public Single<SQLConnection> rxExecuteUpdate1(SQLConnection connection) {
  44. return Single.create(new io.vertx.rx.java.SingleOnSubscribeAdapter<>(fut -> update1(connection, fut)));
  45. }
  46. public Single<SQLConnection> rxExecuteUpdate2(SQLConnection connection) {
  47. return Single.create(new io.vertx.rx.java.SingleOnSubscribeAdapter<>(fut -> update2(connection, fut)));
  48. }
  49. public void getConnection(JDBCClient jdbcClient, Handler<AsyncResult<SQLConnection>> resultHandler) {
  50. jdbcClient.getConnection(con -> {
  51. if (con.succeeded()) {
  52. resultHandler.handle(Future.succeededFuture(con.result()));
  53. } else {
  54. resultHandler.handle(Future.failedFuture(con.cause()));
  55. }
  56. });
  57. }
  58. public void openTx(SQLConnection connection, Handler<AsyncResult<SQLConnection>> resultHandler) {
  59. connection.setAutoCommit(false, o -> {
  60. if (o.succeeded()) {
  61. resultHandler.handle(Future.succeededFuture(connection));
  62. } else {
  63. resultHandler.handle(Future.failedFuture(o.cause()));
  64. }
  65. });
  66. }
  67. public void update1(SQLConnection connection, Handler<AsyncResult<SQLConnection>> resultHandler) {
  68. connection.updateWithParams("insert into t1 values (?,?)", new JsonArray().add(UUID.randomUUID().toString()).add(UUID.randomUUID().toString()), in -> {
  69. if (in.succeeded()) {
  70. resultHandler.handle(Future.succeededFuture(connection));
  71. } else {
  72. resultHandler.handle(Future.failedFuture(in.cause()));
  73. }
  74. });
  75. }
  76. public void update2(SQLConnection connection, Handler<AsyncResult<SQLConnection>> resultHandler) {
  77. connection.update("update t1 set name = '111' where passwd = '111'", in -> {
  78. if (in.succeeded()) {
  79. resultHandler.handle(Future.succeededFuture(connection));
  80. } else {
  81. resultHandler.handle(Future.failedFuture(in.cause()));
  82. }
  83. });
  84. }
  85. public static void main(String[] args) {
  86. Vertx.vertx().deployVerticle(new GetConnectionWithRxJava());
  87. }
  88. }

通过使用RxJava,没有那么深的嵌套层次,逻辑比较清晰。当然了,为了一个简单的操作,还是需要写很多的代码。

 

Vert.x相关系列文章

(一)Vert.x 简明介绍 https://blog.csdn.net/king_kgh/article/details/80772657

(二)Vert.x创建简单的HTTP服务 https://blog.csdn.net/king_kgh/article/details/80804078

(三)Vert.x Web开发之路由 https://blog.csdn.net/king_kgh/article/details/80848571

(四)Vert.x TCP服务实现 https://blog.csdn.net/king_kgh/article/details/84870775

(五)Vert.x数据库访问 https://blog.csdn.net/king_kgh/article/details/84894599

(六)Vert.x认证和授权 https://blog.csdn.net/king_kgh/article/details/85218454

(七)Vert.x事件总线(Event Bus)与远程服务调用 https://blog.csdn.net/king_kgh/article/details/86993812

 

Vert.x 案例代码:https://github.com/happy-fly

 

笔者就职于一家互联网支付公司,公司的核心项目使用的是Vert.x技术体系。记得笔者刚进入公司,接触Vert.x的时候,找遍了大大小小的网站,发现市面上关于Vert.x的文档除了官方文档外,几乎找不到其他资料。当时就励志要出一个专栏来写写Vert.x,以此帮助在Vert.x上采坑的朋友。因为笔者能力有限,文章中难免会有错误和疏漏,如果您对文章有意见或者好的建议,可以直接留言或者发送邮件到18366131816@163.com。如果您也对Vert.感兴趣,可以加入到我们,共同学习Vert.x,并推进国内开发者对Vert.x的认知。

文章知识点与官方知识档案匹配,可进一步学习相关知识