给定2013,2014,2015,2016,2017的福彩双色球数据.

输出:给出前区( 红球 )数据( 6个号码 )的预测至少一注( 一条比如 2, 5, 13, 14, 26, 33 )


环境说明:

  • 操作系统: MacOS 10.14.1
  • Python:v3.6.7
  • Jupyter Notebook: v1.0.0

数据的加载与分析

# 导入必要的库
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
# 读取数据并观察
df = pd.read_csv('./ssq.TXT', sep='\t')
# '期号','套球号','红球1','红球2','红球3','红球4','红球5','红球6','蓝球'
print('总期数:', len(df))
df[:5]
总期数: 767

数据介绍及目标

数据包含从2013年到2017年共767期的福彩双色球数据。其中红球按照从小到大的顺序给出,并没有提供6个红球的出球次序。 对于双色球彩票这样随机的事件,我们希望从历史数据中找出一些规律,从而得到一个可以明显提高收益预期的购买策略。

对数据进行一些可视化

对比枯燥的表格,将数据绘制成图形,会直观很多,下面进行一些尝试:

六个位置出现红球号数的分布情况:

plots = df.loc[:,['R1','R2','R3','R4','R5','R6']].hist(bins = 33)

png

因为各个号码最终安从小到大排列到各个位置,所以这样的图形的确符合常理,但每个直方图都有许多项缺失的情况。

蓝色球号的分布情况

plots = df['Blue'].hist(bins = 16, figsize = [5,2])

png

16个蓝色的球,其理论上分布应该是均匀分布,在767期历史数据中,似乎的确有概率上的细微差别。

第一个红球的编号随期数(时间)的变动:

# 画出了前500期 R1 的时间序列折线图
plots = df['R1'][:500].plot(figsize = [20,2])

png

套球号随期数(时间)的变动:

# 画出了前500期 套球号 的时间序列折线图
plots = df['NUM'][:500].plot(figsize = [20,2])

png

套球号与出球分布的关系

# 绘制套球号与出球分布的关系
# 灰色部分为套球号与 R1 球号分布的关系
colors = ['red','blue','orange','green']
for i in [1,2,3,4]:
    axi = plt.subplot(2,2,i)
    n = df[df.NUM == i].loc[:,['R1','R2','R3','R4','R5','R6']].values
    n = n.reshape([n.shape[0]*n.shape[1]])
    plots = axi.hist(n,bins = 33, alpha = 0.25, color=colors[i-1])
    plots = axi.hist(df[df.NUM == i]['R1'], bins = 33, alpha = 0.5, color='black')
    plots = axi.legend([str(i)+'-R1', str(i)])

png

一些小小的思路

在给定的数据中,我们只有红球按大小排列的结果,而得不到顺序信息,但反过来想,这其实简化我们的分析压力,我们只需要将排序完成的序列作为预测对象即可。

关于套球号

套球号决定了摇号的机器,本身会对出球的概率造成一定的影响,但在历史数据绘制的图像中,暂时不能确定套球号对于所以出球号的概率到底有怎么样的影响,但对 R1 概率的影响还是肉眼可见的。

策略:

基本方案

根据六个红球按大小顺序出现的历史数据,我们统计从每一号球后出现球号的概率,并按照此概率进行下一轮球号的预测,并尝试考虑套球号的影响。 在每一步的预测中,我们需要:

  1. 指定第一个球号(R1)
  2. 根据 R1 已经先前得到的概率生成之后的序列
  3. 判断这个序列是否符合双色球规则

在上述流程中,若考虑套球号的影响,则一开始指定套球号而不是 R1

R1/套球号 的指定方案

将其历史数据作为时间序列进行预测,得到下一个数据的预测值

策略的实现方案

基于上述策略,我们发现我们所要预测的,是在给出先验结果的基础上,得到最有可能依次出现的序列组合,这点与语言文本的生成极为相似,于是我们打算使用训练一个 RNN 的方式来实现序列预测。

模型的实现

# 导入必要的库
import time
import tensorflow as tf
tf.enable_eager_execution()

RNN 模型

