【玄学】数据挖掘:福彩双色球的预测

【玄学】数据挖掘:福彩双色球的预测

给定2013,2014,2015,2016,2017的福彩双色球数据.

输出:给出前区( 红球 )数据( 6个号码 )的预测至少一注( 一条比如 2, 5, 13, 14, 26, 33 )


环境说明:

  • 操作系统: MacOS 10.14.1
  • Python:v3.6.7
  • Jupyter Notebook: v1.0.0

数据的加载与分析

1
2
3
4
5
6
# 导入必要的库
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
1
2
3
4
5
# 读取数据并观察
df = pd.read_csv('./ssq.TXT', sep='\t')
# '期号','套球号','红球1','红球2','红球3','红球4','红球5','红球6','蓝球'
print('总期数:', len(df))
df[:5]
总期数: 767

数据介绍及目标

数据包含从2013年到2017年共767期的福彩双色球数据。其中红球按照从小到大的顺序给出,并没有提供6个红球的出球次序。
对于双色球彩票这样随机的事件,我们希望从历史数据中找出一些规律,从而得到一个可以明显提高收益预期的购买策略。

对数据进行一些可视化

对比枯燥的表格,将数据绘制成图形,会直观很多,下面进行一些尝试:

六个位置出现红球号数的分布情况:

1
plots = df.loc[:,['R1','R2','R3','R4','R5','R6']].hist(bins = 33)

png

因为各个号码最终安从小到大排列到各个位置,所以这样的图形的确符合常理,但每个直方图都有许多项缺失的情况。

蓝色球号的分布情况

1
plots = df['Blue'].hist(bins = 16, figsize = [5,2])

png

16个蓝色的球,其理论上分布应该是均匀分布,在767期历史数据中,似乎的确有概率上的细微差别。

第一个红球的编号随期数(时间)的变动:

1
2
# 画出了前500期 R1 的时间序列折线图
plots = df['R1'][:500].plot(figsize = [20,2])

png

套球号随期数(时间)的变动:

1
2
# 画出了前500期 套球号 的时间序列折线图
plots = df['NUM'][:500].plot(figsize = [20,2])

png

套球号与出球分布的关系

1
2
3
4
5
6
7
8
9
10
# 绘制套球号与出球分布的关系
# 灰色部分为套球号与 R1 球号分布的关系
colors = ['red','blue','orange','green']
for i in [1,2,3,4]:
axi = plt.subplot(2,2,i)
n = df[df.NUM == i].loc[:,['R1','R2','R3','R4','R5','R6']].values
n = n.reshape([n.shape[0]*n.shape[1]])
plots = axi.hist(n,bins = 33, alpha = 0.25, color=colors[i-1])
plots = axi.hist(df[df.NUM == i]['R1'], bins = 33, alpha = 0.5, color='black')
plots = axi.legend([str(i)+'-R1', str(i)])

png

一些小小的思路

在给定的数据中,我们只有红球按大小排列的结果,而得不到顺序信息,但反过来想,这其实简化我们的分析压力,我们只需要将排序完成的序列作为预测对象即可。

关于套球号

套球号决定了摇号的机器,本身会对出球的概率造成一定的影响,但在历史数据绘制的图像中,暂时不能确定套球号对于所以出球号的概率到底有怎么样的影响,但对 R1 概率的影响还是肉眼可见的。

策略:

基本方案

根据六个红球按大小顺序出现的历史数据,我们统计从每一号球后出现球号的概率,并按照此概率进行下一轮球号的预测,并尝试考虑套球号的影响。
在每一步的预测中,我们需要:

  1. 指定第一个球号(R1)
  2. 根据 R1 已经先前得到的概率生成之后的序列
  3. 判断这个序列是否符合双色球规则

在上述流程中,若考虑套球号的影响,则一开始指定套球号而不是 R1

R1/套球号 的指定方案

将其历史数据作为时间序列进行预测,得到下一个数据的预测值

策略的实现方案

基于上述策略,我们发现我们所要预测的,是在给出先验结果的基础上,得到最有可能依次出现的序列组合,这点与语言文本的生成极为相似,于是我们打算使用训练一个 RNN 的方式来实现序列预测。

模型的实现

