# Historical and live data!

# Feel free to suggest adjustments to the script as needed for the best
# performance, but so far it has been working!
import hmac
import hashlib
import time
import requests
import pytz
import pandas as pd
import matplotlib.pyplot as plt
import json
import logging
import websocket
from datetime import datetime, timedelta
from threading import Thread
from coinbase.wallet.client import Client
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import matplotlib.dates as mdates

# Common configuration

# API credentials are placeholders. NOTE(review): prefer loading real keys
# from the environment or a secrets store instead of hard-coding them here.
API_KEY = 'api_key'
API_SECRET = 'api_sec'
WS_API_URL = 'wss://advanced-trade-ws.coinbase.com'

# Row count of `df` at the last redraw; update_plot() redraws only when the
# DataFrame has grown past this.
last_df_size = 0
# Number of days of 1-minute history fetch_historical_data() downloads.
his_days = 5
# Live rows from on_message() that have not yet been merged into `df`.
batch_data = []
# Declared global in on_message(); not otherwise read in this script.
last_batch_time = datetime.now()
# Shared candle table: filled by fetch_historical_data(), appended to by
# on_message(), and plotted by update_plot().
df = pd.DataFrame(columns=['start', 'high', 'low', 'open', 'close', 'volume', 'product_id'])

# Fetch historical data

def fetch_historical_data():
    """Download ``his_days`` days of 1-minute ETH-USD candles into the global ``df``.

    The time window is split into 5-hour chunks. Each chunk is requested
    from the Advanced Trade REST candles endpoint as an HMAC-SHA256 signed
    GET. Chunks that fail, or whose response has no ``'candles'`` key, are
    logged and skipped.

    Side effects:
        Rebinds the module-level ``df`` in a single assignment to a DataFrame
        with these columns:

        - ``start``: naive UTC datetime.
        - ``high``, ``low``, ``open``, ``close``, ``volume``: floats.
        - ``product_id``.

        Rows are de-duplicated on ``start`` and sorted by it.
    """
    global df

    BASE_URL = 'https://api.coinbase.com'
    ENDPOINT = '/api/v3/brokerage/products/ETH-USD/candles'
    columns = ['start', 'high', 'low', 'open', 'close', 'volume', 'product_id']
    numeric_columns = ['high', 'low', 'open', 'close', 'volume']

    def to_unix_timestamp(dt):
        # dt must be timezone-aware: .timestamp() on a naive datetime
        # interprets it as *local* time.
        return int(dt.timestamp())

    def send_request(start_dt, end_dt):
        """Return the decoded JSON for one chunk, or None if the request failed."""
        params = {'start': to_unix_timestamp(start_dt), 'end': to_unix_timestamp(end_dt), 'granularity': 'ONE_MINUTE'}
        timestamp = str(int(time.time()))
        # Signed message: timestamp + HTTP method + request path (no query string).
        message = timestamp + 'GET' + ENDPOINT
        signature = hmac.new(API_SECRET.encode('utf-8'), message.encode('utf-8'), hashlib.sha256).hexdigest()
        headers = {'CB-ACCESS-KEY': API_KEY, 'CB-ACCESS-SIGN': signature, 'CB-ACCESS-TIMESTAMP': timestamp, 'Content-Type': 'application/json'}
        try:
            response = requests.get(BASE_URL + ENDPOINT, params=params, headers=headers, timeout=10)
            return response.json()
        except (requests.RequestException, ValueError) as exc:
            # One failed chunk should not abort the whole history download.
            logging.warning("Candle request %s - %s failed: %s", start_dt, end_dt, exc)
            return None

    # Aware UTC "now". datetime.utcnow() is naive, so .timestamp() would
    # shift the window by the machine's UTC offset.
    end_datetime = datetime.now(pytz.utc)
    start_datetime = end_datetime - timedelta(days=his_days)
    # 5 hours of 1-minute candles = 300 rows per request. Presumably chosen
    # to stay under the endpoint's per-request candle limit; confirm before
    # enlarging it.
    chunk_size = timedelta(hours=5)

    candle_data = []
    while start_datetime < end_datetime:
        chunk_end_datetime = min(start_datetime + chunk_size, end_datetime)
        response = send_request(start_datetime, chunk_end_datetime)
        if isinstance(response, dict) and 'candles' in response:
            for candle in response['candles']:
                candle_data.append([int(candle['start']), candle['high'], candle['low'], candle['open'], candle['close'], candle['volume'], 'ETH-USD'])
        else:
            logging.warning("No candles for %s - %s: %s", start_datetime, chunk_end_datetime, response)
        start_datetime = chunk_end_datetime

    # Build the frame locally and publish it in one assignment, so readers of
    # the global never see a half-converted or unsorted table.
    history = pd.DataFrame(candle_data, columns=columns)
    history['start'] = pd.to_datetime(history['start'], unit='s')  # naive UTC
    # The API returns prices and volumes as strings; make them numeric.
    history[numeric_columns] = history[numeric_columns].astype(float)
    # Adjacent chunks share a boundary timestamp, so a candle can appear twice.
    history = history.drop_duplicates(subset='start').sort_values(by='start').reset_index(drop=True)
    df = history