class Model(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, units):
        super(Model, self).__init__()
        self.units = units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)

        self.gru = tf.keras.layers.CuDNNGRU(self.units, 
                                          return_sequences=True, 
                                          recurrent_initializer='glorot_uniform',
                                          stateful=True)
        self.fc = tf.keras.layers.Dense(vocab_size)  
    def call(self, x):
        embedding = self.embedding(x)
        output = self.gru(embedding)
        prediction = self.fc(output)
        return prediction
    def build(self, input_shape):
        self.embedding.build(input_shape)
        self.gru.build(tf.TensorShape([input_shape[0], input_shape[1], embedding_dim]))
        self.fc.build(tf.TensorShape([input_shape[0], input_shape[1], self.units]))
        super(Model, self).build(input_shape)

指定一些参数值,包括红球编号的范围、Embedding 层的大小,以及 RNN 单元的大小:

tf.reset_default_graph()
rball_size = 34
embedding_dim = 256
units = 1024

optimizer = tf.train.AdamOptimizer()
def loss_function(real, preds):
    return tf.losses.sparse_softmax_cross_entropy(labels=real, logits=preds)

定义训练模型的函数

def train_model(model, dataset, checkpoint_dir = './training_checkpoints' , EPOCHS = 50):
    # 定义模型存放的路径
    checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
    checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)

    for epoch in range(EPOCHS):
        start = time.time()
        hidden = model.reset_states()

        for (batch, (inp, target)) in enumerate(dataset):
            with tf.GradientTape() as tape:
                predictions = model(inp)
                loss = loss_function(target, predictions)
            grads = tape.gradient(loss, model.variables)
            optimizer.apply_gradients(zip(grads, model.variables))

        if (epoch + 1) % 5 == 0:
            checkpoint.save(file_prefix = checkpoint_prefix)

        if (epoch + 1) % 10 == 0:
            print ('Epoch {} Loss {:.4f}'.format(epoch+1, loss))
            print ('Time taken for 1 epoch {} sec\n'.format(time.time() - start))

数据集的加载

定义加载数据集的函数

# 把数据集分为 input 和 target
def split_input_target(chunk):
    input_ = chunk[:-1]
    target = chunk[1:]
    return input_, target

# 生成数据集
def load_dataset(chunks, seq_length, BATCH_SIZE, BUFFER_SIZE = 10000):
    dataset = chunks.map(split_input_target)
    # 输出一组 input 和 target
    for inp, target in dataset.take(1):
        print('INPUT: ',inp.numpy(), '\nTARGET: ', target.numpy())
    return dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)

我们在模型的训练中,会训练两个 RNN 模型, 一个根据历史的 R1 数据预测下一个 R1, 另一个是根据 R1 来预测 R2-R6 的数值。所以我们准备了两份训练集:

R2-6 预测训练集

chunks = tf.data.Dataset.from_tensor_slices(df.loc[:,['R1','R2','R3','R4','R5','R6']].values)
r26_dataset = load_dataset(chunks, seq_length = 5, BATCH_SIZE = 32)
r26_model = Model(rball_size, embedding_dim, units)
r26_model.build(tf.TensorShape([32, 5]))
INPUT:  [ 6  8 14 15 24] 
TARGET:  [ 8 14 15 24 25]

R1 预测训练集

chunks = tf.data.Dataset.from_tensor_slices(df['R1'].values).batch(33, drop_remainder=True)
r1_dataset = load_dataset(chunks, seq_length = 32, BATCH_SIZE = 3)
r1_model = Model(rball_size, embedding_dim, units)
r1_model.build(tf.TensorShape([3, 32]))
INPUT:  [ 6  1 22  6  1  9  2  3  1  1  3  6  5  2  5  2  4  2  1  1  1  2  3  4
 16  4  1  7  6  7  3  4] 
TARGET:  [ 1 22  6  1  9  2  3  1  1  3  6  5  2  5  2  4  2  1  1  1  2  3  4 16
  4  1  7  6  7  3  4  5]

模型的训练:

训练前面提到的两个模型,保存位置分别为:

  • R2-6 预测:'./r26_training_checkpoints'
  • R1 预测:'./r1_training_checkpoints'
