Tutorial: Auth, Websocket, Send Order, Get Order [Python]

This is working code.

Required: pip install numpy websocket-client

  1. Paste your API_KEY and SECRET in the Auth class

  2. For websocket thread: change args=('BTC-USDT',) to the market you want

  3. For send order: change send_order('BTC-USDT', 'BUY', quote_size='2.8') to the market, side, and size you want.

Questions/improvements welcome.

auth.py

import time, hmac, hashlib

class Auth:
    """Sign messages with HMAC-SHA256 and build the matching request headers.

    Paste your credentials into ``API_KEY`` and ``SECRET`` in ``__init__``.
    Calling an instance with a message signs ``timestamp + message`` using
    ``SECRET`` as the key.
    """

    def __init__(self):
        self.API_KEY = ''
        self.SECRET = ''

    def __call__(self, message):
        """Sign *message* using the current Unix time in whole seconds.

        Args:
            message: String that is appended to the timestamp before signing.

        Returns:
            A 4-tuple ``(headers, signature, API_KEY, timestamp)``:
            ``headers`` is a dict with the ``CB-ACCESS-*`` and
            ``Content-Type`` entries, ``signature`` is the hex digest, and
            ``timestamp`` is the signed timestamp as a string.
        """
        timestamp = str(int(time.time()))
        signature = hmac.new(
            self.SECRET.encode('utf-8'),
            (timestamp + message).encode('utf-8'),
            digestmod=hashlib.sha256,
        ).hexdigest()
        headers = {
            'CB-ACCESS-KEY': self.API_KEY,
            # Must be the exact timestamp that was signed. Reading the clock a
            # second time could land in the next second and make the header
            # disagree with the signature.
            'CB-ACCESS-TIMESTAMP': timestamp,
            'CB-ACCESS-SIGN': signature,
            'Content-Type': 'application/json'
        }
        return headers, signature, self.API_KEY, timestamp

main.py

import json, http.client, websocket
from threading import Thread
import numpy as np
from auth import Auth


def create_websocket(market):
    """Subscribe to the ticker channel for *market* and track its latest price.

    Connects to ``wss://advanced-trade-ws.coinbase.com``, sends a signed
    subscribe message, then loops on incoming messages. Each ticker update is
    stored in the module-level ``current_price`` and printed. The call only
    returns by raising, so it is meant to be run in its own thread.

    Args:
        market: Product id to subscribe to, e.g. ``'BTC-USDT'``.

    Raises:
        RuntimeError: If the server sends a message whose ``type`` is
            ``'error'``.
    """
    global current_price
    current_price = None
    channel = 'ticker'
    # The signed message is channel + product id; Auth returns
    # (headers, signature, api_key, timestamp) and the headers are not needed.
    signature, api_key, timestamp = Auth()(channel + market)[1:]
    subscribe_msg = json.dumps({
        'type': 'subscribe',
        'product_ids': [market],
        'channel': channel,
        'api_key': api_key,
        'timestamp': timestamp,
        'signature': signature
    })
    ws = websocket.create_connection('wss://advanced-trade-ws.coinbase.com')
    try:
        ws.send(subscribe_msg)
        while True:
            message = json.loads(ws.recv())
            # NOTE(review): error frames are assumed to look like
            # {'type': 'error', 'message': ...} -- confirm against the feed docs.
            if message.get('type') == 'error':
                raise RuntimeError(
                    'websocket error: {}'.format(message.get('message', message))
                )
            # Frames without a non-empty 'events' list are skipped instead of
            # crashing the thread with KeyError/IndexError.
            events = message.get('events') or [{}]
            tickers = events[0].get('tickers')
            if tickers:
                current_price = tickers[0]['price']
                print(current_price)
    finally:
        # Release the socket however the loop ends.
        ws.close()


def send_request(method, path, payload):
    """Send a signed HTTPS request to ``api.coinbase.com`` and decode the reply.

    Args:
        method: HTTP method, e.g. ``'GET'`` or ``'POST'``.
        path: Request path, e.g. ``'/api/v3/brokerage/orders'``.
        payload: Request body as a string (``''`` for no body). It is part of
            the signed message, so it must be exactly what is sent.

    Returns:
        The response body parsed from JSON.

    Raises:
        json.JSONDecodeError: If the response body is not valid JSON.
    """
    # The signature covers method + path + body; only the headers are needed here.
    headers = Auth()(method + path + payload)[0]
    conn = http.client.HTTPSConnection('api.coinbase.com')
    try:
        conn.request(method, path, payload, headers)
        return json.loads(conn.getresponse().read())
    finally:
        # Close the connection even when the request or decoding fails, so
        # repeated calls do not leak sockets.
        conn.close()


def send_order(market, side, quote_size=None, base_size=None):
    """Place a market IOC order and return its order id.

    Args:
        market: Product id, e.g. ``'BTC-USDT'``.
        side: ``'BUY'`` or ``'SELL'``.
        quote_size: Amount to spend, as a string; required for ``'BUY'``.
        base_size: Amount to sell, as a string; required for ``'SELL'``.

    Returns:
        The ``order_id`` field of the API response.

    Raises:
        ValueError: If *side* is not ``'BUY'``/``'SELL'`` or the size required
            for that side was not given. Raised before any request is sent.
        KeyError: If the response lacks ``success`` or ``order_id``.
    """
    # Each side uses a different size field in the order configuration.
    size_by_side = {
        'BUY': ('quote_size', quote_size),
        'SELL': ('base_size', base_size),
    }
    if side not in size_by_side:
        raise ValueError("side must be 'BUY' or 'SELL', got {!r}".format(side))
    amount_id, size = size_by_side[side]
    if size is None:
        # Without this check the API would receive a null size.
        raise ValueError('{} is required for a {} order'.format(amount_id, side))

    payload = json.dumps({
        'client_order_id': str(np.random.randint(2**30)),
        'product_id': market,
        'side': side,
        'order_configuration': {
            'market_market_ioc': {
                amount_id: size
            }
        }
    })
    data = send_request('POST', '/api/v3/brokerage/orders', payload)
    print(data['success'])
    return data['order_id']


def get_order(order_id):
    """Fetch the historical record for *order_id* and print the API response.

    Args:
        order_id: Order id string, as returned by ``send_order``.
    """
    endpoint = '/api/v3/brokerage/orders/historical/' + order_id
    # A GET request carries no body, so the payload is the empty string.
    print(send_request('GET', endpoint, ''))



""" websocket thread """ # change args to the market you want
# For testing: a regular (non-daemon) thread. create_websocket loops on incoming
# messages, so the process keeps running after the main thread is done.
Thread(target=create_websocket, args=('BTC-USDT',)).start()
# For production: run as daemon=True so it will stop after the main thread is done.
#Thread(target=create_websocket, daemon=True, args=('BTC-USDT',)).start()


""" send order """ # quote_size required for BUY, base_size required for SELL
order_id = send_order('BTC-USDT', 'BUY', quote_size='2.8') # BUY
#order_id = send_order('BTC-USDT', 'SELL', base_size='0.0001') # SELL

# Fetch the order that was just placed; get_order prints the API response.
get_order(order_id)
1 Like

Hello @Mike1! We appreciate your interest in Coinbase Cloud products. Thank you for bringing this to our attention and for sharing this information with the community.

1 Like