# Real-time data functions

def get_coinbase_server_time():
    """Return Coinbase's current server time as an aware UTC datetime.

    Calls ``Client.get_time()`` and parses the ``'iso'`` field of the
    result. On any failure the error is logged and the local UTC clock is
    returned instead, so callers always receive a usable datetime.
    """
    try:
        client = Client(API_KEY, API_SECRET)
        server_time = client.get_time()
        # datetime.fromisoformat() rejects a trailing 'Z' before Python 3.11.
        iso_format_time = server_time['iso'].replace('Z', '+00:00')
        return datetime.fromisoformat(iso_format_time).replace(tzinfo=pytz.utc)
    except Exception as e:
        # Deliberate best-effort fallback: fall back to the local clock.
        logging.error("Failed to fetch server time: %s", e)
        return datetime.now(pytz.utc)

def generate_signature(api_secret, timestamp, channel, products):
    """Return the hex HMAC-SHA256 signature for a websocket subscription.

    The signed message is ``f"{timestamp}{channel}{','.join(products)}"``.

    Args:
        api_secret: Secret used as the HMAC key (UTF-8 encoded).
        timestamp: Unix timestamp in seconds (str or int).
        channel: Channel name, e.g. ``"candles"``.
        products: Iterable of product-ID strings, e.g. ``["ETH-USD"]``.

    Returns:
        The 64-character lowercase hex digest.
    """
    message = f'{timestamp}{channel}{",".join(products)}'
    return hmac.new(api_secret.encode(), message.encode(), hashlib.sha256).hexdigest()

def calculate_time_offset():
    """Estimate ``server_time - local_time`` in seconds.

    A positive result means the local clock is behind the server. The local
    clock is sampled before and after the server-time request, and the
    midpoint is used as the reference. This keeps the request's network
    latency from biasing the estimate, assuming roughly symmetric latency.
    Reading the clock only after the response arrives would include the full
    round trip.
    """
    sent_at = datetime.now(pytz.utc)
    server_time = get_coinbase_server_time()
    received_at = datetime.now(pytz.utc)
    local_midpoint = sent_at + (received_at - sent_at) / 2
    return (server_time - local_midpoint).total_seconds()

def subscribe_to_channel(ws, channel, product_ids):
    """Send a signed ``subscribe`` message for *channel* and *product_ids* on *ws*.

    The timestamp is the local UTC clock corrected by
    ``calculate_time_offset()``, so it follows Coinbase's clock rather than
    the machine's.

    NOTE(review): the end of this function (closing the JSON payload and
    sending it) was missing from the pasted source. The ``ws.send`` call
    below is a reconstruction; confirm it against the working copy.
    """
    time_offset = calculate_time_offset()
    adjusted_time = datetime.now(pytz.utc) + timedelta(seconds=time_offset)
    timestamp = str(int(adjusted_time.timestamp()))
    signature = generate_signature(API_SECRET, timestamp, channel, product_ids)

    subscribe_message = json.dumps({
        "type": "subscribe",
        "product_ids": product_ids,
        "channel": channel,
        "signature": signature,
        "api_key": API_KEY,
        "timestamp": timestamp,
    })
    ws.send(subscribe_message)