# 训练 r26_model
train_model(r26_model, r26_dataset, checkpoint_dir = './r26_training_checkpoints', EPOCHS = 60)
Epoch 10 Loss 2.3451
Time taken for 1 epoch 0.33402252197265625 sec

Epoch 20 Loss 2.0390
Time taken for 1 epoch 0.33139848709106445 sec

Epoch 30 Loss 1.3000
Time taken for 1 epoch 0.33975768089294434 sec

Epoch 40 Loss 1.1307
Time taken for 1 epoch 1.2251815795898438 sec

Epoch 50 Loss 1.0488
Time taken for 1 epoch 0.3994264602661133 sec

Epoch 60 Loss 1.1033
Time taken for 1 epoch 0.344959020614624 sec
# 训练 r1_model
train_model(r1_model, r1_dataset, checkpoint_dir = './r1_training_checkpoints', EPOCHS = 60)
Epoch 10 Loss 2.6247
Time taken for 1 epoch 0.16772770881652832 sec

Epoch 20 Loss 2.3420
Time taken for 1 epoch 0.17430996894836426 sec

Epoch 30 Loss 2.2850
Time taken for 1 epoch 0.9093096256256104 sec

Epoch 40 Loss 2.2243
Time taken for 1 epoch 0.580397367477417 sec

Epoch 50 Loss 1.9372
Time taken for 1 epoch 0.18292641639709473 sec

Epoch 60 Loss 1.5689
Time taken for 1 epoch 0.18114113807678223 sec

模型的加载与预测

模型的加载

def load_latest_model(checkpoint_dir, seq_len = None):
    model = Model(rball_size, embedding_dim, units)
    checkpoint = tf.train.Checkpoint(model=model)
    checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))
    model.build(tf.TensorShape([1, seq_len]))
    return model

# 加载 model_r1
model_r1 =  load_latest_model('./r1_training_checkpoints', seq_len = 30)
# 加载 model_r26
model_r26 =  load_latest_model('./r26_training_checkpoints', seq_len = None)

彩票的预测

# 根据先验数据预测 r1
def predict_r1(input_eval, model, temperature = 2):
    hidden = model.reset_states()
    input_eval = tf.expand_dims(input_eval, 0)
    predictions = model(input_eval)
    predictions = tf.squeeze(predictions, 0)
    predictions = predictions / temperature
    return tf.multinomial(predictions, num_samples=1)[-1,0].numpy()

# 根据 r1 预测 r2-6
def predict_with_r1(r1, model, temperature = 2):
    hidden = model.reset_states()
    input_eval = [r1]
    result = [r1]
    for _ in range(5):
        input_eval = tf.expand_dims(input_eval, 0)
        predictions = model(input_eval)
        predictions = tf.squeeze(predictions, 0)
        predictions = predictions / temperature
        predicted_id = tf.multinomial(predictions, num_samples=1)[-1,0].numpy()
        input_eval = [predicted_id]
        result.append(predicted_id)
    return result

# 根据先验30天数据预测下一组彩票
def predict_with_history(data, temperature = 1):
    return predict_with_r1(predict_r1(data, model_r1, 
                               temperature = temperature), 
                                model_r26, temperature = temperature)
  
# 测试预测一组数据:
predict_with_history(np.random.randint(1,10,[30]))
[1, 7, 13, 19, 21, 29]

模型的评价

彩票中奖金额计算代码:

函数 “eval_balls” 返回一个评奖的值,未获奖为 0

def eval_balls(my, ans):
    my = my[:7]
    ans = ans[:7]
    nums = 0
    for i in my[:6]:
        if i in ans[:6]:
            nums += 1
    result = (nums, 1 if my[6] == ans[6] else 0)
    if result == (6, 1):
        return 1
    elif result == (6, 0):
        return 2
    elif result == (5, 1):
        return 3
    elif result == (5, 0) or result == (4, 1):
        return 4
    elif result == (4, 0) or result == (3, 1):
        return 5
    elif result == (2, 1) or result == (1, 1) or result == (0, 1):
        return 6
    else:
        pass
    return 0

模型的比较(相较于随机购买):

这一轮,我们来比较两种购买彩票的策略:

  • RNN 预测法(因为本次预测暂不考虑蓝球,所以蓝球随机购买)
  • 念佛随机购买法
