avatar

目录
模拟电子钱包账户并发转账环境产生的脏数据及改进办法(java+mysql)

场景简述

假设用户 A 线上消费,其账户内有余额 100 元,现在他给 B 商户付款 99 元,写成最简单的业务逻辑大概是这样的:

    1. 数据库查询(SELECT)A 账户余额(100)
    1. 判断 A 账户余额是否够支付抵扣(100 - 99 >= 0)
    1. 如果够抵扣,数据库执行扣款操作(UPDATE),商户 B 加款(A->Banance -= 99, B->Banance += 99);如果不够抵扣,不执行操作

这是一个典型的单线程消费模式,显然如果每次都执行这一个操作,这个算法是正确无误的,下面进行一个稍微复杂的操作,假设 A 线上消费,A 转账,其账户内有余额 100 元,现在他给 B 商户付款 100 元,给商户 C 转账 100 元,且线上消费和转账是同时进行的,那么会产生多种可能,把所有的操作步骤拆分成以下 6 个步骤:

===

$1 表示账户余额

$2 表示消费金额

===

    1. A11 数据库查询(SELECT)A 账户余额($1) => trans for B
    1. A12 判断 A 账户余额是否够支付抵扣($1- $2 >= 0) => trans for B
    1. A13 如果够抵扣,数据库执行扣款操作(UPDATE),商户 B 加款(A->Banance -= $2, B->Banance += $2) => trans for B;如果不够抵扣,不执行操作,提示余额不足 => trans for B
    1. A21 数据库查询(SELECT)A 账户余额($1) => trans for C
    1. A22 判断 A 账户余额是否够支付抵扣($1- $2 >= 0) => trans for C
    1. A23 如果够抵扣,数据库执行扣款操作(UPDATE),商户 C 加款(A->Banance -= $2, B->Banance += $2) => trans for C;如果不够抵扣,不执行操作,提示余额不足 => trans for C

执行情况 1(序号相同的表示近乎同一时刻执行,序号越大执行顺序越偏后,单个线程的执行方式为同步执行):

Thread1: A11(0)->A12(1)->A13(2)

Thread2: A21(3)->A22(4)->A23(5)

执行情况 2(序号相同的表示近乎同一时刻执行,序号越大执行顺序越偏后,单个线程的执行方式为同步执行):

Thread1: A11(0)->A12(1)->A13(2)

Thread2: A21(0)->A22(1)->A23(2)

这里仅仅列举两种很特殊的情况作为例子说明,执行情况要远远复杂于上述两种情况

对于执行情况 1,向商户 B 支付扣款成功,而给 C 转账时,系统会提示余额不足

对于执行情况 2,向商户 B 支付扣款成功,给 C 转账扣款会仍然成功

执行结果的不同主要在于 A13 步骤的 UPDATE 事务提交操作,如果在 A13 的 UPDATE 成功之前执行 A21,那么 A21 查询出来的账户余额仍然是 100,依然可以进行消费,而事实却是此时正在进行的 A11 消费还没扣款成功,这样便使得一份余额重复使用,也就是产生了脏数据。而情况 1 是一个非常理想的情况,实际执行过程中个几乎不会发生 A13 执行完毕后,才会执行 A21,因为数据库的 SELECT 操作速度要快于 UPDATE 操作,因此很大可能是 A13 执行 UPDATE 之前,A11 和 A21 就已经完成了数据的查询,从而重复使用同一份余额。当然上面讲的 2 种情况是 2 个极端情况下的理想情况,实际情况更复杂更微妙。实际情况下,这样执行通常会会产生脏数据。

实验环境搭建

我们首先搭建一个上述提到的消费场景环境,模拟其产生脏数据的过程。这里用 java+mysql 来实现,采用 maven 项目构建。

参数名 参数值
数据库地址 127.0.0.1
帐户 root
密码 root
端口 3306
字符编码 UTF-8
数据库名 studyjpa
数据表名 bank_account

【自己搭建环境修改响应参数】

建立 maven 项目

  • 引入 jdbc 依赖,修改默认 jdk 编译版本为 jdk1.8
xml
application.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.25</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

建立实验数据表,E-R 图如下

sql
1
2
3
4
5
6
7
CREATE TABLE `bank_account` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '客户编号',
`balance` int(11) unsigned DEFAULT NULL COMMENT '账户余额-分为单位',
`card_number` varchar(255) DEFAULT NULL,
`customer` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8;
  • 用 InnoDB 引擎

建立数据表实体类 account

工程目录结构如下:

两个包:

  • package Bank
  • package Bank.dbBean

在 Bank.dbBean 下建立 bank_account 表的实体类

printInfo 方法可输出一个 account 对象内存储的数据

java
account.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package Bank.dbBean;

public class account {

private int id;
private int balance;
private String card_number;
private String customer;

public account(){
id = -1;
balance = 0;
card_number = "";
customer = "";
}

public void printInfo(){
System.out.println("[ id => " + id + " , balance => " + balance + " , card_number => " + card_number + " , customer => " + customer + " ]");
}

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

public int getBalance() {
return balance;
}

public void setBalance(int balance) {
this.balance = balance;
}

public String getCard_number() {
return card_number;
}

public void setCard_number(String card_number) {
this.card_number = card_number;
}

public String getCustomer() {
return customer;
}

public void setCustomer(String customer) {
this.customer = customer;
}
}