1
2
3
4
# 导入必要的库
import time
import tensorflow as tf
tf.enable_eager_execution()

RNN 模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Model(tf.keras.Model):
def __init__(self, vocab_size, embedding_dim, units):
super(Model, self).__init__()
self.units = units
self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)

self.gru = tf.keras.layers.CuDNNGRU(self.units,
return_sequences=True,
recurrent_initializer='glorot_uniform',
stateful=True)
self.fc = tf.keras.layers.Dense(vocab_size)
def call(self, x):
embedding = self.embedding(x)
output = self.gru(embedding)
prediction = self.fc(output)
return prediction
def build(self, input_shape):
self.embedding.build(input_shape)
self.gru.build(tf.TensorShape([input_shape[0], input_shape[1], embedding_dim]))
self.fc.build(tf.TensorShape([input_shape[0], input_shape[1], self.units]))
super(Model, self).build(input_shape)

指定一些参数值,包括红球编号的范围、Embedding 层的大小,以及 RNN 单元的大小:

1
2
3
4
5
6
7
8
tf.reset_default_graph()
rball_size = 34
embedding_dim = 256
units = 1024

optimizer = tf.train.AdamOptimizer()
def loss_function(real, preds):
return tf.losses.sparse_softmax_cross_entropy(labels=real, logits=preds)

定义训练模型的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def train_model(model, dataset, checkpoint_dir = './training_checkpoints' , EPOCHS = 50):
# 定义模型存放的路径
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)

for epoch in range(EPOCHS):
start = time.time()
hidden = model.reset_states()

for (batch, (inp, target)) in enumerate(dataset):
with tf.GradientTape() as tape:
predictions = model(inp)
loss = loss_function(target, predictions)
grads = tape.gradient(loss, model.variables)
optimizer.apply_gradients(zip(grads, model.variables))

if (epoch + 1) % 5 == 0:
checkpoint.save(file_prefix = checkpoint_prefix)

if (epoch + 1) % 10 == 0:
print ('Epoch {} Loss {:.4f}'.format(epoch+1, loss))
print ('Time taken for 1 epoch {} sec\n'.format(time.time() - start))

数据集的加载

定义加载数据集的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
# 把数据集分为 input 和 target
def split_input_target(chunk):
input_ = chunk[:-1]
target = chunk[1:]
return input_, target

# 生成数据集
def load_dataset(chunks, seq_length, BATCH_SIZE, BUFFER_SIZE = 10000):
dataset = chunks.map(split_input_target)
# 输出一组 input 和 target
for inp, target in dataset.take(1):
print('INPUT: ',inp.numpy(), '\nTARGET: ', target.numpy())
return dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)

我们在模型的训练中,会训练两个 RNN 模型, 一个根据历史的 R1 数据预测下一个 R1, 另一个是根据 R1 来预测 R2-R6 的数值。所以我们准备了两份训练集:

R2-6 预测训练集

1
2
3
4
chunks = tf.data.Dataset.from_tensor_slices(df.loc[:,['R1','R2','R3','R4','R5','R6']].values)
r26_dataset = load_dataset(chunks, seq_length = 5, BATCH_SIZE = 32)
r26_model = Model(rball_size, embedding_dim, units)
r26_model.build(tf.TensorShape([32, 5]))
INPUT:  [ 6  8 14 15 24] 
TARGET:  [ 8 14 15 24 25]

R1 预测训练集

1
2
3
4
chunks = tf.data.Dataset.from_tensor_slices(df['R1'].values).batch(33, drop_remainder=True)
r1_dataset = load_dataset(chunks, seq_length = 32, BATCH_SIZE = 3)
r1_model = Model(rball_size, embedding_dim, units)
r1_model.build(tf.TensorShape([3, 32]))
INPUT:  [ 6  1 22  6  1  9  2  3  1  1  3  6  5  2  5  2  4  2  1  1  1  2  3  4
 16  4  1  7  6  7  3  4] 
TARGET:  [ 1 22  6  1  9  2  3  1  1  3  6  5  2  5  2  4  2  1  1  1  2  3  4 16
  4  1  7  6  7  3  4  5]

模型的训练:

训练前面提到的两个模型,保存位置分别为:

  • R2-6 预测:’./r26_training_checkpoints’
  • R1 预测:’./r1_training_checkpoints’