def on_message(ws, message):
    """Websocket callback: append live candle samples to the global ``df``.

    Applies to messages on the ``candles`` channel whose first event is an
    ``update``:

    - Each candle is stamped with the current Coinbase server minute
      (seconds zeroed, stored as naive UTC like the historical rows).
    - The row is queued in ``batch_data`` unless that minute is already in
      ``df`` or already queued.
    - Queued rows are then concatenated onto ``df``.

    Every message, of any kind, ends with a "New data received" print.
    """
    global df, batch_data, last_batch_time

    columns = ['start', 'high', 'low', 'open', 'close', 'volume', 'product_id']
    message_data = json.loads(message)
    events = message_data.get('events') or []
    if message_data.get('channel') == 'candles' and events and events[0].get('type') == 'update':
        candle_data = events[0].get('candles') or []
        # NOTE(review): get_coinbase_server_time() builds a new Client and makes
        # a REST call on every candle update. Caching a clock offset (see
        # calculate_time_offset) would avoid that round trip.
        current_timestamp = get_coinbase_server_time().replace(second=0, microsecond=0)
        # Naive UTC timestamp, matching the historical 'start' column.
        candle_time = pd.Timestamp(current_timestamp.replace(tzinfo=None))

        for candle in candle_data:
            already_stored = candle_time in df['start'].values
            already_queued = any(row[0] == candle_time for row in batch_data)
            if already_stored or already_queued:
                continue  # keep at most one live row per minute
            # NOTE(review): this row literal was truncated in the pasted source.
            # It is reconstructed to mirror fetch_historical_data()'s column
            # order; confirm the field names against the candles payload.
            # Values are converted to float so 'close' stays numeric after concat.
            batch_data.append([
                candle_time,
                float(candle['high']),
                float(candle['low']),
                float(candle['open']),
                float(candle['close']),
                float(candle['volume']),
                candle.get('product_id', 'ETH-USD'),
            ])

        if batch_data:
            new_df = pd.DataFrame(batch_data, columns=columns)
            df = pd.concat([df, new_df], ignore_index=True)
            batch_data = []
            print(f"DataFrame updated. Size: {len(df)}")
            print(df.tail(1))  # Print the last row after updating

    print("New data received at", datetime.utcnow())

def on_error(ws, error):
    """Websocket callback: log *error* at ERROR level.

    Args:
        ws: The WebSocketApp that raised the error (unused).
        error: The exception or error object; it is rendered with ``%s``.
    """
    logging.error("WebSocket error: %s", error)

def on_close(ws, close_status_code, close_msg):
    """Websocket callback: log the close status code and message at INFO level.

    Args:
        ws: The WebSocketApp that closed (unused).
        close_status_code: Close code reported by the websocket library.
        close_msg: Close reason reported by the websocket library.
    """
    logging.info("WebSocket Closed. Status: %s, Message: %s", close_status_code, close_msg)

def reconnect_with_backoff(connect=None, *, max_retries=5, initial_delay=1.0, max_delay=60.0):
    """Call *connect* until it succeeds, sleeping with exponential backoff.

    Args:
        connect: Zero-argument callable that (re)establishes the connection
            and raises on failure. An attempt counts as successful when it
            returns without raising. Defaults to ``start_websocket``.
        max_retries: Maximum number of attempts. ``MAX_RETRIES`` was not
            defined anywhere in this script, so the limit is a parameter.
        initial_delay: Seconds to wait after the first failed attempt.
        max_delay: Upper bound on the doubled delay.

    Returns:
        True if an attempt succeeded, False once all attempts have failed.
    """
    if connect is None:
        connect = start_websocket
    delay = initial_delay
    for attempt in range(1, max_retries + 1):
        logging.info("Attempting to reconnect, retry {}".format(attempt))
        try:
            connect()
        except Exception as e:
            logging.error("Reconnection failed: {}".format(e))
        else:
            return True
        # No point sleeping after the final failed attempt.
        if attempt < max_retries:
            time.sleep(delay)
            delay = min(delay * 2, max_delay)  # exponential backoff, capped
    logging.error("Max reconnection attempts reached. Stopping.")
    return False

