This is working code.
Required: pip install numpy websocket-client
-
Paste your
API_KEY
andSECRET
in the Auth class -
For websocket thread: change
args=('BTC-USDT',)
to the market you want -
For send order: change
send_order('BTC-USDT', 'BUY', quote_size='2.8')
to the market, side, and size you want.
Questions/improvements welcome.
auth.py
import time, hmac, hashlib
class Auth:
def __init__(self):
self.API_KEY = ''
self.SECRET = ''
def __call__(self, message):
timestamp = str(int(time.time()))
signature = hmac.new(self.SECRET.encode('utf-8'), (timestamp + message).encode('utf-8'), digestmod=hashlib.sha256).hexdigest()
headers = {
'CB-ACCESS-KEY': self.API_KEY,
'CB-ACCESS-TIMESTAMP': str(int(time.time())),
'CB-ACCESS-SIGN': signature,
'Content-Type': 'application/json'
}
return headers, signature, self.API_KEY, timestamp
main.py
import json, http.client, websocket
from threading import Thread
import numpy as np
from auth import Auth
def create_websocket(market):
global current_price
current_price = None
channel = 'ticker'
auth = Auth()
r = auth(channel + market)
signature, API_KEY, timestamp = r[1], r[2], r[3]
subscribe_msg = {
'type': 'subscribe',
'product_ids': [
market
],
'channel': 'ticker',
'api_key': API_KEY,
'timestamp': timestamp,
'signature': signature
}
subscribe_msg = json.dumps(subscribe_msg)
ws = websocket.create_connection('wss://advanced-trade-ws.coinbase.com')
ws.send(subscribe_msg)
while True:
message = json.loads(ws.recv())
if 'tickers' in message['events'][0]:
current_price = message['events'][0]['tickers'][0]['price']
print(current_price)
def send_request(method, path, payload):
auth = Auth()
headers = auth(method + path + payload)[0]
conn = http.client.HTTPSConnection('api.coinbase.com')
conn.request(method, path, payload, headers)
data = json.loads(conn.getresponse().read())
return data
def send_order(market, side, quote_size=None, base_size=None):
if side == 'BUY':
amount_id = 'quote_size'
size_id = quote_size
elif side == 'SELL':
amount_id = 'base_size'
size_id = base_size
method = 'POST'
path = '/api/v3/brokerage/orders'
client_order_id = str(np.random.randint(2**30))
payload = {
'client_order_id': client_order_id,
'product_id': market,
'side': side,
'order_configuration': {
'market_market_ioc': {
amount_id: size_id
}
}
}
payload = json.dumps(payload)
data = send_request(method, path, payload)
success = data['success']
print(success)
order_id = data['order_id']
return order_id
def get_order(order_id):
method = 'GET'
path = '/api/v3/brokerage/orders/historical/' + order_id
payload = ''
data = send_request(method, path, payload)
print(data)
""" websocket thread """ # change args to the market you want
# for testing
Thread(target=create_websocket, args=('BTC-USDT',)).start()
# for production run as daemon=True so it will stop after the main thread is done
#Thread(target=create_websocket, daemon=True, args=('BTC-USDT',)).start()
""" send order """ # quote_size required for BUY, base_size required for SELL
order_id = send_order('BTC-USDT', 'BUY', quote_size='2.8') # BUY
#order_id = send_order('BTC-USDT', 'SELL', base_size='0.0001') # SELL
get_order(order_id)