建立数据库操作层

包括以下两个方法:

    1. 通过卡号查询对应客户所有信息
    • 通过卡号查询信息
    • @param String cardNumber
    • @return account account
    1. 转账操作
    • @param String senderCardNumber
    • @param String receiverCardNumber
    • @param int transCount
    • 转账之前,转账完成后,都输出 sender 和 receiver 的账户信息,以供比对
    • void 异步执行
    • 提交 UPDATE 事务时单开了个线程,提高效率
java
Bean.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package Bank;

import Bank.dbBean.account;

import java.sql.*;

import java.util.ArrayList;
import java.util.List;

/**
* 简易Bean
*/

public class Bean {

// 参数配置
private static final String dbName = "studyjpa";
private static final String URL = "jdbc:mysql://127.0.0.1:3306/" + dbName + "?useUnicode=true&characterEncoding=UTF-8";
private static final String user = "root";
private static final String password = "root";
// 数据库句柄
private static Connection conn;
// SQL模板
private static PreparedStatement sql;
// res
private static ResultSet res;

/**
* 自动连接数据库
*/
public Bean(){

try{
Class.forName("com.mysql.jdbc.Driver"); // 加载驱动
}catch(ClassNotFoundException e) {
// 捕捉到错误
System.out.println("ClassNotFoundException!" + e);
}

// 连接至数据库
try {
conn = DriverManager.getConnection(URL, user, password);
conn.setAutoCommit(false);
System.out.println(dbName + " opened SUCCESS");
}catch(SQLException e) {
// 捕捉到错误
System.out.println(dbName + " opened error!" + e);
}
}

/**
* 通过卡号查询信息
* @param String cardNumber
* @return account account
*/
public account findByCardNumber(String cardNumber){
account r = new account();
try{
// System.out.println("conn.isClosed=" + conn.isClosed());
// 构造SQL模板
sql = conn.prepareStatement("SELECT `id`, `balance`, `card_number`, `customer` FROM `bank_account` WHERE `card_number`=?");
// 填充模板
sql.setString(1, cardNumber);
// 执行SQL语句
res = sql.executeQuery();
if(res.next()){
r.setId(res.getInt("id"));
r.setBalance(res.getInt("balance"));
r.setCard_number(res.getString("card_number"));
r.setCustomer(res.getString("customer"));
}
}catch(Exception e) {
// 捕捉到错误
System.out.println("Exception! FROM SELECT : " + e + " card_number: " + cardNumber);

}
return r;
}

/**
* 转账操作
* @param String senderCardNumber
* @param String receiverCardNumber
* @param int transCount
* @return List<account> - del
* index0=>senderBeforeSend, index1=>receiverBeforeReceive - del
* index2=>senderAfterSend, index3=>receiverAfterReceive - del
* void type 异步执行
*/
public void transMoney(String senderCardNumber, String receiverCardNumber, int transCount){
// 初始化
account sender0 = findByCardNumber(senderCardNumber);
account receiver0 = findByCardNumber(receiverCardNumber);
List<account> r = new ArrayList<account>(){
{
add(sender0);
add(receiver0);
add(sender0);
add(receiver0);
}
};
// 执行转账
int senderBalance = sender0.getBalance();
int receiverBalance = receiver0.getBalance();
// sender 账户余额是否充足
if(senderBalance - transCount >= 0){
senderBalance -= transCount;
receiverBalance += transCount;
try{
// 构造SQL模板
sql = conn.prepareStatement("UPDATE `bank_account` SET `balance`=? WHERE `card_number`=?");
// 填充模板
sql.setInt(1, senderBalance);
sql.setString(2, senderCardNumber);
sql.executeUpdate();

// 填充模板
sql.setInt(1, receiverBalance);
sql.setString(2, receiverCardNumber);
sql.executeUpdate();

// 开个新线程处理
new Thread(() -> {
// 这里不出错就不会再出错了
try {
conn.commit();
List<account> r1 = new ArrayList<account>(){
{
add(sender0);
add(receiver0);
add(findByCardNumber(senderCardNumber));
add(findByCardNumber(receiverCardNumber));
}
};
for (account item : r1) {
item.printInfo();
}
}catch(SQLException e) {
// 捕捉到错误
try{
conn.rollback();
System.out.println("new Thread update fail - been rollback!" + e);
}catch(SQLException e1){
System.out.println("rollback fail!" + e1);
}
System.out.println("new Thread update fail! " + e);
}
}).start();
}catch(SQLException e) {
// 捕捉到错误
try{
conn.rollback();
System.out.println("been rollback!" + e);
}catch(SQLException e1){
System.out.println("rollback fail!" + e1);
}
System.out.println("SQLException!" + e);
for (account item : r) {
item.printInfo();
}
}
}else{
System.out.println("senderCardNumber: " + senderCardNumber + ", Balance cannot afford, transCount: " + transCount);
for (account item : r) {
item.printInfo();
}
}
}
}

