Websocket WORKING CODE for Coinbase Advanced Trade

I’m trying to get the websocket up and running, but it’s not working as I expect. When I run the code, the terminal just sits there, blank, and then the connection closes. Can anyone fix my code? That would be amazing for me and for anyone else trying to get the websocket up and running:

CoinbaseAPI is a file that stores my sensitive credentials

import websocket
import json
import time
import hashlib
import hmac
import CoinbaseAPI

# Credentials derived from your Coinbase Retail API key, read from the local
# CoinbaseAPI module.
# SIGNING_KEY: the signing key provided as part of your API key. Also called the "SECRET KEY"
# API_KEY: the api key provided as part of your API key. Also called the "PUBLIC KEY"
SIGNING_KEY = CoinbaseAPI.API_SECRET
API_KEY = CoinbaseAPI.API_KEY

# Fail fast when either credential is empty or None.
if not SIGNING_KEY or not API_KEY:
    # The values come from the CoinbaseAPI module, not from environment
    # variables, so the message names the real source.
    raise ValueError('missing mandatory credential(s): set API_KEY and API_SECRET in CoinbaseAPI')

# Lookup table from the names used in this script to the channel identifiers
# placed in subscribe/unsubscribe messages. Note 'tickers' maps to 'ticker'.
CHANNEL_NAMES = dict(
    level2='level2',
    user='user',
    tickers='ticker',
    ticker_batch='ticker_batch',
    status='status',
    market_trades='market_trades',
)

# Websocket endpoint every connection in this script is opened against.
WS_API_URL = 'wss://advanced-trade-ws.coinbase.com'

def sign(message, secret):
    """Return the hex-encoded HMAC-SHA256 digest of ``message`` keyed by ``secret``.

    Both arguments are text; each is UTF-8 encoded before hashing.
    """
    key_bytes = secret.encode('utf-8')
    payload_bytes = message.encode('utf-8')
    return hmac.new(key_bytes, payload_bytes, hashlib.sha256).hexdigest()

def timestamp_and_sign(message, channel, products=()):
    """Add a ``timestamp`` and ``signature`` entry to ``message``.

    The signed string is the current time in whole seconds, the channel
    name and the comma-joined product ids, concatenated in that order.

    Args:
        message: dict to stamp; it is modified in place.
        channel: channel name included in the signed string.
        products: iterable of product id strings. Defaults to an empty
            tuple rather than a shared mutable list.

    Returns:
        The same ``message`` object that was passed in.
    """
    timestamp = str(int(time.time()))
    str_to_sign = f"{timestamp}{channel}{','.join(products)}"
    signature = sign(str_to_sign, SIGNING_KEY)
    message['timestamp'] = timestamp
    message['signature'] = signature
    return message

def on_message(ws, message):
    """Append one received JSON message to ``Output1.txt`` as a single line.

    The text is parsed and re-serialised, so invalid JSON raises before the
    file is opened. ``ws`` is accepted for the callback signature but unused.
    """
    line = json.dumps(json.loads(message))
    with open('Output1.txt', 'a') as out:
        out.write(f"{line}\n")

def subscribe_to_products(products, channel_name, ws):
    """Send a timestamped, signed ``subscribe`` message over ``ws``.

    Args:
        products: list of product ids to subscribe to.
        channel_name: channel identifier placed in the message.
        ws: object whose ``send`` method receives the JSON text.
    """
    payload = dict(
        type='subscribe',
        channel=channel_name,
        api_key=API_KEY,
        product_ids=products,
    )
    signed_payload = timestamp_and_sign(payload, channel_name, products)
    ws.send(json.dumps(signed_payload))

def unsubscribe_from_products(products, channel_name, ws):
    """Send a timestamped, signed ``unsubscribe`` message over ``ws``.

    Args:
        products: list of product ids to unsubscribe from.
        channel_name: channel identifier placed in the message.
        ws: object whose ``send`` method receives the JSON text.
    """
    payload = dict(
        type='unsubscribe',
        channel=channel_name,
        api_key=API_KEY,
        product_ids=products,
    )
    signed_payload = timestamp_and_sign(payload, channel_name, products)
    ws.send(json.dumps(signed_payload))

connections = []
sent_unsub = [False]  # One-element list so the nested callback can flip the flag

