场景是这样的,我们有一个模块设定如下

用户通过各种渠道获取积分,每月10号,25号进行一次结算;结算排名存入数据库,排行前列的可获得奖励

因为是分布式架构,多个服务器会同时操作积分表,于是可能产生脏数据:当A服务器对表进行了一次结算操作后,B服务器又对结算后的结果再次进行结算,数据就脏掉了。那么怎样用最小的代价避免这个问题呢?我们可以使用MySQL的事务,再加一张用来记录结算操作的辅助表(下文的"临时表")

1
2
3
4
5
6
-- Settlement log: one row per settled leaderboard period.
-- `end_time` is the period's end time in milliseconds; being the primary key,
-- it lets a given period be recorded (and therefore settled) only once.
create table `SCHEDULED` 
(
`end_time` bigint(20) NOT NULL,
-- MySQL requires an AUTO_INCREMENT column to be indexed, hence the unique key below.
`id` int(11) NOT NULL AUTO_INCREMENT,
PRIMARY KEY (`end_time`),
UNIQUE KEY `uk_id` (`id`)
);

增加SCHEDULED表来记录每一次排行榜的结算操作,其中字段end_time记录本期结算截止时间的毫秒时间戳;由于每期的截止时间只会是下一个10号或25号,该字段是百分百唯一且无后效性的,可以直接作为主键,保证同一期只会被记录(结算)一次

定时任务的逻辑如下

表结构:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.meteor.activetop;

/**
 * SQL templates used by the race (leaderboard) settlement code.
 *
 * <p>Every template contains a {@code {table_name}} placeholder that the caller
 * substitutes with {@link String#replace} before executing the statement.
 * {@code ?} markers are bound as prepared-statement parameters.
 * {@link #INSERT_EXECUTE_CLOSE_LOG} additionally carries an {@code {end_time}}
 * placeholder because it is executed as a ready-made statement without bound
 * parameters.
 */
public enum SQL_MAPPER {

    /**
     * Creates the settlement-log table; {@code end_time} is the primary key.
     * {@code if not exists} keeps repeated start-ups from failing, and the
     * unique key on {@code id} is needed because MySQL only accepts an
     * AUTO_INCREMENT column that is indexed.
     */
    CREATE_RACE_SCHEDULED_TABLE(
            "create table if not exists `{table_name}` (`end_time` bigint(20) NOT NULL,`id` int(11) NOT NULL AUTO_INCREMENT, PRIMARY KEY (`end_time`), UNIQUE KEY `uk_id` (`id`) );"
    ),
    /** Counts settlement-log rows for one end time (parameter 1: end time). */
    IS_EXECUTE_CLOSE_SCHEDULED(
            "select count(*) as cnt from {table_name} where `end_time` = ?;"
    ),
    /** Creates the key/value table that stores race state such as the current end time. */
    CREATE_RACE_DATA_TABLE(
            "create table if not exists `{table_name}` (`type` varchar(25) NOT NULL , `value` varchar(255) NOT NULL , PRIMARY KEY(`type`));"
    ),
    /**
     * Inserts the settlement-log row for one period. Both {@code {table_name}}
     * and {@code {end_time}} are substituted textually by the caller.
     */
    INSERT_EXECUTE_CLOSE_LOG(
            "insert into `{table_name}`(`end_time`) values({end_time});"
    ),
    /**
     * Selects the top 30 players by point (parameter 1: upper bound for
     * {@code last_active_time}). {@code point} is part of the select list
     * because the settlement code reads it from the result set.
     */
    CLOSE_SORT(
            "select player, `point` from {table_name} where last_active_time <= ? order by `point` desc limit 30;"
    ),
    /** Inserts one ranking row; takes four positional parameters. */
    INSERT_TOP_LOG(
            "insert into {table_name} values(?,?,?,?);"
    ),
    /** Sets {@code point} of every row to the given value (parameter 1). */
    RESET_POINT(
            "update `{table_name}` set `point` = ?;"
    ),
    /** Upserts the {@code end_time} entry (parameters 1 and 2: the value to store). */
    SAVE_ENDTIME(
            "insert into `{table_name}`(`type`,`value`) values('end_time',?) on duplicate key update `type`='end_time',`value`=?;"
    ),
    /** Reads the stored {@code end_time} entry. */
    SELECT_CURRENT_RACE_ENDTIME(
            "select `value` from `{table_name}` where `type` = 'end_time';"
    );