创建模拟数据

向 bank_account 表中插入 4 条记录

sql
1
2
3
4
5
6
7
-- ----------------------------
-- Records of bank_account
-- ----------------------------
INSERT INTO `bank_account` VALUES (1, 200, '1000', 'ZhangSan');
INSERT INTO `bank_account` VALUES (2, 0, '1001', 'LiSi');
INSERT INTO `bank_account` VALUES (3, 0, '1002', 'WangWu');
INSERT INTO `bank_account` VALUES (4, 0, '1003', 'ZhaoLiu');
余额 卡号 客户姓名
200 1000 张三
0 1001 李四
0 1002 王五
0 1003 赵六

模拟单线程消费操作 BankDemo0

该步骤用来检验上述写的基本业务逻辑是否正常、运算正确

在 BankDemo0 类中创建启动入口函数,同步执行转账操作

java
BankDemo0.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package Bank;

public class BankDemo0 {

public static void main(String[] args){

Bean bean = new Bean();
String sender = "1000";
String Lisi = "1001";
String WangWu = "1002";
String ZhaoLiu = "1003";
String[] acc = new String[3];
acc[0] = Lisi;
acc[1] = WangWu;
acc[2] = ZhaoLiu;
int i;
int transCount = 100;
for(i = 0; i < 3; i++) {
bean.transMoney(sender, acc[i], transCount);
}

}

}

该方法模拟了张三(sender,卡号 1000)分别(非并发)向李四、王五、赵六三人转账 100 分(1 元=100 分,数据库中以分为单位计数)的操作。

我们执行一下,查看结果:

运行前,数据表展示:

运行后,调试台输出:

studyjpa opened SUCCESS
[ id => 1 , balance => 200 , card_number => 1000 , customer => ZhangSan ]
[ id => 2 , balance => 0 , card_number => 1001 , customer => LiSi ]
[ id => 1 , balance => 100 , card_number => 1000 , customer => ZhangSan ]
[ id => 2 , balance => 100 , card_number => 1001 , customer => LiSi ]
senderCardNumber: 1000, Balance cannot afford, transCount: 100
[ id => 1 , balance => 0 , card_number => 1000 , customer => ZhangSan ]
[ id => 4 , balance => 0 , card_number => 1003 , customer => ZhaoLiu ]
[ id => 1 , balance => 100 , card_number => 1000 , customer => ZhangSan ]
[ id => 1 , balance => 0 , card_number => 1000 , customer => ZhangSan ]
[ id => 4 , balance => 0 , card_number => 1003 , customer => ZhaoLiu ]
[ id => 3 , balance => 0 , card_number => 1000 , customer => ZhangSan ]
[ id => 1 , balance => 0 , card_number => 1000 , customer => ZhangSan ]
[ id => 3 , balance => 100 , card_number => 1003 , customer => ZhaoLiu ]
Process finished with exit code 0

数据表展示:

业务逻辑执行的非常正确,按照李四、王五、赵六的先后顺序转账,在对赵六进行转账时系统提示余额不足:

senderCardNumber: 1000, Balance cannot afford, transCount: 100

模拟多线程并发消费操作 BankDemo1

在 BankDemo1 类中创建启动入口函数,新建线程池,执行并发

java
BankDemo1.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package Bank;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BankDemo1 {
public static void main(String[] args){

ExecutorService executor1 = Executors.newCachedThreadPool();
Bean bean = new Bean();
String sender = "1000";
String Lisi = "1001";
String WangWu = "1002";
String ZhaoLiu = "1003";
String[] acc = new String[3];
acc[0] = Lisi;
acc[1] = WangWu;
acc[2] = ZhaoLiu;
int i;
int transCount = 100;
for(i = 0; i < 3; i++){
String temp = acc[i];
executor1.submit(() -> {
try{
bean.transMoney(sender, temp, transCount);
}catch(Exception e) {
e.printStackTrace();
}
}
);
}
executor1.shutdownNow();
}
}

该方法模拟了张三(sender,卡号 1000)同时(并发)向李四、王五、赵六三人转账 100 分(1 元=100 分,数据库中以分为单位计数)的操作。

(李四、王五、赵六账户余额均为 0 元)

我们执行一下,查看结果:

运行前:

数据表展示:

运行后:

调试台输出:

studyjpa opened SUCCESS
Exception! FROM SELECT : java.lang.NullPointerException card_number: 1000
Exception! FROM SELECT : java.sql.SQLException: Before start of result set card_number: 1000
[ id => 1 , balance => 200 , card_number => 1000 , customer => WangWu ]
[ id => 1 , balance => 200 , card_number =>  , customer =>  ]
[ id => 3 , balance => 0 , card_number => 1002 , customer => WangWu ]
[ id => 1 , balance => 100 , card_number => 1000 , customer => ZhangSan ]
[ id => 3 , balance => 0 , card_number => 1003 , customer => ZhaoLiu ]
[ id => 1 , balance => 200 , card_number => 1000 , customer => WangWu ]
[ id => 4 , balance => 0 , card_number => 1003 , customer => ZhaoLiu ]
[ id => 2 , balance => 0 , card_number => 1003 , customer => ZhaoLiu ]
[ id => 1 , balance => 0 , card_number =>  , customer =>  ]
[ id => 2 , balance => 100 , card_number => 1003 , customer => ZhaoLiu ]
[ id => 1 , balance => 100 , card_number => 1000 , customer => ZhangSan ]
[ id => 4 , balance => 0 , card_number => 1003 , customer => ZhaoLiu ]
Process finished with exit code 0