# RNN 预测法
def buy_with_prediction(history, temp = 1):
    a = predict_with_history(history, temperature=temp)
    a.append(np.random.randint(1,17))
    return a
# 念佛随机购买法
def buy_random_with_belief():
    # 默念:南无阿弥陀佛
    while True:
        a = list(np.random.randint(1,34,6))
        if len(set(a)) == 6:
            break
    return a+list(np.random.randint(1,17,1))

我们从网上获取了 2018 年从 18001 到 18137 共137 期双色球数据,保存为 “2018ssq.txt” 作为测试数据

读取测试数据

test_data = pd.read_csv('./2018ssq.txt', sep = ' ').loc[:,['R1','R2','R3','R4','R5','R6','Blue']]
# 测试数据展示:
test_data[:5]

一个提供先验数据与中奖号码的生成器

data = pd.concat([df.loc[:,['R1','R2','R3','R4','R5','R6','Blue']][-30:], test_data])
def test_generator():
    for i in range(137):
        history = data[i:i+30]['R1'].values
        target = data[i+30:i+31].values[0]
        yield history, target

预测并显示结果:

我们使用两种策略购买这137期彩票,并统计其中每种奖的次数,为避免出现偶然性,我们购买100次:

from tqdm import tqdm

def cmp(times, temp):
    pred_result = np.zeros([7])
    rand_result = np.zeros([7])
    for _ in tqdm(range(times)):
        for h, t in test_generator():
            pred_result[eval_balls(buy_with_prediction(h, temp = 2), t)] += 1
            rand_result[eval_balls(buy_random_with_belief(), t)] += 1
    return pred_result,rand_result

当 temperture < 1 时,预测输出更加偏向先验结果,当 temperture > 1 是,预测输出更加新颖

图表表示的是在各个奖项中,预测法比随机法多中奖的数量:(下方代码多次尝试,结论相似)

times = 100
plt.figure(figsize=(15, 3))
m = np.array([-2,5000000,5000000,298,198,8,3])

pred_result,rand_result = cmp(times = times, temp = 0.5)
ax = plt.subplot(1,3,1)
ax.bar([1,2,3,4,5,6], pred_result[1:] - rand_result[1:])
print('Temp = 0.5,合计盈亏', (m*pred_result).sum())

pred_result,rand_result = cmp(times = times, temp = 1)
ax = plt.subplot(1,3,2)
ax.bar([1,2,3,4,5,6], pred_result[1:] - rand_result[1:])
print('Temp = 1,合计盈亏', (m*pred_result).sum())

pred_result,rand_result = cmp(times = times, temp = 2)
ax = plt.subplot(1,3,3)
ax.bar([1,2,3,4,5,6], pred_result[1:] - rand_result[1:])
print('Temp = 2,合计盈亏', (m*pred_result).sum())
100%|██████████| 100/100 [04:03<00:00,  2.44s/it]
  0%|          | 0/100 [00:00<?, ?it/s]

Temp = 0.5,合计盈亏 -19555.0


100%|██████████| 100/100 [04:04<00:00,  2.44s/it]
  0%|          | 0/100 [00:00<?, ?it/s]

Temp = 1,合计盈亏 -19475.0


100%|██████████| 100/100 [04:01<00:00,  2.39s/it]


Temp = 2,合计盈亏 -19740.0

png

结论

  • 由上面的图表可看出,经过预测的购买方式明显比随机购买的收益高,我们的预测方法有效。
  • 对比三张图以及合计盈亏来看,参数 temperature 应当选用 1 最为恰当。
  • 无法做到提高 1、2、3 等奖的中奖率,但若只是小奖,无法做到盈利
  • 另外,关于 RNN 的训练次数,这里的 EPOCH 设置在 60,大概是 loss 下降变慢的时候结束,但并没有训练至 loss 收敛为止,因为怕过拟合,预测值如果与之前太过相似不利于整体的预测,但这也仅仅是猜想,我们的电脑没有足够的计算力让我们去尝试各种参数,目前的预测方法有效就已经足够。

参考文献

[1] Text generation using a RNN with eager execution (opens new window)