    private final String query;

    SQL_MAPPER(String query) {
        this.query = query;
    }

    /**
     * @return the raw SQL template, placeholders included
     */
    public String getQuery() {
        return query;
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package com.meteor.activetop;

import com.meteor.activetop.storage.AbstractPlayerDataStorage;
import com.meteor.activetop.util.Utils;
import com.meteor.jellylib.mysql.FastMySQLStorage;
import com.meteor.jellylib.mysql.column.Column;
import com.meteor.jellylib.mysql.data.KeyValue;
import org.bukkit.Bukkit;

import javax.security.sasl.SaslServer;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * Tracks the end time of the current race (leaderboard period) and triggers
 * its settlement once that time has passed.
 *
 * <p>Two tables are used: {@code race_scheduled} logs which end times have
 * already been settled, and {@code race_data} is a key/value table holding the
 * end time of the race in progress so it survives a restart.
 */
public class RaceStore {
    private static final String RACE_SCHEDULED_TABLE = "race_scheduled";
    private static final String RACE_DATA_TABLE = "race_data";

    private final ActiveTop plugin;

    private Connection connection;

    // Written from the asynchronous scheduler task and read by other threads.
    private volatile long raceEndTime = 0L;


    /**
     * Opens the database connection, creates the tables and starts the
     * settlement schedule.
     *
     * <p>Public so the class can be instantiated; no factory method exists in
     * this class.
     *
     * @param plugin owning plugin; supplies the configuration, logger and scheduler
     */
    public RaceStore(ActiveTop plugin) {
        this.plugin = plugin;
        this.initDatabase();
        this.initRace();
    }

    /** Connects using the {@code RaceStore.url} config entry and runs both create-table statements. */
    private void initDatabase() {
        try {
            this.connection = DriverManager.getConnection(plugin.getConfig().getString("RaceStore.url"));
            String createScheduledTable = SQL_MAPPER.CREATE_RACE_SCHEDULED_TABLE.getQuery()
                    .replace("{table_name}", RACE_SCHEDULED_TABLE);
            executeSqlQuery(connection, createScheduledTable);
            String createDataTable = SQL_MAPPER.CREATE_RACE_DATA_TABLE.getQuery()
                    .replace("{table_name}", RACE_DATA_TABLE);
            executeSqlQuery(connection, createDataTable);
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
        plugin.getLogger().info("初始化竞赛数据库");
    }

    /**
     * Reads the stored end time of the race that has not been settled yet.
     *
     * @return the stored end time, or {@code -1} when none is stored
     */
    private long getLastEndTime() {
        String sql = SQL_MAPPER.SELECT_CURRENT_RACE_ENDTIME.getQuery()
                .replace("{table_name}", RACE_DATA_TABLE);
        // The statement has to be executed before a result set exists.
        try (PreparedStatement statement = this.connection.prepareStatement(sql);
             ResultSet resultSet = statement.executeQuery()) {
            if (resultSet.next()) {
                return Long.parseLong(resultSet.getString("value"));
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
        return -1;
    }

    /**
     * Checks the settlement log to avoid settling the same race twice.
     *
     * @return {@code true} when a log row exists for the current end time, and
     *         also when the count query yields no row at all
     */
    private boolean isExecuteClose() {
        String sql = SQL_MAPPER.IS_EXECUTE_CLOSE_SCHEDULED.getQuery()
                .replace("{table_name}", RACE_SCHEDULED_TABLE);
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
            statement.setLong(1, getRaceEndTime());
            try (ResultSet resultSet = statement.executeQuery()) {
                if (resultSet.next()) {
                    return resultSet.getInt("cnt") > 0;
                }
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
        return true;
    }

    /**
     * Executes an update statement.
     *
     * @param connection connection to run the statement on
     * @param sql        statement text, with {@code ?} markers for {@code args}
     * @param args       values bound to the markers in order
     * @throws RuntimeException wrapping any {@link SQLException}
     */
    public static void executeSqlQuery(Connection connection, String sql, Object... args) {
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
            for (int i = 0; i < args.length; i++) {
                statement.setObject(i + 1, args[i]);
            }
            statement.executeUpdate();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Executes a query and returns its still-open result set.
     *
     * <p>The statement must outlive this method, otherwise the returned result
     * set would already be closed. It is therefore marked to close itself once
     * the result set is closed; the caller is responsible for closing the
     * result set.
     *
     * @param connection connection to run the query on
     * @param sql        query text, with {@code ?} markers for {@code args}
     * @param args       values bound to the markers in order
     * @return the open result set
     * @throws RuntimeException wrapping any {@link SQLException}
     */
    public static ResultSet executeQuery(Connection connection, String sql, Object... args) {
        PreparedStatement statement = null;
        try {
            statement = connection.prepareStatement(sql);
            for (int i = 0; i < args.length; i++) {
                statement.setObject(i + 1, args[i]);
            }
            statement.closeOnCompletion();
            return statement.executeQuery();
        } catch (SQLException e) {
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException closeError) {
                    e.addSuppressed(closeError);
                }
            }
            throw new RuntimeException(e);
        }
    }


    /**
     * Settles the current race if it has ended and has not been settled yet,
     * then schedules the next check.
     *
     * @return always {@code true}
     */
    private boolean closeReward() {
        // The race has ended.
        if (System.currentTimeMillis() > getRaceEndTime()) {
            // Settle only when the log has no row for this end time.
            if (!isExecuteClose()) {
                long endTime = getRaceEndTime();
                String lockSql = SQL_MAPPER.INSERT_EXECUTE_CLOSE_LOG.getQuery()
                        .replace("{table_name}", RACE_SCHEDULED_TABLE)
                        .replace("{end_time}", String.valueOf(endTime));
                try {
                    AbstractPlayerDataStorage.getInstance().closeReward(lockSql, endTime);
                } catch (RuntimeException e) {
                    // A failure here (for example the log row already being
                    // inserted by another server) must not stop this server
                    // from scheduling the following race.
                    plugin.getLogger().warning("竞赛结算未完成(可能已由其他服务器结算): " + e);
                }
            }
            raceEndTime = Utils.countRaceEndTime(System.currentTimeMillis());
        }

        // 20 ticks per second; never schedule with a negative delay.
        long delayTick = Math.max(0L, ((raceEndTime - System.currentTimeMillis()) / 1000) * 20);
        plugin.getLogger().info("将在"+delayTick+"个tick后开始结算竞赛");
        Bukkit.getScheduler().runTaskLaterAsynchronously(plugin, () -> {
            closeReward();
        }, delayTick + (60 * 20));
        return true;
    }

    /**
     * Restores the stored end time, or computes a new one when nothing is
     * stored, and starts the settlement schedule in both cases.
     */
    private void initRace() {
        long storedEndTime = getLastEndTime();
        this.raceEndTime = storedEndTime != -1
                ? storedEndTime
                : Utils.countRaceEndTime(System.currentTimeMillis());
        closeReward();
    }

    /**
     * Stores the end time of the unfinished race and closes the connection.
     *
     * <p>The end time goes into {@code race_data}, the table
     * {@link #getLastEndTime()} reads from. The connection is closed even when
     * storing fails.
     *
     * @throws RuntimeException when storing or closing fails
     */
    public void close() {
        RuntimeException failure = null;
        try {
            executeSqlQuery(connection,
                    SQL_MAPPER.SAVE_ENDTIME.getQuery().replace("{table_name}", RACE_DATA_TABLE),
                    raceEndTime, raceEndTime);
        } catch (RuntimeException e) {
            failure = e;
        }
        try {
            connection.close();
        } catch (SQLException e) {
            if (failure == null) {
                failure = new RuntimeException(e);
            } else {
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }


    /**
     * @return end time of the current race
     */
    public long getRaceEndTime() {
        return raceEndTime;
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * Settles one race inside a single transaction: stores the ranking, inserts
 * the settlement-log row and resets every player's points.
 *
 * <p>All steps are committed together; on any failure the transaction is
 * rolled back and the failure is rethrown.
 *
 * @param lockSql ready-to-run insert statement for the settlement log
 * @param endTime end time of the race being settled
 * @return {@code true} once the transaction has been committed
 * @throws RuntimeException wrapping whatever made the settlement fail
 */
@Override
public boolean closeReward(String lockSql,long endTime) {
    Connection connection = null;
    boolean previousAutoCommit = true;
    // True while a transaction has been started and neither committed nor rolled back.
    boolean transactionOpen = false;
    try {
        connection = fastMySQLStorage.getConnection();
        previousAutoCommit = connection.getAutoCommit();
        connection.setAutoCommit(false);
        transactionOpen = true;

        String rankingQuery = SQL_MAPPER.CLOSE_SORT.getQuery().replace("{table_name}",P_POINT_TABLE);
        String insertTopQuery = SQL_MAPPER.INSERT_TOP_LOG.getQuery().replace("{table_name}",TOP_TABLE);
        int top = 1;
        try (ResultSet resultSet = RaceStore.executeQuery(connection, rankingQuery, endTime)) {
            while (resultSet.next()) {
                RaceStore.executeSqlQuery(connection, insertTopQuery,
                        top++, endTime, resultSet.getInt("point"), resultSet.getString("player"));
            }
        }
        // Mark this race as settled.
        RaceStore.executeSqlQuery(connection, lockSql);
        // Reset the points.
        RaceStore.executeSqlQuery(connection,
                SQL_MAPPER.RESET_POINT.getQuery().replace("{table_name}",P_POINT_TABLE), 0);
        connection.commit();
        transactionOpen = false;
        return true;
    } catch (Throwable e) {
        // connection is still null when obtaining it was what failed.
        if (connection != null) {
            try {
                connection.rollback();
                transactionOpen = false;
            } catch (SQLException ex) {
                // Keep the original failure as the cause; attach the rollback failure to it.
                e.addSuppressed(ex);
            }
        }
        throw new RuntimeException(e);
    } finally {
        // Restore the auto-commit mode only when no transaction is pending:
        // switching it mid-transaction would commit the partial work.
        // NOTE(review): the connection is not closed here because it is unclear
        // whether fastMySQLStorage hands out a shared connection; confirm.
        if (connection != null && !transactionOpen) {
            try {
                connection.setAutoCommit(previousAutoCommit);
            } catch (SQLException ignored) {
                // The outcome of the settlement is already decided at this point;
                // failing to restore the mode must not replace that outcome.
            }
        }
    }
}

考虑a,b服务器的结算操作。如果a子服的结算事务先成功提交,此时SCHEDULED表中会插入一条本期的记录,b子服在结算前查到这条记录后就不会再开始结算事务;如果a,b子服同时进行结算操作,事务的默认隔离级别为可重复读,他们读到的数据是一致的,但由于end_time是主键,后插入记录的一方会先等待,并在对方提交后因主键冲突而失败回滚,所以同一期最终只会结算一次,和业务逻辑并无冲突
至此,只要还有一台服务器没挂,每月10号,25号的“结算”操作都能正常运行