数据表展示:

成功获取脏数据,显然凭空多出了 100 分,实验开始时总资产为 200,而并发执行后,总资产变为 300,可见在并发消费下,出现了逻辑错误。

重置表中的数据,反复运行几次,直接查看表数据变化:

===

===

===

===

… …

执行结果具有不确定性,这也反映了线程在执行时 CPU 对任务的快速切换选择顺序也是不确定的。

算法改进

我们的常规业务逻辑在单线程模式下运行正常,然而在多线程模式下却发生了错误,所以我们需要改进算法,支持多线程并发模式消费。

上述并发产生错误的原因,是由于在 UPDATE 没有操作成功时(包括 UPDATE 操作之前/操作中(并没有操作完成))我们便进行了第二次的查询操作,余额重复使用产生了脏数据。纠其根本原因,还是在于 MySQL 数据库对于多线程处理默认使用乐观锁,所以 update 的时候并不会完全锁死表,仍然支持查询,在此时很大可能就在查询数据,查询到的仍然是旧的数据,所以从根本上导致了数据错乱,最终导致脏数据的产生。

===

相关知识点:

事务的四个特性:原子性,一致性,隔离性,持久性

  • 原子性:包含在事务内的所有操作,要么全部执行完成,要么全部执行失败

  • 一致性:包含在事务内的所有操作设计的数据行,能被查看到的要么全部执行完成后的结果,要么全部完成前的结果。也就是说小明有 100 块,小花有 10 块,小明给小花转了 50 块。那么对于其他事务来说,能看到的只有是小明有 50 块同时小花有 60 块。或者是小明有 100 块同时小花有 10 块。而不能出现小明有 50 块而小花有 10 块。这就叫做一致性

  • 持久性:事务执行完成后数据被持久化到磁盘。

  • 隔离性:隔离性有四大隔离级别:

    • ① Serializable (串行化):可避免脏读、不可重复读、幻读的发生。
    • ② Repeatable read (可重复读):可避免脏读、不可重复读的发生。
    • ③ Read committed (读已提交):可避免脏读的发生。
    • ④ Read uncommitted (读未提交):最低级别,任何情况都无法保证。

脏读,幻读

  • a、脏读

脏读是指在一个事务处理过程里读取了另一个未提交的事务中的数据。

当一个事务正在多次修改某个数据,而在这个事务中这多次的修改都还未提交,这时一个并发的事务来访问该数据,就会造成两个事务得到的数据不一致。

  • b、不可重复读

不可重复读是指在对于数据库中的某个数据,一个事务范围内多次查询却返回了不同的数据值,这是由于在查询间隔,被另一个事务修改并提交了。

  • c、虚读(幻读)

幻读是事务非独立执行时发生的一种现象。例如事务 T1 对一个表中所有的行的某个数据项做了从“1”修改为“2”的操作,这时事务 T2 又对这个表中插入了一行数据项,而这个数据项的数值还是为“1”并且提交给数据库。而操作事务 T1 的用户如果再查看刚刚修改的数据,会发现还有一行没有修改,其实这行是从事务 T2 中添加的,就好像产生幻觉一样,这就是发生了幻读。

幻读和不可重复读都是读取了另一条已经提交的事务(这点就脏读不同),所不同的是不可重复读查询的都是同一个数据项,而幻读针对的是一批数据整体(比如数据的个数)。

===

Ⅰ、乐观锁改进 - 类 CAS 无锁操作机制改进办法

1、JVM-CAS 简述

JVM 为了保证数据不被脏读,措施之一就是采用了 CAS(compare and set)操作机制。我们举个简单的例子了解一下该机制:

对于某个变量 A,我们为其增加一个版本号 version,使用 getA(version)方法获取 A 的值,使用 setA(value1, version++)来更新 A 的值,即每次更新 A 的值其版本号都+1。现在有 2 个线程,Thread1 and Thread2,Thread1=>( getA( 0 ), setA( v2, 0 ) ), Thread2=>( getA( 0 ), setA(v3, 0) )。CPU 必定会先调取某一个线程,我们假设先调取 Thread1,Thread1=>setA( v2, 0 )方法,会使得 A 的版本号+1,即由[version=0]=>[version=1],此后 Thread2=>getA( 0 )方法不会失效,注意 getA 方法不因为版本号的不一致而失败,当进行到 Thread2=>setA(v3, 0)方法时,此时 setA 方法中 A 的版本号为 0,而此时将要被 set 的 A 的变量的版本号已经被 Thread1 修改为 1,0≠1,setA 方法调用失效,Thread2 会回到线程的第一步重新操作,升高变量 A 的版本号,执行 Thread2=>( getA( 1 ), setA(v3, 1) ),当再次执行到 Thread2=>setA(v3, 1)方法时,有操作版本号 1=存储版本号 1,此时版本号达到一致,A 变量的值更新为 v3,同时其版本号+1,即 version=2。如果 Thread2 第二次进行 setA 方法的时候,A 的版本号又被其他线程修改提升,那么 Thread2 将会继续重新执行本线程,重新 Thread2=>( getA( version ), setA(v3, version ) ),如此循环,直到操作版本号和存储版本号一致为止。

