How To Use Websocket "Heartbeats" Channel

Hello

Using Spring WebSocketClient, I can successfully subscribe to “Level2” Advanced Trade API websocket channels for individual products (i.e. one product per feed, as recommended).

What I am confused about is the WS Best Practices recommendation to use the “Heartbeats” channel to keep connections alive.

Ref:
WS Best Practices | Coinbase Cloud
WS Channels | Coinbase Cloud

The documentation for WS feeds clearly states that you can only subscribe to ONE channel at a time.

Ref:
WebSocket Overview | Coinbase Cloud

So, my question is: how I am supposed to use the “heartbeats” channel to keep “Level2” subscriptions alive?

Any pointers would be much appreciated :slight_smile:

1 Like

Did you try to send second subscribe message? First you subscribe to level2 then to heartbeats on same websocket connection.

I had not thought of doing that - I will try, and let you know!

Thanks again for the prompt.

send second subscribe message? …

I cannot figure out if it is actually possible to make two subscriptions with one websocket session.

I am using ReactorNettyWebSocketClient in a Spring Boot application, and everything works fine with single-channel subscriptions (to the “level2” channel) with single-product references.

The problem is that some products are not so active, and connections time-out after about a minute with no message updates. (I have tried resetting the ‘timeout’ options available for both the websocket connections and the Flux instances, but that has not solved the problem.)

And I have yet to find any resources that demonstrate multiple channel subscriptions with one and the same Reactor websocket session.

I have no idea what is ReactorNettyWebSocketClient / can not help with that… All I know is that you can send multiple subscribe messages to subscribe to channels you want to use.

Hi @mantheakis! Thank you for using Advanced Trade API. We understand you want to know about subscribing to the heartbeats channel while already using one other channel. We will look into it and get back to you. Please keep in touch.

Hi @mantheakis! Thank you for your patience. For you question on how to subscribe to two channels. You can first send a subcription request for the heartbeats channel and then any other channel that you want. This way you will be able to maintain an active connection and get both heartbeats and other channel data. We hope this helps. Please keep on sharing any feedbacks that you have for Advanced trade API. Do let us know in case you have any more questions or concerns.

Thank you @Loop_11.

I will try running separate connections - one for the heartbeats channel, and another for the level2 channel - and see if that solves the timeout problem.

I will let you know.

1 Like

I think you need heartbeats on same connection! Documentation already says so, no?

Use heartbeats channel to keep your connection alive.

Having heartbeats channel on other connection will keep alive that other connection !

You need find a way to do this:

  • create websocket connection
  • send subscribe message for level2 channel:
{"channel":"level2","product_ids":[],"api_key":"","type":"subscribe","signature":"","timestamp":""}
  • send subscribe message for heartbeats channel:
{"channel":"heartbeats","product_ids":[],"api_key":"","type":"subscribe","signature":"","timestamp":""}

Thank you @muktupavels - I agree. Everything I know tells me, as you say, that the heartbeats session needs to be on the same connection as the level2 session.

You can first send a subcription request for the
heartbeats channel and then any other channel
that you want …

I have tested this using separate websocket connections and (not surprisingly) it failed: the heartbeats channel connection stayed alive until I stopped it, but the level2 channel connection timed-out on its own after a one-minute period of inactivity.

The level2 product was REQ-BTC, deliberately chosen because it generates little traffic.

So, my problem remains: I need to find out how to connect to two different channels with the same websocket connection.

As mentioned before, I am using Java Spring-Framework websockets - and I have yet to find any resources that feature websockets with multiple sessions.

I will keep trying, because this is a deal-breaker issue…

Can you post code part that you use to subscribe to level2 channel? That might help someone to provide answer to you…

/*
 * Initialise WebSocketClient.
 */
WebSocketClient client = 
    new ReactorNettyWebSocketClient( HttpClient.create(), 
                                     WebsocketClientSpec.builder()
                                                        .handlePing( false )
                                                        .maxFramePayloadLength( Integer.MAX_VALUE ) );
/*
 * Timestamp used in message signature.
 */
String timestamp = String.valueOf( Instant.now().getEpochSecond() );
/*
 * Set single-product as list.
 */
List<String> productIdList = List.of( "REQ-BTC" );
/*
 * Signature used in subscribe-message.
 */