for _ in range(1):
    # Start time, used by on_close to measure how long the connection lived.
    date1 = time.time()

    def on_open(ws):
        """Subscribe to the user channel for BTC-USD once the socket is open."""
        products = ['BTC-USD']
        subscribe_to_products(products, CHANNEL_NAMES['user'], ws)

    def on_error(ws, error):
        """Print errors so a failing connection does not leave the terminal blank."""
        print(f'Websocket error: {error}')

    def on_close(ws, close_status_code=None, close_msg=None):
        """Try one unsubscribe if the connection lived longer than five seconds.

        Current websocket-client releases call this with three arguments
        (app, close status code, close message); the defaults keep the
        one-argument call of older releases working too.
        """
        date2 = time.time()
        diff_time = abs(date2 - date1)
        if diff_time > 5 and not sent_unsub[0]:
            try:
                unsubscribe_from_products(['BTC-USD'], CHANNEL_NAMES['user'], ws)
            except websocket.WebSocketConnectionClosedException:
                # The socket is normally already gone when on_close runs,
                # so report it instead of raising out of the callback.
                print('Websocket already closed; unsubscribe not sent')
            else:
                sent_unsub[0] = True

    ws = websocket.WebSocketApp(
        WS_API_URL,
        on_message=on_message
    )
    ws.on_open = on_open
    ws.on_error = on_error
    ws.on_close = on_close

    connections.append(ws)

    # Blocks until the connection ends.
    ws.run_forever()
1 Like

I found the answer after, like, a million hours. Here’s the code to create a working websocket connection. If you are using a different programming language, ask ChatGPT to convert it into your desired programming language:

If you want to buy me a coffee, here’s my ETH address:

0xC1d8A70E000C8F12e9b0aC7F7250FE62658ef705

import websocket
import json
import time
import hashlib
import hmac
import CoinbaseAPI
from threading import Thread

# Credentials derived from your Coinbase Retail API key, read from the local
# CoinbaseAPI module.
# SIGNING_KEY: the signing key provided as part of your API key. Also called the "SECRET KEY"
# API_KEY: the api key provided as part of your API key. Also called the "PUBLIC KEY"
SIGNING_KEY = CoinbaseAPI.API_SECRET
API_KEY = CoinbaseAPI.API_KEY

# Fail fast when either credential is empty or None.
if not SIGNING_KEY or not API_KEY:
    # The values come from the CoinbaseAPI module, not from environment
    # variables, so the message names the real source.
    raise ValueError('missing mandatory credential(s): set API_KEY and API_SECRET in CoinbaseAPI')

# Lookup table from the names used in this script to the channel identifiers
# placed in subscribe messages. Note 'tickers' maps to 'ticker'.
CHANNEL_NAMES = dict(
    level2='level2',
    user='user',
    tickers='ticker',
    ticker_batch='ticker_batch',
    status='status',
    market_trades='market_trades',
)

# Websocket endpoint every connection in this script is opened against.
WS_API_URL = 'wss://advanced-trade-ws.coinbase.com'

def sign_message(message):
    """Return the hex HMAC-SHA256 digest of ``message`` keyed with SIGNING_KEY.

    Both the key and the message are UTF-8 encoded before hashing.
    """
    digest = hmac.new(
        SIGNING_KEY.encode('utf-8'),
        message.encode('utf-8'),
        digestmod=hashlib.sha256,
    )
    return digest.hexdigest()

def on_message(ws, message):
    """Decode a JSON text message and print the resulting Python object.

    ``ws`` is accepted for the callback signature but is not used.
    """
    print(json.loads(message))

def create_websocket(market):
    """Open a websocket, subscribe to the ticker channel for ``market`` and block.

    The subscribe message is built and signed up front; the signed string is
    the timestamp, the channel name and the product id concatenated. The
    message is sent once the connection opens. Incoming messages go to the
    module-level ``on_message``.

    Args:
        market: product id string, e.g. ``'BTC-USDT'``.

    Returns:
        None, once ``run_forever`` returns.
    """
    channel = 'ticker'
    timestamp = str(int(time.time()))
    subscribe_msg = json.dumps({
        'type': 'subscribe',
        'product_ids': [
            market
        ],
        # Use the same variable that is signed so the two cannot drift apart.
        'channel': channel,
        'api_key': API_KEY,
        'timestamp': timestamp,
        'signature': sign_message(timestamp + channel + market)
    })

    def on_open(ws):
        ws.send(subscribe_msg)

    def on_close(ws, close_status_code=None, close_msg=None):
        # Current websocket-client releases pass the close status code and
        # message as extra arguments; the defaults also accept the
        # one-argument call of older releases.
        print('Websocket connection closed')

    ws = websocket.WebSocketApp(
        WS_API_URL,
        on_open=on_open,
        on_message=on_message,
        on_close=on_close,
    )

    ws.run_forever()

# Keep exactly one websocket thread alive. A new one is started only when none
# exists yet or the previous one has finished, rather than opening a fresh
# connection on every pass of the loop.
websocket_thread = None
while True:
    try:
        if websocket_thread is None or not websocket_thread.is_alive():
            # daemon=True so the blocking run_forever() call in the thread
            # does not keep the process alive after Ctrl+C.
            websocket_thread = Thread(
                target=create_websocket,
                args=('BTC-USDT',),
                daemon=True,
            )
            websocket_thread.start()

        # Sleep for 1 second between checks, which also spaces out reconnects
        # to comply with the rate limit
        time.sleep(1)
    except KeyboardInterrupt:
        # Allow interruption with Ctrl+C
        break

1 Like