我们可以参照下 JVM-CAS 的操作机制,在数据库字段中对 balance 字段增加一个 version 字段,当 balance 的值改变时,version 自增 1。

sql
1
2
ALTER TABLE `studyjpa`.`bank_account`
ADD COLUMN `version` int(11) COMMENT '数据版本号-从0开始每次+1' AFTER `customer`;

E-R 图

初始数据:

此外,我们将 update 并发操作串行化,再加上数据版本控制,实现类 CAS 操作。串行化采用生产者->消费者模式,消费者并发的操作请求将会逐条用编号标记放置在计算机内存中排队等待消费者处理(该机制参考 java 的一些并发框架)。

2、并行转串行处理

我们将该过程分为以下几个关键步骤:

A.并发请求接收=>为请求分配序号=>按序号将处理的内容存储在缓存池对应的缓存块中

B.消费者自动处理服务循环扫描缓存块=>有未处理请求进行处理

A、B 为两个相互不干扰的一个过程,A 作为生产者 Producer 向缓存块中不断存入数据,B 作为消费者 Consumer 从缓存块中取出数据进行处理,B 对 A 产生的请求进行异步处理。之所以设置缓存块,是因为在上面的实验环境中,要保证不产生脏数据,必须是生产者产生 1 个请求,消费者就同步处理 1 个请求,然而生产者产生请求的速度要高于消费者处理数据的速度,所以无法完成生产者<=>消费者之间的同步,因此请求需要排队进行异步处理。

最后在阐述几个关键性的要点:

  • 为提高效率,并发请求过程中为请求分配任务序号不使用锁机制(synchronized 等)。
  • 任务序号(TaskIndex)采用的是原子计数方式生成,保障并发的请求的任务序号不重复。
  • 缓存块采用普通的 String[]数组(String[] Task)实现,从内存中读取缓存数据效率要高很多。
  • 数组的最大长度(CoreMAX)决定着程序能够承载的最大并发数。
  • 缓存块的 index 就是并发请求分配的任务序号

下面用 java 代码实现:

生产者模拟

java
TransMoneyProducer.java
1
2
3
4
5
6
7
8
9
package Bank.demo1;

public class TransMoneyProducer {

public String test(int for_id){
return "for_id = " + for_id + " is waiting for deal";
}

}

BankCore 类 - 并行转串行核心类

java
BankCore.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package Bank.demo1;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
* BankCore 建立缓存池,并行转串行
*/
public class BankCore {

public final static int CoreMAX = 1024; // 最大支持缓存数

public static AtomicInteger TaskIndex = new AtomicInteger(); // 循环自增原子计数,为每个请求标上序号(0-1023-0-1023-...)
public static AtomicInteger RequestIndex = new AtomicInteger(); // 自增原子计数,为每个请求标上序号(用途就是和写入数组的TaskIndex对比,查看请求是否有丢失)
public static AtomicInteger scanIndex = new AtomicInteger(); // 扫描服务
public static String[] Task = new String[CoreMAX]; // 缓存,支持1024个并发转换

ExecutorService executor;

public BankCore(){
// 初始化1024个缓存
for(int i = 0; i < CoreMAX; i++){
Task[i] = "";
}
// 初始化
TaskIndex.compareAndSet(0,0);
RequestIndex.compareAndSet(0,0);
executor = Executors.newCachedThreadPool();
}

public void NewRequest(String requestStr){
int newRequestIndex = RequestIndex.incrementAndGet();
// int newTaskIndex = newRequestIndex & (CoreMAX - 1) - 1;
int newTaskIndex = TaskIndex.incrementAndGet();
TaskIndex.compareAndSet(1024, 0);
System.out.println("NewRequest[" + newRequestIndex + "] => [" + newTaskIndex + "] " + requestStr);
// 如果有空余的位子存放新请求那么就存放,否则会抛弃新请求
if(Task[newTaskIndex].equals("")){
Task[newTaskIndex] = requestStr;
}
}

/**
* 异步自动处理请求开始
*/
public void AutoListenStart(){
executor.submit(() -> {
System.out.println("new Thread AutoListenStart");
// 阻塞,循环扫描第0-1023处缓存
while(true){
int task_index = scanIndex.get();
if(!Task[task_index].equals("")){
// 有新任务
Thread.sleep(50);
System.out.println("TaskIndex = " + task_index + " [" + Task[task_index] + "]" + " | 已处理");
Task[task_index] = "";
}
scanIndex.incrementAndGet();
scanIndex.compareAndSet(1024, 0);
}
});
}

/**
* 停止listen
*/
public void shutdownNow(){
executor.shutdownNow();
}

}
  • 我们建立了一个 1024 长度的缓存池,可以承载 1024 个并发。
  • NewRequest 方法:并发线程提交请求,即[并发接收池=>存入缓存]。
  • AutoListenStart 方法是一个独立的线程,当其被启动后,它会在后台持续扫描缓存池,以单线程的方式处理新请求。
  • AutoListenStart 方法处理新请求完毕后,会清空对应的缓存块,这也是判断缓存块中是否有未处理请求的依据。
  • AutoListenStart 方法在处理请求的时候,延时 50ms,假设处理数据花了 50ms。

