自己部署需要改下机器人token等相关信息。
本代码只能抢购SC2,不能抢购VPS,且不能抢购闪购款。请不要用来强行抢。
可以作为学习telegram机器人交互的代码来学习。
机器人源码
import requests
import redis
import telegram
from telegram.ext import *
# 创建 Redis 连接
r = redis.Redis(host='localhost', port=6379, db=0)
# 处理接收到的文本消息
async def handle_text_message(update, context):
message = update.message
text = message.text
chat_id = message.chat_id
if text == "排队号":
position = None
# 获取列表的长度
position = r.lpos("mylist", chat_id)
if position is not None:
await context.bot.send_message(chat_id=chat_id, text=f"您在队列中的位置是:{position}")
else:
await context.bot.send_message(chat_id=chat_id, text="您当前不在队列中")
else:
# 校验格式
if not validate_format(text):
# 格式不正确,发送提示消息
await context.bot.send_message(chat_id=chat_id, text="格式不正确,请重新发送。")
return
parts = text.split(":")
app_secret = parts[0]
hash_value = parts[1]
# 校验API
url = "https://api.cloudcone.com/api/v2/compute/instances"
headers = {
"accept": "application/json",
"App-Secret": app_secret,
"Hash": hash_value
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
# API校验通过,将chat_id、app_secret和hash数据加入到Redis中
add_to_redis(chat_id, app_secret, hash_value)
await context.bot.send_message(chat_id=chat_id, text="API校验通过,已将数据加入Redis。")
else:
# API校验失败,发送提示消息
await context.bot.send_message(chat_id=chat_id, text="API有误,请检查后重新发送。")
# 校验格式
def validate_format(text):
parts = text.split(":")
if len(parts) != 2:
return False
if not parts[0] or not parts[1]:
return False
return True
def add_to_redis(chat_id, app_secret, hash_value):
position = r.lpos("mylist", chat_id)
# 检查 chat_id 是否不存在于 mylist 中
if position is None:
# 将 chat_id 添加到 mylist 列表
r.rpush('mylist', chat_id)
# 将用户信息和数据存储到 Redis 的 Hash 中
r.hset(chat_id, 'app_secret', app_secret)
r.hset(chat_id, 'hash', hash_value)
# 可选:打印添加的数据
print(f"Added to Redis: {chat_id} - {app_secret} - {hash_value}")
else:
print(f"Data already exists in mylist: {chat_id}")
# 获取最早插入的数据
def get_earliest_data():
# 获取列表中的第一个元素(用户 ID)
chat_id = r.lindex('mylist', 0)
if chat_id:
# 使用 chat_id 从 Redis 的 Hash 中获取用户信息和数据
user_info = r.hgetall(chat_id)
if user_info:
return chat_id.decode(), user_info[b'app_secret'].decode(), user_info[b'hash'].decode()
return None, None, None
if __name__ == '__main__':
# 设置机器人的API令牌
TOKEN = "xx:bbb"
application = Application.builder().token(TOKEN).build()
# 创建一个MessageHandler,处理接收到的文本消息
text_handler = MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text_message)
# Commands
application.add_handler(text_handler)
# Run bot
application.run_polling(1.0)
抢购页面源码