def start_websocket():
    """Open the Advanced Trade websocket for ETH-USD candles and show the live chart.

    The socket's ``run_forever`` loop runs in a daemon thread. This function
    then blocks in ``plt.show()`` so the FuncAnimation driving
    ``update_plot`` stays on screen. When the window is closed, the socket
    is closed as well.

    NOTE(review): everything after ``on_open`` was missing from the pasted
    source. The remaining callbacks, the thread and ``plt.show()`` are a
    reconstruction: ``Thread`` is imported but otherwise unused, and no
    ``plt.show()`` appears elsewhere. Confirm against the working copy.

    Returns:
        The WebSocketApp instance (already closed when this returns).
    """
    ws = websocket.WebSocketApp(
        WS_API_URL,
        on_open=lambda ws: subscribe_to_channel(ws, "candles", ["ETH-USD"]),
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    ws_thread = Thread(target=ws.run_forever, daemon=True)
    ws_thread.start()
    plt.show()  # blocks until the plot window is closed
    ws.close()
    return ws

# Initialize the plot

# Figure and axes shared with update_plot(). The initial line is drawn from
# whatever `df` holds at this point (the empty frame from the configuration
# section when the script is run directly).
fig, ax = plt.subplots()
line, = ax.plot(df['start'], df['close'], color='blue')  # Plotting only the close price

def update_plot(frame=None):
    """FuncAnimation callback: redraw the close-price chart when ``df`` has grown.

    When ``df`` has more rows than at the last redraw, the axes are cleared
    and rebuilt: the close-price line, a day/month date axis, tight x and y
    limits, and a dashed vertical line at each calendar day.

    Args:
        frame: Frame value supplied by FuncAnimation; unused.

    Returns:
        A 1-tuple holding ``line``, the artist created at start-up.
    """
    global last_df_size
    # Read the shared DataFrame once. on_message() rebinds the global `df`,
    # and every read below must see the same snapshot.
    data = df
    current_df_size = data.shape[0]

    if current_df_size > last_df_size:
        ax.clear()  # Clear the current axes.
        ax.plot(data['start'], data['close'], color='blue')  # Plot the updated data.

        ax.set_ylabel('Close Price')
        ax.set_title('ETH-USD Close Price Over Time')

        # Major ticks once per day, labelled with day and month only.
        ax.xaxis.set_major_locator(mdates.DayLocator())
        ax.xaxis.set_major_formatter(mdates.DateFormatter('%d %b'))
        # Minor ticks every 6 hours.
        ax.xaxis.set_minor_locator(mdates.HourLocator(interval=6))

        # Set limits for x and y axis based on the data.
        ax.set_xlim([data['start'].min(), data['start'].max()])
        ax.set_ylim([data['close'].min(), data['close'].max()])

        # Rotate date labels for readability (after the limits are final).
        plt.setp(ax.get_xticklabels(), rotation=45, ha='right')

        # Draw thin vertical lines at the start of each new day.
        for day in data['start'].dt.date.unique():
            ax.axvline(pd.to_datetime(day), color='gray', linestyle='--', linewidth=0.5)

        plt.tight_layout()  # Adjust layout to prevent overlap.
        plt.draw()  # Redraw the canvas for the updates to show.

        last_df_size = current_df_size  # Remember the size for the next call.

    return line,

# Reduced the interval for more frequent updates (interval is in ms: 10000 = 10 s)

# Kept as a module-level name so a reference to the animation object persists
# for the lifetime of the script. update_plot() is called every 10 seconds.
ani = animation.FuncAnimation(
    fig,
    update_plot,
    interval=10_000,  # milliseconds between frames
    blit=False,
    save_count=1000,
)

# Script entry point. The pasted `name == "main"` raised NameError; the
# dunder names are required for this guard.
if __name__ == "__main__":
    fetch_historical_data()  # Fetch historical data first
    start_websocket()  # Then start real-time data streaming