Tutorial: Auth, Websocket, Send Order, Get Order [Python]

This is working code.

Required: pip install numpy websocket-client

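  1. Paste your API_KEY and SECRET in the Auth class, and save that class (the first listing) as auth.py next to the main script, which loads it with `from auth import Auth`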

  2. For websocket thread: change args=('BTC-USDT',) to the market you want

  3. For send order: change send_order('BTC-USDT', 'BUY', quote_size='2.8') to the market, side, and size you want.

Questions/improvements welcome.


import time, hmac, hashlib

class Auth:
    """Build request authentication data from an API key and secret.

    Paste your credentials into ``API_KEY`` and ``SECRET`` below. Calling an
    instance signs a message with HMAC-SHA256 and returns the signed headers
    together with the individual pieces (signature, key, timestamp).
    """

    def __init__(self):
        self.API_KEY = ''
        self.SECRET = ''

    def __call__(self, message):
        """Sign ``message`` and return ``(headers, signature, API_KEY, timestamp)``.

        The signature is the hex HMAC-SHA256 digest of ``timestamp + message``
        keyed with ``SECRET``. ``timestamp`` is the current Unix time in whole
        seconds, as a string.
        """
        # Read the clock exactly once: the timestamp sent in the header must be
        # the same value that was signed, otherwise the signature stops
        # matching whenever two separate reads straddle a second boundary.
        timestamp = str(int(time.time()))
        signature = hmac.new(
            self.SECRET.encode('utf-8'),
            (timestamp + message).encode('utf-8'),
            digestmod=hashlib.sha256,
        ).hexdigest()
        headers = {
            'CB-ACCESS-KEY': self.API_KEY,
            'CB-ACCESS-TIMESTAMP': timestamp,
            'CB-ACCESS-SIGN': signature,
            'Content-Type': 'application/json',
        }
        return headers, signature, self.API_KEY, timestamp


import json, http.client, websocket
from threading import Thread
import numpy as np
from auth import Auth

def create_websocket(market):
    """Stream ticker prices for ``market`` into the global ``current_price``.

    Opens a websocket to ``wss://advanced-trade-ws.coinbase.com``, sends a
    signed subscription to the ``ticker`` channel, then loops forever storing
    the most recent price in the module-level ``current_price`` (``None``
    until the first ticker arrives). Because it never returns on its own, it
    is meant to be run in its own thread.

    Args:
        market: Product id to subscribe to, e.g. ``'BTC-USDT'``.
    """
    global current_price
    current_price = None
    channel = 'ticker'
    auth = Auth()
    # The message to sign is the channel name followed by the product id;
    # Auth prepends the timestamp itself.
    _, signature, api_key, timestamp = auth(channel + market)
    subscribe_msg = json.dumps({
        'type': 'subscribe',
        'product_ids': [market],
        'channel': channel,
        'api_key': api_key,
        'timestamp': timestamp,
        'signature': signature,
    })
    ws = websocket.create_connection('wss://advanced-trade-ws.coinbase.com')
    try:
        # The subscription has to be sent before any ticker data is received.
        ws.send(subscribe_msg)
        while True:
            message = json.loads(ws.recv())
            # A message without an 'events' list (or with an empty one) would
            # raise on direct indexing, so look the fields up defensively.
            events = message.get('events') or []
            if not events:
                continue
            tickers = events[0].get('tickers')
            if tickers:
                current_price = tickers[0]['price']
    finally:
        # Release the socket if the receive loop dies with an exception.
        ws.close()

def send_request(method, path, payload):
    """Send a signed HTTPS request to api.coinbase.com and return the parsed JSON.

    Args:
        method: HTTP method, e.g. ``'GET'`` or ``'POST'``.
        path: Request path, e.g. ``'/api/v3/brokerage/orders'``.
        payload: Request body as a string (``''`` when there is no body).

    Returns:
        The response body decoded with ``json.loads``.
    """
    auth = Auth()
    # The signed message is method + path + body; only the headers are needed here.
    headers = auth(method + path + payload)[0]
    conn = http.client.HTTPSConnection('api.coinbase.com')
    try:
        conn.request(method, path, payload, headers)
        return json.loads(conn.getresponse().read())
    finally:
        # Always release the socket, even if the request or JSON decoding fails.
        conn.close()

def send_order(market, side, quote_size=None, base_size=None):
    """Place a ``market_market_ioc`` order and return its order id.

    Args:
        market: Product id, e.g. ``'BTC-USDT'``.
        side: ``'BUY'`` or ``'SELL'``.
        quote_size: Order size sent as ``quote_size``; required for BUY.
        base_size: Order size sent as ``base_size``; required for SELL.

    Returns:
        The order id reported in the response.

    Raises:
        ValueError: If ``side`` is not ``'BUY'``/``'SELL'`` or the size that
            side requires was not supplied.
        RuntimeError: If the response does not report success or carries no
            order id.
    """
    # Validate before touching the network so a typo cannot send a bad order.
    if side == 'BUY':
        amount_id, size = 'quote_size', quote_size
    elif side == 'SELL':
        amount_id, size = 'base_size', base_size
    else:
        raise ValueError("side must be 'BUY' or 'SELL', got %r" % (side,))
    if size is None:
        raise ValueError('%s is required for a %s order' % (amount_id, side))

    method = 'POST'
    path = '/api/v3/brokerage/orders'
    client_order_id = str(np.random.randint(2**30))
    payload = json.dumps({
        'client_order_id': client_order_id,
        'product_id': market,
        'side': side,
        'order_configuration': {
            'market_market_ioc': {
                amount_id: size,
            },
        },
    })
    data = send_request(method, path, payload)
    if not data.get('success'):
        raise RuntimeError('order was not accepted: %r' % (data,))
    # NOTE(review): the order id is read from the top level first and then from
    # 'success_response' as a fallback - confirm the exact response shape
    # against the current API documentation.
    order_id = data.get('order_id') or (data.get('success_response') or {}).get('order_id')
    if not order_id:
        raise RuntimeError('response contained no order id: %r' % (data,))
    return order_id

def get_order(order_id):
    """Fetch a previously placed order and return the decoded response.

    Args:
        order_id: Order id string, as returned by ``send_order``.

    Returns:
        The parsed JSON response from
        ``/api/v3/brokerage/orders/historical/<order_id>``.
    """
    method = 'GET'
    path = '/api/v3/brokerage/orders/historical/' + order_id
    payload = ''  # GET carries no body; the empty string is still part of the signed message
    # Return the response; without this the lookup result was discarded.
    return send_request(method, path, payload)

""" websocket thread """ # change args to the market you want
# For testing: start the ticker feed in a regular (non-daemon) thread.
# create_websocket loops forever, so the process keeps running until it is killed.
Thread(target=create_websocket, args=('BTC-USDT',)).start()
# For production: run with daemon=True so the thread stops once the main thread is done.
#Thread(target=create_websocket, daemon=True, args=('BTC-USDT',)).start()

""" send order """ # quote_size required for BUY, base_size required for SELL
# NOTE: this runs as soon as the script is executed and places a real order with the configured credentials.
order_id = send_order('BTC-USDT', 'BUY', quote_size='2.8') # BUY
#order_id = send_order('BTC-USDT', 'SELL', base_size='0.0001') # SELL


