тест
риваитватв
Пробуем создать с нуля сканер арбитража криптовалют на phyton
Что такое сканер арбитража криптовалют, или по другому - криптосканер?
Сканеры арбитража - это автоматизированные сервисы для поиска, анализа и отображения сигналов в реальном времени, по различным торговым инструментам, таким как - криптовалюты, токены, мемкоины.
Криптоарбитраж - процесс покупки крипто-актива на одной бирже по меньшей цене и перепродажа на другой бирже по большей цене. Различают несколько видов криптоарбитража - стандартный и треугольный (внутрибиржевой), в данной статье речь пойдет о стандартном виде арбитража.
Одно время искал в интернете информацию о том как сделать сканер для арбитража, натыкался, в основном на теоретические статьи, в которых изложен принцип работы, и очень мало деталей. Также, в интернете можно найти множество различных криптоарбитражных ботов и сервисов, к сожалению, большинство из них распространяется на платной основе, либо предоставляется урезанный функционал. По итогу, но ничего стоящего на мой взгляд не нашлось, и было принято решение попробовать самостоятельно создать минимальный функционал для криптоарбитража.
Краткая логика программы.
По началу я собираюсь использовать данные самых популярных CEX (централизованных) бирж, быстренько отбираю по списку капитализации на популярном сайте, и по принципу универсальности и простоты API. У меня получился список из 9 бирж. Из этих бирж нам нужно получить списки инструментов (криптовалютных пар), BIDs и ASKs, затем записать все данные в один упорядоченный массив. В полученном массиве мы ищем расхождения в ценах между биржами, и в случае если расхождения существенны получаем дополнительную статистику по паре, а именно - по 30 последних операций и информацию о сетях доступных для ввода-вывода (данная информация крайне важна при арбитраже между биржами).
Далее я постараюсь подробно описать суть и логику программы, весь код рабочей программы прикрепляю в конце статьи.
Реализация программы. Сбор данных с бирж через API.
Для начала определяем переменную G_pair_data, сюда будем собирать все наши данные о парах и ценах с бирж, также определим переменную с ссылками на API выбранных мною ранее бирж. Нам нужны будут данные только по спотовому рынку, данные методы общедоступные, поэтому API ключ для получения информации пока не требуется.
countDataBr, G_pair_data, symbols, Info_Data = {}, {}, {}, {}
urls = {'binance': 'https://api.binance.com/api/v3/exchangeIfo?permissions=SPOT',
'huobi': 'https://api.huobi.pro/v1/settings/common/symbols',
'bybit': 'https://api.bybit.com/v5/market/instruments-info?category=spot',
'okx': 'https://www.okx.com/api/v5/public/instruments?instType=SPOT',
'kucoin': 'https://api.kucoin.com/api/v2/symbols',
'kraken': 'https://api.kraken.com/0/public/AssetPairs',
'mexc': 'https://api.mexc.com/api/v3/exchangeInfo',
'gateio': 'https://api.gateio.ws/api/v4/spot/currency_pairs',
'crypto': 'https://api.crypto.com/exchange/v1/public/get-instruments',
}
Теперь для каждой биржи объявим свою персональную функцию, так-как каждая биржа отдает данные немного по своему, а нам нужно собрать единый массив в котором мы сможем просто и быстро оперировать этими данными. Ниже я приведу код на примере одной биржи с подробными комментариями, остальные функции аналогичны, за исключением значений обращений и работы с исходными данными. Полный код будет в конце статьи.
def binance_Data(data):
b = 'binance'
symbols[b] = {} # создаем доп массив для конкретной биржи
for symbol in data['symbols']: # перебираем полученные данные
if (symbol['status'] != "BREAK") and symbol['isMarginTradingAllowed']: # проверям на подходящий статус
base_asset = symbol['baseAsset'] # базовая монета
quote_asset = symbol['quoteAsset'] # второстепенная монета
bb = base_asset + '/' + quote_asset
if bb not in G_pair_data.keys(): # если ранее данные не были записаны
G_pair_data[bb] = {"bir": [], "bid": [], "ask": []} # создаем пустую запись с ключем по данной паре
symbols[b][symbol['symbol']] = bb #дописываем ключ пары для текущей биржи
response = session.get('https://binance.com/api/v3/ticker/bookTicker') #делаем запрос для получения BID, ASK цен
dataBook = response.json()
for entry in dataBook:
symbol = entry['symbol']
try: #если есть цены, пишем их, иначе приравниваем к 0
price_bid = float(entry['bidPrice'])
price_ask = float(entry['askPrice'])
except:
price_bid, price_ask = 0, 0
AddPricesData(symbol, price_bid, price_ask, b) # вызываем универасльную функцию для записи полученных данных в глобальный массив
print(b)
После подготовки данных, записываем их в глобальный массив при помощи еще одной функции:
def AddPricesData(symbol, price_bid, price_ask, b): # Дописываем закупочные цены
k1 = 0
if symbol in symbols[b].keys():
bb = symbols[b][symbol]
base_currency, quote_currency = bb.split('/')
if price_bid > 0 and price_ask > 0: # оставляем только те наборы, где есть цены
if base_currency + "/" + quote_currency in G_pair_data.keys():
bbq = base_currency + "/" + quote_currency
k1 = 1
elif quote_currency + "/" + base_currency in G_pair_data.keys():
bbq = quote_currency + "/" + base_currency
k1 = 1
if k1 == 1: # дописываем данные в глобальный массив
G_pair_data[bbq]["bir"].append(b)
G_pair_data[bbq]["bid"].append(float(get_normal_curr(price_bid)))
G_pair_data[bbq]["ask"].append(float(get_normal_curr(price_ask)))
else: # если цен нет, очищаем переменные
if base_currency + "/" + quote_currency in G_pair_data.keys():
del G_pair_data[base_currency + "/" + quote_currency]
elif quote_currency + "/" + base_currency in G_pair_data.keys():
del G_pair_data[quote_currency + "/" + base_currency]
Теперь, выполнив персональные функции для каждой биржи мы получим глобальный массив со всеми парами, отсортированными по биржам с ценами покупки продажи. Но нам по прежнему не хватает некоторых данных, к тому же нужно как то анализировать весь этот массив, поэтому продолжаем.
Реализация глобальной логики. Потоки, персональные функции
Чтобы сократить время на получение данных, реализуем вызов каждой персональной функции в потоке. Каждый поток пишет данные по своей бирже в общий массив и закрывается, освобождая место другому.
Предварительно объявим функции, которые будут на основе аргументов вызывать персональные функции для бирж:
def Test_Trading_Info(Info_Data): # получаем дополнительные статистические данные
for item in Info_Data.keys(): # Дописываем данные о %% разнице стаканов
bir_name = Info_Data[item]['bir']
globals()[bir_name + '_TBA'](item) # вызов персональной функции для получения стаканов
# print(str(item) + "---" + str(Info_Data[item]))
for item in Info_Data.keys():
bir_name = Info_Data[item]['bir']
globals()[bir_name + '_TradeData'](Info_Data[item],
item) # вызов персональной функции для получения истории цен
def get_networks(): # получаем сети
for item in main_bir:
globals()[item + '_GetNetwork']() # вызов персональной функции для получения сетей
with open('data.json', 'w') as f:json.dump(Networks_Data, f, indent=4)
print('Получение сетей')
def ping(url, b): # получаем пары, BID, ASKs
res = session.get(url)
countDataBr[b] = res.json() # общие данные по валютам
globals()[b + '_Data'](countDataBr[b]) # вызываем персональную функцию для каждой биржи
Затем пишем главную логику программ. Получение данных по биржам и расчеты производим в бесконечном цикле. Количество пересечений между биржами ставим больше 3, т.е. одинаковая пара должны быть на 3 и более биржах, тогда для таких пар будем рассчитывать разницу котировок в % и дополнительные статистические данные. Получение информации о сетях производим заранее, вряд-ли в течении нашей сессии что то изменится в статусе сетей.
if __name__ == '__main__':
get_networks() #получаем сети
while True: # цикл проверки курсов
start = datetime.datetime.now() #замеряем время выполнения
threads = [] # массив потоков
for url in urls.keys():
u = urls[url]
thread = threading.Thread(target=ping, args=(u, url,)) # вызываем персональные функции
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
for item in G_pair_data.keys(): # перебираем полученный глобальный массив
count_it = len(G_pair_data[item]['ask'])
if (count_it > 3) and ('USDT' in item): # если кол-во пересечений по биржам больше 3
CheckProcN(item, G_pair_data[item]['ask']) # находим разницу в % между курсами
Test_Trading_Info(Info_Data) # получаем дополнительные статистические данные для пары
finish = datetime.datetime.now()
print('Время работы: ' + str(finish - start))
countDataBr, G_pair_data, symbols, Info_Data = {}, {}, {}, {} #очищаем переменные в конце цикла
print("Ждем 35 сек")
sleep(35)
Получаем список сетей и статистику
Список сетей пишем в отдельную переменную, чтобы в дальнейшем по ключу узнавать возможность арбитражного окна.
Что такое сеть криптовалюты и для чего она нужно?
Одна криптовалюта или токен может иметь как одну, так и несколько поддерживаемых сетей. Очень важно при переводе валюты из одного места в другое сверять названия и информацию о сети, т.к. если вы отправите криптовалюту на адрес в другой сети, то средства будут навсегда утеряны без возможности восстановления.
В контексте нашей программы информация о сетях играет важную роль, так-как нам важно понимать в какой сети мы можем направить криптовалюту в другую биржу, и какой процент комиссии в данной сети. Кстати, процент комиссии сети играет очень важную роль в межбиржевом арбитраже, так-как, зачастую небольшая разница комиссии может фактически составить заработок на арбитражной сделке.
Сложностью при получении сетей из официальных API бирж является то, что нужно регистрироваться на биржах и создавать закрытый API ключ и SYCRET ключ, мы не будем рассматривать в данной статье подключение к API каждой биржи, у каждой биржи есть свои ньюансы при создании этих ключей и правилам их использования. Вся основная информация и техническая документация указана на официальной странице API интересующей нас биржи.
Стоит отметить, что если в дальнейшем планируется полностью автоматизировать программу, добавить вохможность автоматической покупки криптовалюты, перевода на другую биржу и продажи, то придется проводить интеграцию с каждой биржей и получать эти закрытые ключи.
Для удобства использования, запишем все ключи в один массив:
api_keys = {'bybit': {'key': '************eT8CuQUq', 'sycret': 'xA5K1ccv2******************riHnqPASWh'},
'huobi': {'key': '5***********87512-a8451e9b-mk0lklo0de', 'sycret': 'b6c84***************-74151534-f298b'},
'okx': {'key': '9cc*************d-add7-a827f29052b8', 'sycret': '5958B*************3E064BA73D9332'},
'mexc': {'key': 'mx0************HK1', 'sycret': 'a2d2dc772f**********************3aa95f6'},
'kucoin': {'key': '665b**************a93', 'sycret': '887c6********************51ba273cf16f'},
}
Далее опишем персональную функцию для получения сетей для одной биржи, для остальных бирж функции аналогичные, за исключением обращений и формата данных:
def huobi_GetNetwork(): # получение сетей HTX
b = 'huobi'
Networks_Data[b] = {}
response = session.get(urls_Networks[b]) # отправляем запрос к API
dataBook = response.json()
for item in dataBook['data']: # перебор полученных данных
if item['instStatus'] == 'normal': # проверка на статус
curr = item['currency'].upper() # приводим к единому виду
is_data_net = 0
Networks_Data[b][curr] = {'chains': [], 'deposit': [], 'withdraw': [], 'fee': [], }
for one_net in item['chains']: # перебираем список доступных сетей
if one_net['depositStatus'] == 'allowed' or one_net['withdrawStatus'] == 'allowed': # ввод или вывод должен быть доступен
is_data_net = 1
net_name = one_net['displayName']
try:
fee = one_net['transactFeeWithdraw'] # фикс комиссии
except:
fee = one_net['transactFeeRateWithdraw']+'_rate' # комиссии с расчетом в %
dep_stat, with_stat = 'no', 'no' # проверка на доступность ввода-вывода
if one_net['depositStatus'] == "allowed":
dep_stat = 'yes'
if one_net['withdrawStatus'] == "allowed":
with_stat = 'yes'
Networks_Data[b][curr]['chains'].append(net_name) # записываем данные
Networks_Data[b][curr]['deposit'].append(dep_stat)
Networks_Data[b][curr]['withdraw'].append(with_stat)
Networks_Data[b][curr]['fee'].append(fee)
if is_data_net == 0: # чистим пустые данные
del Networks_Data[b][curr]
После того как сети были получены, а данные о парах и ценах были собраны и отсортированы, а также были отобраны интересующие нас пары с частым пересечением и был произведен расчет разницы котировок в процентах, можно дополнительными запросами получить чуть больше статистики для анализа.
Тут, на самом деле, можно придумать различные стратегии, написать сложные алгоритмы анализа и так далее. В своей программе я описал 2 дополнительных метода - это получение данных о процентной разницы стаканов на покупку и продажу, и получение краткой истории цен с расчетом сумм продаж и покупок.
Соответствующие функции приведу ниже, функции персональные для каждой биржи, они аналогичны, между собой, как и в примерах выше.
def huobi_TBA(key): # функция получение процентной разницы стаканов покупок и продаж
if Info_Data[key]['status'] == 0:
b = 'huobi'
pair = key.replace('/', '').lower()
response = session.get(urls_Bids_Asks[b] + '?symbol=' + pair + '&type=step0') # запрос к API
dataBook = response.json()
bids = dataBook["tick"]["bids"]
asks = dataBook["tick"]["asks"]
total_bids_amount, total_asks_amount = 0, 0
for item in bids: # перебираем и перемножаем суммы всех BID-ов
price = item[0]
coin_amount = item[1]
total_bids_amount += price * coin_amount
for item in asks:# перебираем и перемножаем суммы всех ASK-ов
price = item[0]
coin_amount = item[1]
total_asks_amount += price * coin_amount
Info_Data[key]['tba'] = total_bids_amount # пишем данные
Info_Data[key]['taa'] = total_asks_amount
def huobi_TradeData(data, key): # функция для получения истории цен
if Info_Data[key]['status'] == 0:
b = 'huobi'
pair = key.replace('/', '').lower()
sstr = b + ", " + key + ', в работе'
response = session.get(urls_LastTrade[b] + '?symbol=' + pair + '&size=20') # запрос на 20 последних операций
dataBook = response.json()
stt = 0
sell_his_ct, buy_his_ct = 1, 1
for entry in reversed(dataBook['data']):
if entry['data'][0]['direction'] == 'buy': # расчет суммы истории покупок-продаж
price_b = float(entry['data'][0]['price'])
size_b = float(entry['data'][0]['amount'])
buy_his_ct = buy_his_ct + (price_b * size_b)
if entry['data'][0]['direction'] == 'sell':
price_b = float(entry['data'][0]['price'])
size_b = float(entry['data'][0]['amount'])
sell_his_ct = sell_his_ct + (price_b * size_b)
if (entry['data'][0]['direction'] == 'buy') and (int(data['time']) < int(entry['ts'])): # только покупки
price_b = float(entry['data'][0]['price'])
normal_time = datetime.datetime.utcfromtimestamp(int(data['time'] / 1000)).strftime('%Y.%m.%d %H:%M')
if price_b >= data['sell']:
sstr = "Профит!! Пара:" + str(key) + " Время: " + str(normal_time) + " Цена продажи:" + str(
price_b) + " Массив:" + str(data)
stt = 1
elif price_b <= data['stop']:
sstr = "Минус!! Пара:" + str(key) + " Время: " + str(normal_time) + " Цена продажи:" + str(
price_b) + " Массив:" + str(data)
stt = 1
dev_history_prices = round(100 - ((buy_his_ct / sell_his_ct) * 100), 2) * -1
Info_Data[key]['buy_his_ct'], Info_Data[key]['sell_his_ct'], Info_Data[key][
'dev_history_prices'] = buy_his_ct, sell_his_ct, dev_history_prices
if stt == 1:
log_in_file(sstr)
Info_Data[key]['status'] = 1
elif stt == 0:
sstr += " История покупок: " + str(Info_Data[key]['buy_his_ct']) + " История продаж: " + str(
Info_Data[key]['sell_his_ct']) + " Разница: " + str(Info_Data[key]['dev_history_prices']) + "%"
Go_data_server(Info_Data[key], key) # отправляем данные на сервер
# print(sstr)
В конце функции huobi_TradeData() мы отправляем полученные результаты себе на сервер, там реализована логика приема и вывода информации в красивом виде, но это не обязательно, можно получить все необходимые данные в консольном виде.