下面我们测试一下:

新建 DemoTest1 类,模拟并发请求:

java
DemoTest1.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package Bank.demo1;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class DemoTest1 {

public static void main(String[] args) throws InterruptedException {

ExecutorService executor1 = Executors.newCachedThreadPool();
BankCore bankCore = new BankCore();
TransMoneyProducer transMoneyProducer = new TransMoneyProducer();

bankCore.AutoListenStart();

// 开启并发线程模拟
for(int i = 0; i<10; i++){
int ii = i;
executor1.submit(() -> {
bankCore.NewRequest(transMoneyProducer.test(ii));
});
}

Thread.sleep(1000);
System.out.println("RequestIndex Max: "+ BankCore.RequestIndex.get());
System.out.println("TaskIndex Max: "+ BankCore.TaskIndex.get());

executor1.shutdownNow();
bankCore.shutdownNow();

}

}

运行,看下 Console 输出如下:

我们可以看到并发产生的请求 NewRequest 有条不紊的被我们的 AutoListenStart 处理成功,10 个并发线程一个都没有丢失,都被正确的处理了。

3、应用上述算法处理并发转账引起的脏数据

Ⅰ、TransMoneyProducer 生产者

一个正式方法:newTrans

TransMoneyProducer.newTrans(senderCardNumber, receiverCardNumber, transCount)

java
TransMoneyProducer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
package Bank.demo1;

public class TransMoneyProducer {

public String newTrans(String senderCardNumber, String receiverCardNumber, int transCount){
return senderCardNumber + "#" + receiverCardNumber + "#" + transCount;
}

public String test(int for_id){
return "for_id = " + for_id + " is waiting for deal";
}

}

Ⅱ、TransMoneyConsumer 消费者

消费者类不需要支持多线程,按照单线程正常的代码结构即可

使用方法:直接调用 TransMoneyConsumer 对象,传入发送者卡号、接收者卡号、金额即可

new TransMoneyConsumer(senderCardNumber, receiverCardNumber, transCount)

java
TransMoneyConsumer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package Bank.demo1;

import Bank.dbBean.accountSuper;

import java.sql.*;

/**
* 转战类
*/

