Websocket Python

The documentation describes how to subscribe to the websocket with JavaScript. Does anyone have an example for Python?

Thanks for any assistance.

Hello @Hawkster! We are only able to provide guidance and troubleshooting for issues with the raw Coinbase APIs. We regret to inform you that we cannot provide specific guidance on how to develop your application such as writing code in specific programming languages and frameworks, and cannot assist further on this request. Thank you for understanding!

Understood. The Advanced Trade documentation has Python examples for things like buying and selling, so I was just looking for a Python example for the websocket.

Hello @Hawkster, we sincerely apologize for the inconvenience this has caused you. For now, JavaScript is the only code sample we can offer for subscribing to a websocket channel. Although we can’t provide a specific timeline, rest assured that we are constantly working to improve our documentation to give you the guidance you may need when using Coinbase services.

We really appreciate your patience and support. Have a great day!

In case this is useful to you, here is my Python code. Signing the message was the most frustrating part to get right, but this code works…

import hashlib
import hmac
import json
import time

import websockets  # third-party asyncio websocket client (pip install websockets)


def add_signature_ws(message: dict, secret: str) -> dict:
    # The string to sign is timestamp + channel + comma-separated product IDs.
    timestamp = str(int(time.time()))
    to_sign = f"{timestamp}{message['channel']}{','.join(message['product_ids'])}"
    signature = hmac.new(secret.encode('utf-8'), to_sign.encode('utf-8'), hashlib.sha256).hexdigest()
    message['signature'] = signature
    message['timestamp'] = timestamp

    return message


async def yield_prices(sym: str):
    # SecretManager comes from the rest of my code and isn't shown here;
    # load your API key and secret however you normally do.
    product_ids = [f"{sym}-USD"]
    ticker_batch = {
        "type": "subscribe",
        "product_ids": product_ids,
        "channel": "ticker_batch",
        "api_key": SecretManager.get('COINBASE_API_KEY'),
    }
    message = add_signature_ws(ticker_batch, SecretManager.get('COINBASE_API_SECRET'))

    # async with closes the connection when the generator is closed or raises.
    async with websockets.connect("wss://advanced-trade-ws.coinbase.com") as ws:
        await ws.send(json.dumps(message))

        while True:
            response = await ws.recv()
            yield json.loads(response)
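
For anyone debugging the signing step on its own, here is a minimal sketch that pulls the signature out into a pure function with the timestamp passed in, so it can be checked without opening a connection. The name `build_signature` and the dummy secret, timestamp, and product IDs are made up for illustration; the string layout (timestamp, then channel, then comma-joined product IDs) is the same one used in the code above.

```python
import hashlib
import hmac
from typing import Sequence


def build_signature(secret: str, timestamp: str, channel: str, product_ids: Sequence[str]) -> str:
    """Return the hex HMAC-SHA256 of timestamp + channel + comma-joined product IDs."""
    to_sign = f"{timestamp}{channel}{','.join(product_ids)}"
    return hmac.new(secret.encode("utf-8"), to_sign.encode("utf-8"), hashlib.sha256).hexdigest()


# Dummy values for illustration only; never hard-code a real secret.
sig = build_signature("dummy-secret", "1700000000", "ticker_batch", ["BTC-USD", "ETH-USD"])
print(len(sig))  # a SHA-256 hex digest is always 64 hex characters
```

Because the function takes the timestamp as an argument, the same inputs always give the same signature, which makes it easier to compare against another implementation when a subscription is rejected.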

While we are waiting for the Coinbase docs team to provide samples, maybe this will be helpful to you.

P.S. this is copy/pasta from my code, so apologies if you have to tidy it up
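
Since `yield_prices` is an async generator that never finishes on its own, the caller has to iterate it with `async for` and decide when to stop. A rough sketch of that pattern follows. `fake_prices` is a hypothetical stand-in that yields made-up messages (not the real Coinbase message format) so the example runs without a network connection or credentials, and `take` is an illustrative helper name, not part of any library.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_prices(sym: str) -> AsyncIterator[dict]:
    """Hypothetical stand-in for yield_prices: yields made-up messages forever."""
    seq = 0
    while True:
        await asyncio.sleep(0)  # hand control back to the event loop, like a real recv()
        yield {"channel": "ticker_batch", "product_id": f"{sym}-USD", "sequence_num": seq}
        seq += 1


async def take(source: AsyncIterator[dict], limit: int) -> List[dict]:
    """Collect the first `limit` messages, then stop iterating."""
    messages: List[dict] = []
    if limit <= 0:
        return messages
    async for message in source:
        messages.append(message)
        if len(messages) >= limit:
            break
    return messages


first_three = asyncio.run(take(fake_prices("BTC"), 3))
print([m["sequence_num"] for m in first_three])  # [0, 1, 2]
```

With the real generator, the call would presumably look like `asyncio.run(take(yield_prices("BTC"), 3))`, assuming the credentials are available.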