1
2
# 训练 r26_model
train_model(r26_model, r26_dataset, checkpoint_dir = './r26_training_checkpoints', EPOCHS = 60)
Epoch 10 Loss 2.3451
Time taken for 1 epoch 0.33402252197265625 sec

Epoch 20 Loss 2.0390
Time taken for 1 epoch 0.33139848709106445 sec

Epoch 30 Loss 1.3000
Time taken for 1 epoch 0.33975768089294434 sec

Epoch 40 Loss 1.1307
Time taken for 1 epoch 1.2251815795898438 sec

Epoch 50 Loss 1.0488
Time taken for 1 epoch 0.3994264602661133 sec

Epoch 60 Loss 1.1033
Time taken for 1 epoch 0.344959020614624 sec
1
2
# 训练 r1_model
train_model(r1_model, r1_dataset, checkpoint_dir = './r1_training_checkpoints', EPOCHS = 60)
Epoch 10 Loss 2.6247
Time taken for 1 epoch 0.16772770881652832 sec

Epoch 20 Loss 2.3420
Time taken for 1 epoch 0.17430996894836426 sec

Epoch 30 Loss 2.2850
Time taken for 1 epoch 0.9093096256256104 sec

Epoch 40 Loss 2.2243
Time taken for 1 epoch 0.580397367477417 sec

Epoch 50 Loss 1.9372
Time taken for 1 epoch 0.18292641639709473 sec

Epoch 60 Loss 1.5689
Time taken for 1 epoch 0.18114113807678223 sec

模型的加载与预测

模型的加载

1
2
3
4
5
6
7
8
9
10
11
def load_latest_model(checkpoint_dir, seq_len = None):
model = Model(rball_size, embedding_dim, units)
checkpoint = tf.train.Checkpoint(model=model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))
model.build(tf.TensorShape([1, seq_len]))
return model

# 加载 model_r1
model_r1 = load_latest_model('./r1_training_checkpoints', seq_len = 30)
# 加载 model_r26
model_r26 = load_latest_model('./r26_training_checkpoints', seq_len = None)

彩票的预测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 根据先验数据预测 r1
def predict_r1(input_eval, model, temperature = 2):
hidden = model.reset_states()
input_eval = tf.expand_dims(input_eval, 0)
predictions = model(input_eval)
predictions = tf.squeeze(predictions, 0)
predictions = predictions / temperature
return tf.multinomial(predictions, num_samples=1)[-1,0].numpy()

# 根据 r1 预测 r2-6
def predict_with_r1(r1, model, temperature = 2):
hidden = model.reset_states()
input_eval = [r1]
result = [r1]
for _ in range(5):
input_eval = tf.expand_dims(input_eval, 0)
predictions = model(input_eval)
predictions = tf.squeeze(predictions, 0)
predictions = predictions / temperature
predicted_id = tf.multinomial(predictions, num_samples=1)[-1,0].numpy()
input_eval = [predicted_id]
result.append(predicted_id)
return result

# 根据先验30天数据预测下一组彩票
def predict_with_history(data, temperature = 1):
return predict_with_r1(predict_r1(data, model_r1,
temperature = temperature),
model_r26, temperature = temperature)

# 测试预测一组数据:
predict_with_history(np.random.randint(1,10,[30]))
[1, 7, 13, 19, 21, 29]

模型的评价

彩票中奖金额计算代码:

函数 “eval_balls” 返回一个评奖的值,未获奖为 0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def eval_balls(my, ans):
my = my[:7]
ans = ans[:7]
nums = 0
for i in my[:6]:
if i in ans[:6]:
nums += 1
result = (nums, 1 if my[6] == ans[6] else 0)
if result == (6, 1):
return 1
elif result == (6, 0):
return 2
elif result == (5, 1):
return 3
elif result == (5, 0) or result == (4, 1):
return 4
elif result == (4, 0) or result == (3, 1):
return 5
elif result == (2, 1) or result == (1, 1) or result == (0, 1):
return 6
else:
pass
return 0

模型的比较(相较于随机购买):

这一轮,我们来比较两种购买彩票的策略:

  • RNN 预测法(因为本次预测暂不考虑蓝球,所以蓝球随机购买)
  • 念佛随机购买法