public class TransMoneyConsumer {

// 参数配置
private static final String dbName = "studyjpa";
private static final String URL = "jdbc:mysql://127.0.0.1:3306/" + dbName + "?useUnicode=true&characterEncoding=UTF-8";
private static final String user = "root";
private static final String password = "root";
// 数据库句柄
private static Connection conn;
// SQL模板
private static PreparedStatement sql;
// res
private static ResultSet res;

/**
* 转账操作初始化
* @param String senderCardNumber
* @param String receiverCardNumber
* @param int transCount
*/
public TransMoneyConsumer(String senderCardNumber, String receiverCardNumber, int transCount){
openConn();
doTrans(senderCardNumber, receiverCardNumber, transCount);
System.out.println("------------------------");
System.out.println("TransMoney1 is Construct");
System.out.println("receiverCardNumber: " + receiverCardNumber);
System.out.println("------------------------");
}

/**
* 打开数据库连接
*/
public void openConn(){
pl("openConn");
try{
Class.forName("com.mysql.jdbc.Driver"); // 加载驱动
}catch(ClassNotFoundException e) {
// 捕捉到错误
System.out.println("ClassNotFoundException!" + e);
}
// 连接至数据库
try {
conn = DriverManager.getConnection(URL, user, password);
conn.setAutoCommit(false);
// System.out.println(dbName + " opened SUCCESS");
}catch(SQLException e) {
// 捕捉到错误
System.out.println(dbName + " opened error!" + e);
}
}

/**
* 通过卡号查询信息
* @param String cardNumber
* @return account account
*/
public accountSuper findByCardNumber(String cardNumber){
accountSuper r = new accountSuper();
try{
// 执行SQL语句
sql = conn.prepareStatement("SELECT `id`, `balance`, `card_number`, `customer`, `version` FROM `bank_account` WHERE `card_number`=?");
sql.setString(1, cardNumber);
res = sql.executeQuery();
if(res.next()){
r.setId(res.getInt("id"));
r.setBalance(res.getInt("balance"));
r.setCard_number(res.getString("card_number"));
r.setCustomer(res.getString("customer"));
r.setVersion(res.getInt("version"));
}
res.close();
if(conn.isClosed()){
openConn();
}
}catch(Exception e) {
// 捕捉到错误
System.out.println("Exception! FROM SELECT : " + e + " card_number: " + cardNumber);
}
return r;
}

/**
* 转账
*/
public void doTrans(String senderCardNumber, String receiverCardNumber, int transCount){
accountSuper sender = findByCardNumber(senderCardNumber);
accountSuper receiver = findByCardNumber(receiverCardNumber);
int sender_id = sender.getId();
int receiver_id = receiver.getId();
// 执行转账
int senderBalance = sender.getBalance();
int receiverBalance = receiver.getBalance();
// sender 账户余额是否充足
if (senderBalance - transCount >= 0) {
senderBalance -= transCount;
receiverBalance += transCount;
try {
// 构造sender-SQL模板
PreparedStatement sender_sql =
conn.prepareStatement("UPDATE `bank_account` SET `balance`=?, version=version+1 WHERE `id`=?");
// 填充模板
sender_sql.setInt(1, senderBalance);
sender_sql.setInt(2, sender_id);
sender_sql.executeUpdate();

// 构造receiver-SQL模板
PreparedStatement receiver_sql =
conn.prepareStatement("UPDATE `bank_account` SET `balance`=?, version=version+1 WHERE `id`=?");
// 填充模板
receiver_sql.setInt(1, receiverBalance);
receiver_sql.setInt(2, receiver_id);
receiver_sql.executeUpdate();
// 执行事务
try {
conn.commit();
getTradePrint("√ TransMoney Success");
} catch (SQLException e) {
// 捕捉到错误
try {
conn.rollback();
getTradePrint("×× new Thread update fail: " + e + ", been rollback!");
} catch (SQLException e1) {
getTradePrint("××× new Thread update fail: " + e + ", rollback fail!: " + e1);
}
}
} catch (SQLException e) {
// 捕捉到错误
try {
conn.rollback();
getTradePrint("×× SQLException: " + e + ", been rollback! ");
} catch (SQLException e1) {
getTradePrint("××× SQLException: " + e + ", rollback fail: " + e1);
}
}
}else{
// 余额不足
getTradePrint(sender.getCard_number() + "=>Trans=>" + receiver.getCard_number() + " Balance cannot afford");
}
}

public void getTradePrint(String resString){
// System.out.println("------------------------");
// System.out.println(">>>" + resString);
// System.out.println("Sender: " + sender.getCard_number() + "receiver: " + receiver.getCard_number());
// System.out.println("SenderName: " + sender.getCustomer() + "receiverName: " + receiver.getCustomer());
// System.out.println("TransMoney: ¥" + deltaMoney);
// System.out.println("sender balance: ¥" + sender.getBalance() + " => ¥" + findByCardNumber(sender.getCard_number()).getBalance());
// System.out.println("receiver balance: ¥" + receiver.getBalance() + " => ¥" + findByCardNumber(receiver.getCard_number()).getBalance());
// System.out.println("------------------------");
System.out.println(resString);
}

public void pl(String s){
System.out.println(s);
}


}

Ⅲ、并行=>串行转换核心代码

BankCore1 类

使用方法:初始化对象后,首先调用 AutoListenStart 进行后台扫描缓存,有新请调用 NewRequest 方法

BankCore1.AutoListenStart()

BankCore1.NewRequest(requestStr)

java
BankCore1.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package Bank.demo1;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class BankCore1 {

public final static int CoreMAX = 1024; // 最大支持缓存数

public static AtomicInteger TaskIndex = new AtomicInteger(); // 循环自增原子计数,为每个请求标上序号(0-1023-0-1023-...)
public static AtomicInteger RequestIndex = new AtomicInteger(); // 自增原子计数,为每个请求标上序号(用途就是和写入数组的TaskIndex对比,查看请求是否有丢失)
public static AtomicInteger scanIndex = new AtomicInteger(); // 扫描服务
public static String[] Task = new String[CoreMAX]; // 缓存,支持1024个并发转换

ExecutorService executor;

public BankCore1(){
// 初始化1024个缓存
for(int i = 0; i < CoreMAX; i++){
Task[i] = "";
}
// 初始化
TaskIndex.compareAndSet(0,0);
RequestIndex.compareAndSet(0,0);
executor = Executors.newCachedThreadPool();
}

public void NewRequest(String requestStr){
int newRequestIndex = RequestIndex.incrementAndGet();
// int newTaskIndex = newRequestIndex & (CoreMAX - 1) - 1;
int newTaskIndex = TaskIndex.incrementAndGet();
TaskIndex.compareAndSet(1024, 0);
System.out.println("NewRequest[" + newRequestIndex + "] => [" + newTaskIndex + "] " + requestStr);
// 如果有空余的位子存放新请求那么就存放,否则会抛弃新请求
if(Task[newTaskIndex].equals("")){
Task[newTaskIndex] = requestStr;
}
}

/**
* 异步自动处理请求开始
*/
public void AutoListenStart(){
executor.submit(() -> {
System.out.println("new Thread AutoListenStart");
// 阻塞,循环扫描第0-1023处缓存
while(true){
int task_index = scanIndex.get();
if(!Task[task_index].equals("")){
// 分割信息
// 任务格式 转账卡号#收款卡号#金额
String[] deal = Task[task_index].split("#");
String senderCardNumber = deal[0];
String receiverCardNumber = deal[1];
int transCount = Integer.parseInt(deal[2]);
new TransMoneyConsumer(senderCardNumber, receiverCardNumber, transCount);
System.out.println("TaskIndex = " + task_index + " [" + Task[task_index] + "]" + " | 已处理");
Task[task_index] = "";
}
scanIndex.incrementAndGet();
scanIndex.compareAndSet(1024, 0);
}
});
}

/**
* 停止listen
*/
public void shutdownNow(){
executor.shutdownNow();
}

}

