用到的工具:券商的Ptrade。原因是散户容易获得,并且运行在券商的云端,程序放上去后自己的电脑关机也照样运行。
目前已走通,分享给大家,只要在钉钉里建个群,然后建个群机器人,选加签获得“加签密钥”和webhook,代码如下:
### 将这部分导入模块放在你的策略最前面
(图片来自网络侵删)import time as t
import json
import requests
import hmac
import hashlib
import base64
import urllib.parse
import urllib.request
### 在你策略的任意位置放这个消息发送函数,注意填入你自己的加签密钥和webhook
def sendMSG_dd(content):
timestamp = str(round(t.time() 1000))
now = t.strftime('%Y-%m-%d %H:%M:%S')
secret = '你的加签密钥'
webhook = '你的webhook'
secret_enc = secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
url = '{}×tamp={}&sign={}'.format(webhook,timestamp,sign)
header = {
"Content-Type": "application/json",
"Charset": "UTF-8"
}
data = {
"msgtype": "text",
"text": {
"content":content+" "+now
},
"at": {
"isAtAll": False # isAtAll:是否@所有人
}
}
sendData = json.dumps(data)
sendDatas = sendData.encode("utf-8")
request = urllib.request.Request(url=url, data=sendDatas, headers=header)
opener = urllib.request.urlopen(request)
ret = opener.read()
# 输出响应结果
print(ret)
### 在任何你想得到通知的点位调用sendMSG_dd即可, 比如我在每天交易初始化时发个通知“程序已正常开始工作”,这样不用每天登录查看,只要ptrade出了故障没有跑起来,我就能第一时间知道
def before_trading_start(context, data):
sendMSG_dd("程序已正常开始工作"):
只要绕过两个坑,实现起来还是很简单的,第一:如果你不是企业,没有域名或者接收消息服务器,不用尝试企业微信了,会因为ip地址不可信失败。第二:钉钉群聊要建内部群,因为只有内部群才可以使用自定义机器人发送通知。