1
2
3
4
5
6
7
8
9
10
11
12
13
# RNN 预测法
def buy_with_prediction(history, temp = 1):
a = predict_with_history(history, temperature=temp)
a.append(np.random.randint(1,17))
return a
# 念佛随机购买法
def buy_random_with_belief():
# 默念:南无阿弥陀佛
while True:
a = list(np.random.randint(1,34,6))
if len(set(a)) == 6:
break
return a+list(np.random.randint(1,17,1))

我们从网上获取了 2018 年从 18001 到 18137 共137 期双色球数据,保存为 “2018ssq.txt” 作为测试数据

读取测试数据

1
2
3
test_data = pd.read_csv('./2018ssq.txt', sep = ' ').loc[:,['R1','R2','R3','R4','R5','R6','Blue']]
# 测试数据展示:
test_data[:5]

一个提供先验数据与中奖号码的生成器

1
2
3
4
5
6
data = pd.concat([df.loc[:,['R1','R2','R3','R4','R5','R6','Blue']][-30:], test_data])
def test_generator():
for i in range(137):
history = data[i:i+30]['R1'].values
target = data[i+30:i+31].values[0]
yield history, target

预测并显示结果:

我们使用两种策略购买这137期彩票,并统计其中每种奖的次数,为避免出现偶然性,我们购买100次:

1
2
3
4
5
6
7
8
9
10
from tqdm import tqdm

def cmp(times, temp):
pred_result = np.zeros([7])
rand_result = np.zeros([7])
for _ in tqdm(range(times)):
for h, t in test_generator():
pred_result[eval_balls(buy_with_prediction(h, temp = 2), t)] += 1
rand_result[eval_balls(buy_random_with_belief(), t)] += 1
return pred_result,rand_result

当 temperture < 1 时,预测输出更加偏向先验结果,当 temperture > 1 是,预测输出更加新颖

图表表示的是在各个奖项中,预测法比随机法多中奖的数量:(下方代码多次尝试,结论相似)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
times = 100
plt.figure(figsize=(15, 3))
m = np.array([-2,5000000,5000000,298,198,8,3])

pred_result,rand_result = cmp(times = times, temp = 0.5)
ax = plt.subplot(1,3,1)
ax.bar([1,2,3,4,5,6], pred_result[1:] - rand_result[1:])
print('Temp = 0.5,合计盈亏', (m*pred_result).sum())

pred_result,rand_result = cmp(times = times, temp = 1)
ax = plt.subplot(1,3,2)
ax.bar([1,2,3,4,5,6], pred_result[1:] - rand_result[1:])
print('Temp = 1,合计盈亏', (m*pred_result).sum())

pred_result,rand_result = cmp(times = times, temp = 2)
ax = plt.subplot(1,3,3)
ax.bar([1,2,3,4,5,6], pred_result[1:] - rand_result[1:])
print('Temp = 2,合计盈亏', (m*pred_result).sum())
100%|██████████| 100/100 [04:03<00:00,  2.44s/it]
  0%|          | 0/100 [00:00<?, ?it/s]

Temp = 0.5,合计盈亏 -19555.0


100%|██████████| 100/100 [04:04<00:00,  2.44s/it]
  0%|          | 0/100 [00:00<?, ?it/s]

Temp = 1,合计盈亏 -19475.0


100%|██████████| 100/100 [04:01<00:00,  2.39s/it]


Temp = 2,合计盈亏 -19740.0

png

结论

  • 由上面的图表可看出,经过预测的购买方式明显比随机购买的收益高,我们的预测方法有效。
  • 对比三张图以及合计盈亏来看,参数 temperature 应当选用 1 最为恰当。
  • 无法做到提高 1、2、3 等奖的中奖率,但若只是小奖,无法做到盈利
  • 另外,关于 RNN 的训练次数,这里的 EPOCH 设置在 60,大概是 loss 下降变慢的时候结束,但并没有训练至 loss 收敛为止,因为怕过拟合,预测值如果与之前太过相似不利于整体的预测,但这也仅仅是猜想,我们的电脑没有足够的计算力让我们去尝试各种参数,目前的预测方法有效就已经足够。

参考文献

[1] Text generation using a RNN with eager execution

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×