Ⅳ、组合 Ⅰ,Ⅱ,Ⅲ,即生产者=>缓存池=>消费者 模式

DemoTest2 类

java
DemoTest2.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package Bank.demo1;


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DemoTest2 {

public static void main(String[] args) throws InterruptedException {

ExecutorService executor1 = Executors.newCachedThreadPool();
String sender = "1000";
String Lisi = "1001";
String WangWu = "1002";
String ZhaoLiu = "1003";
String[] acc = new String[3];
acc[0] = Lisi;
acc[1] = WangWu;
acc[2] = ZhaoLiu;

BankCore1 bankCore = new BankCore1();
TransMoneyProducer transMoneyProducer = new TransMoneyProducer();
bankCore.AutoListenStart();

// 开启并发线程模拟
for(int i = 0; i<6; i++){
int ii = 0;
if(i < 3){
ii = i;
}else{
ii = i - 3;
}
String receiver = acc[ii];
executor1.submit(() -> {
bankCore.NewRequest(transMoneyProducer.newTrans(sender, receiver, 100));
});
}

Thread.sleep(2000);
System.out.println("RequestIndex Max: "+ BankCore1.RequestIndex.get());
System.out.println("TaskIndex Max: "+ BankCore1.TaskIndex.get());

executor1.shutdownNow();
bankCore.shutdownNow();

}

}

我们这次进行 6 线程并发测试,张三并发向李四、王五、赵六转账两次,即

理论上讲,转账过后,张三的余额为 0,李四/王五/赵六的其中两人账户余额为 100,另外一个人账户余额为 0,Console 会有账户余额不足提示 4 次。

重置数据库中的数据:

我们接下来运行一下,得到 Console 打印结果:

new Thread AutoListenStart
NewRequest[2] => [2] 1000#1002#100
NewRequest[1] => [1] 1000#1001#100
NewRequest[3] => [3] 1000#1003#100
NewRequest[4] => [4] 1000#1001#100
NewRequest[5] => [5] 1000#1002#100
NewRequest[6] => [6] 1000#1003#100
openConn
√ TransMoney Success
------------------------
TransMoney1 is Construct
receiverCardNumber: 1001
------------------------
TaskIndex = 1 [1000#1001#100] | 已处理
openConn
√ TransMoney Success
------------------------
TransMoney1 is Construct
receiverCardNumber: 1002
------------------------
TaskIndex = 2 [1000#1002#100] | 已处理
openConn
1000=>Trans=>1003 Balance cannot afford
------------------------
TransMoney1 is Construct
receiverCardNumber: 1003
------------------------
TaskIndex = 3 [1000#1003#100] | 已处理
openConn
1000=>Trans=>1001 Balance cannot afford
------------------------
TransMoney1 is Construct
receiverCardNumber: 1001
------------------------
TaskIndex = 4 [1000#1001#100] | 已处理
openConn
1000=>Trans=>1002 Balance cannot afford
------------------------
TransMoney1 is Construct
receiverCardNumber: 1002
------------------------
TaskIndex = 5 [1000#1002#100] | 已处理
openConn
1000=>Trans=>1003 Balance cannot afford
------------------------
TransMoney1 is Construct
receiverCardNumber: 1003
------------------------
TaskIndex = 6 [1000#1003#100] | 已处理
RequestIndex Max: 6
TaskIndex Max: 6

Process finished with exit code 1

数据表数据变化如下:

经过多次运行测试,均是正常的转出两次 100,再转出开始提示余额不足(最后那个版本号 version 暂时没有用到,因为我们并行转串行后就不需要考虑数据版本的问题了)

可见我们这种并行转串行的算法生效了。下面我们通过几个示意图总结一下上述算法的流程:

===

当然并行转串行只是众多方法的一种。

===

程序分析

  • 该方法最大的特点就是实现无锁高并发,适用于一些小规模的并发场景。

  • TaskTool 扫描 Task 的方法比较耗费资源,当高并发条件下当然没有问题,但是当并发数较低或没有并发时,应当适当降低扫描消耗。

  • 基于 2 条,应当为 TaskPool 设置自动调速算法,合理利用计算机资源。

  • 学海无涯,仍然有很多知识等待我们去学习思考。

===

===

2019-06-10 18:18:23 毕

文章作者: Bill
文章链接: http://blog.webpro.ltd/2019/06/07/ewallet-concurrency-java-mysql/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Bill's blog

评论