Here is the code to create multiple websocket connections

Keep in mind that CoinbaseAPI is a file that stores my sensitive credentials; you should replace it with a file that stores yours.

If you want to buy me a coffee, here’s my ETH address:

0xC1d8A70E000C8F12e9b0aC7F7250FE62658ef705

import websocket
import json
import time
import hashlib
import hmac
import CoinbaseAPI
from threading import Thread

# Credentials derived from your Coinbase Retail API key, read from the local
# CoinbaseAPI module.
# SIGNING_KEY: the signing key provided as part of your API key. Also called the "SECRET KEY"
# API_KEY: the api key provided as part of your API key. Also called the "PUBLIC KEY"
SIGNING_KEY = CoinbaseAPI.API_SECRET
API_KEY = CoinbaseAPI.API_KEY

# Fail fast when either credential is empty or None.
if not SIGNING_KEY or not API_KEY:
    # The values come from the CoinbaseAPI module, not from environment
    # variables, so the message names the real source.
    raise ValueError('missing mandatory credential(s): set API_KEY and API_SECRET in CoinbaseAPI')

# Lookup table from the names used in this script to the channel identifiers
# placed in subscribe messages. Note 'tickers' maps to 'ticker'.
CHANNEL_NAMES = dict(
    level2='level2',
    user='user',
    tickers='ticker',
    ticker_batch='ticker_batch',
    status='status',
    market_trades='market_trades',
)

# Websocket endpoint every connection in this script is opened against.
WS_API_URL = 'wss://advanced-trade-ws.coinbase.com'

def sign_message(message):
    """Return the hex HMAC-SHA256 digest of ``message`` keyed with SIGNING_KEY.

    Both the key and the message are UTF-8 encoded before hashing.
    """
    digest = hmac.new(
        SIGNING_KEY.encode('utf-8'),
        message.encode('utf-8'),
        digestmod=hashlib.sha256,
    )
    return digest.hexdigest()

def on_message(ws, message):
    """Decode a JSON text message and print the resulting Python object.

    ``ws`` is accepted for the callback signature but is not used.
    """
    print(json.loads(message))

def create_websocket(product_id):
    """Open a websocket, subscribe to the ticker channel for ``product_id`` and block.

    The subscribe message is built and signed up front; the signed string is
    the timestamp, the channel name and the product id concatenated. The
    message is sent once the connection opens. Incoming messages go to the
    module-level ``on_message``.

    Args:
        product_id: product id string to subscribe to.

    Returns:
        None, once ``run_forever`` returns.
    """
    channel = 'ticker'
    timestamp = str(int(time.time()))
    subscribe_msg = json.dumps({
        'type': 'subscribe',
        'product_ids': [
            product_id
        ],
        # Use the same variable that is signed so the two cannot drift apart.
        'channel': channel,
        'api_key': API_KEY,
        'timestamp': timestamp,
        'signature': sign_message(timestamp + channel + product_id)
    })

    def on_open(ws):
        ws.send(subscribe_msg)

    def on_close(ws, close_status_code=None, close_msg=None):
        # Current websocket-client releases pass the close status code and
        # message as extra arguments; the defaults also accept the
        # one-argument call of older releases.
        print('Websocket connection closed')

    ws = websocket.WebSocketApp(
        WS_API_URL,
        on_open=on_open,
        on_message=on_message,
        on_close=on_close,
    )

    ws.run_forever()

# Give every configured product its own thread, each running its own
# websocket connection, and start them one second apart.
for product_id in CoinbaseAPI.PRODUCT_IDS:
    websocket_thread = Thread(
        target=create_websocket,
        kwargs={'product_id': product_id},
    )
    websocket_thread.start()
    # Pause before opening the next connection.
    time.sleep(1)

1 Like

@RiyadM

Thank you for posting your working code. How are you getting your Best_Bid and Best_Ask
prices from your websocket feeds? I am still using the Coinbase Exchange for that data.

Have a Great Day!
Justin

I’m getting the raw websocket data with no filters, so it just gives me everything in one big glob of output.

1 Like

Thank you for sharing the code

@RiyadM (@Loop_11) You do not seem to be subscribing to the ‘heartbeats’ channel; have you noticed unexpected drops on the connection when markets are not so active? My understanding is that one would subscribe to both heartbeats and the channel for the product_id (that’s how I am currently doing it; my code is similar to yours, but I use asyncio instead).
Did you add the 1s sleep based on experimentation? (I don’t wait, but I do start all my tasks, i.e. threads, before the ws_connect() call to avoid losing the first message.)
I am noticing a ‘PENDING’ status coming through the feed, but it is not documented in the REST API ‘status’ info. Do you know if PENDING is only for order creation?
Do you pay attention to the sequence? I am always seeing missing sequences, and I have not yet found a bug in my sequencer.

Note that the messages come in with an l2_data label - I can’t see why they introduced this inconsistency.

Lots of Qs, a coffee break in order ;p