String signature = WebSocketCoinbaseUtils.getMessageSignature( MY_API_SECRET, timestamp, "level2", productIdList );
/*
 * Set subscribe-message.
 */
String subscribeMessage = WebSocketCoinbaseUtils.getSubscribeMessageJSON( new SubscribeMessage( productIdList, "level2", MY_API_KEY, timestamp, signature ) );
/*
 * Start websocket session.
 */
Disposable disposable = client.execute( URI.create( WebSocketCoinbaseUtils.URI_COINBASE ),
                                        session -> session.send( Flux.just( session.textMessage( subscribeMessage ) ) )
                                                          .thenMany( session.receive().doOnCancel(    () -> { print(); print( "Level2 cancelled" ); print(); } ) // NOTE 4
                                                                                      .doOnComplete(  () -> { print(); print( "Level2 TIMEOUT" ); print(); } ) // NOTE 4
                                                                                      .doOnError( exception ->  { print(); print( "Level2 ERROR" ); print( exception.getMessage() ); print(); } ) // NOTE 4
                                                                                      .timeout( Duration.ofMinutes( 5 ) ) // NOTE 5
                                                                                      .map( WebSocketMessage::getPayloadAsText ) // NOTE 1
                                                                                      .doOnNext( ThisClass::print ) // NOTE 2
                                                                                      .then() )
                                                          .then() )
                               .subscribe(); // NOTE 3

The above is contained within a test class with a main method.

NOTE 1: WebSocketMessage is a POJO mapped to the JSON content for Level2 updates. FasterXML/jackson is used to map messages into WebSocketMessage instances.

NOTE 2: Production version calls a method that processes message payloads.

NOTE 3: Production version stores the Disposable that is returned by subscribe() - and uses it to stop the websocket connection.

NOTE 4: Production version has targeted implementations for each of these events.

NOTE 5: This timeout setting was tested recently and does NOT extend the websocket timeout period, which is about 1 minute.

The session implementation is for one channel (namely, level2). I cannot figure out how to include a second channel (i.e. heartsbeat) and I have not been able to find any resources that point to such a possibility.

I found this:

reactor.core.publisher.Mono<Void> send(Publisher<WebSocketMessage> messages)
Give a source of outgoing messages, write the messages and return a Mono<Void> that completes when the source completes and writing is done.

Sounds like send accepts multiple messages. Just create second message for heartbeats.

I have been digging into websockets, in particular those provided by WebSocketClient in Spring WebFlux (and the Reactive Streams it uses) and it looks like it is NOT possible for a single websocket connection to stream data from two different channels with two different URLs.

WebSocket connections are established at - and limited to - specific URLs. Hence, after establishing a websocket connection to (say) the “level2” channel, the only way to subscribe to a different URL - e.g. the “heartbeats” channel - is to create a NEW websocket connection. This, of course, does not have the intended effect of keeping the first (level2) websocket connection alive.

Perhaps other languages (or, possibly, other Java libraries) provide websocket connections with multiple URLs, though I doubt it. I do not intend to abandon Spring, so I have no option but to move on to plan B, so to speak, which is to cater for websockets that terminate unexpectedly. (I simply plan to restart them.)

The websocket protocol does specify a keep-alive mechanism (ping and pong: RFC 6455 - The WebSocket Protocol) but Coinbase does not support this, which is a pity.

Thank you again for your interest and responses @muktupavels and @Loop_11 - it always helps to know others have taken an interest.

reactor.core.publisher.Mono send(Publisher messages)

Just for the record, I am not sure where you got that from, @muktupavels.

Here is a link to the Reactor Project Mono jabvadoc - there is no method named “send”.

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html

The method I use is named “just” and it is static.

But there is only one url, no?:

wss://advanced-trade-ws.coinbase.com

You create connection with that url. Then send subscribe message! In your example it is subscribeMessage, you need other for other channel. Don’t confuse subscribe message/messages Coinbase is expecting with subscribe method (line that has NOTE 3)!

Send method is on WebSocketSession object:

That is the protocol (wss == websockets) and the host.

The channel still needs to be specified to complete the URL.

As far as I know - that is wrong! You can open connection without sending any message!!! Of course after 5 seconds you will be disconnected, as documented.

Channel is specified in subscribe message that you are sending to Coinbase after connection has been created.