Code Snippets
Last updated
Below are minimal examples demonstrating how to connect and subscribe to streams using Socket.IO and native WebSockets. These snippets are intended as starting points for integration and are not production-ready.
Below is an example implemented using TypeScript and socket.io-client.
Another example using Java to listen on BookDepth.
For Python developers, our Python SDK offers a streamlined approach to WebSocket subscriptions. Refer to the Python SDK documentation for more information.
Below is an example using the ws TypeScript library.
Another example using Rust with tungstenite to listen on a few feeds.
Last updated
import { io } from 'socket.io-client';
// Base URL of the REST API; getProducts() fetches the product list from it.
const API_BASE_URL = 'https://api.ethereal.trade';
// Stream gateway URL handed to the socket.io client in main().
const WS_BASE_URL = 'wss://ws.ethereal.trade/v1/stream';
/**
 * Fetches the product list from the REST API.
 *
 * @returns The `data` array of the JSON response body; each entry is typed as
 *          carrying at least an `id`.
 */
const getProducts = async () => {
  const response = await fetch(`${API_BASE_URL}/v1/product`);
  const { data } = (await response.json()) as { data: { id: string }[] };
  return data;
};
/**
 * Connects to the socket.io stream gateway and, once connected, subscribes to
 * the BookDepth and MarketPrice streams of every product returned by
 * `getProducts`. Stream messages and connection lifecycle events are logged
 * to the console.
 */
const main = async () => {
  // autoConnect is off so every handler below is bound before the connection opens.
  const socket = io(WS_BASE_URL, { transports: ['websocket'], autoConnect: false });
  console.log(`Connecting to ws gateway ${WS_BASE_URL}`);

  // Runs on every (re)connect: fetch the products and subscribe to both streams of each.
  const subscribeToAllProducts = async () => {
    console.log(`Connected to ${WS_BASE_URL}`);
    for (const { id } of await getProducts()) {
      socket.emit('subscribe', { type: 'BookDepth', productId: id });
      console.log(`Subscribed BookDepth:${id}`);

      socket.emit('subscribe', { type: 'MarketPrice', productId: id });
      console.log(`Subscribed MarketPrice:${id}`);
    }
  };

  // --- Connection lifecycle handlers --- //
  // NOTE(review): socket.io-client v3+ emits reconnection events on the Manager
  // (`socket.io`) rather than on the Socket — confirm 'connecting' and
  // 'reconnect_attempt' actually fire with the client version in use.
  socket.on('connect', subscribeToAllProducts);
  socket.on('connecting', () => console.log('Attempting connection...'));
  socket.on('disconnect', () => console.log('Disconnected'));
  socket.on('error', (err) => console.log('Error encountered', err));
  socket.on('exception', (err) => console.log('Caught exception', err));
  socket.on('reconnect_attempt', () => console.log('Attempting to reconnect...'));

  // --- Subscription stream handlers --- //
  socket.on('BookDepth', (payload) => console.log('[BookDepth]', JSON.stringify(payload)));
  socket.on('MarketPrice', (payload) => console.log('[MarketPrice]', JSON.stringify(payload)));

  // Open the connection only now that all callbacks are registered.
  socket.connect();
};

void main();
package com.example.ethereal;
import io.socket.client.IO;
import io.socket.client.Socket;
import io.socket.emitter.Emitter;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class Example {
// Socket.IO client; assigned in connect() and torn down in disconnect().
private Socket client;
// Counted down by the connect and connect-error listeners; connect() waits on it.
private CountDownLatch connectionLatch = new CountDownLatch(1);
/**
 * Entry point: connects to the stream, subscribes to BookDepth for a single
 * product, keeps the process alive for 30 seconds so messages can be printed,
 * and always disconnects on the way out.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // Instantiate the enclosing class; no EtherealClient type exists in this example.
    Example client = new Example();
    try {
        client.connect();
        client.subscribeToBookDepth("6dae67f4-c502-4cc1-8d1a-38ab021b2c76");
        // Keep running for 30 seconds
        Thread.sleep(30000);
    } catch (Exception e) {
        System.err.println("Error: " + e.getMessage());
        e.printStackTrace();
    } finally {
        // Runs on both success and failure so the socket is always released.
        client.disconnect();
    }
}
/**
 * Opens the Socket.IO connection (websocket transport only) and blocks until
 * the connect or connect-error listener fires, or 15 seconds elapse.
 *
 * A connection error also releases the latch, so this method returns normally
 * in that case; callers can check the connection state afterwards.
 *
 * @throws InterruptedException if the thread is interrupted while waiting
 * @throws RuntimeException     if neither listener fires within 15 seconds
 */
public void connect() throws InterruptedException {
    System.out.println("Connecting to Ethereal Testnet WebSocket...");

    IO.Options options = new IO.Options();
    options.transports = new String[]{"websocket"};
    options.upgrade = false;
    options.timeout = 10000;

    client = IO.socket(URI.create("wss://ws.etherealtest.net/v1/stream"), options);

    // Connection outcome listeners: both release the latch awaited below.
    client.on(Socket.EVENT_CONNECT, args -> {
        System.out.println("Connected! Socket ID: " + client.id());
        connectionLatch.countDown();
    });
    client.on(Socket.EVENT_CONNECT_ERROR, args -> {
        System.err.println("Connection failed: " + Arrays.toString(args));
        connectionLatch.countDown();
    });

    // Stream and server-side error listeners.
    client.on("BookDepth", args -> System.out.println("BookDepth: " + Arrays.toString(args)));
    client.on("exception", args -> System.err.println("Server exception: " + Arrays.toString(args)));

    client.connect();

    boolean signalled = connectionLatch.await(15, TimeUnit.SECONDS);
    if (!signalled) {
        throw new RuntimeException("Connection timeout");
    }
}
/**
 * Emits a {@code subscribe} event for the BookDepth stream of one product.
 * Prints an error and does nothing when the socket is not connected.
 *
 * @param productId identifier of the product to subscribe to
 */
public void subscribeToBookDepth(String productId) {
    if (client.connected()) {
        Map<String, Object> payload = new HashMap<>();
        payload.put("type", "BookDepth");
        payload.put("productId", productId);

        System.out.println("Subscribing: " + payload);
        client.emit("subscribe", payload);
    } else {
        System.err.println("Not connected!");
    }
}
/**
 * Disconnects and closes the socket. Does nothing if the client was never
 * created (i.e. {@code connect()} did not get as far as building it).
 */
public void disconnect() {
    if (client == null) {
        return;
    }
    System.out.println("Disconnecting...");
    client.disconnect();
    client.close();
}
}
import WebSocket from 'ws';
import axios from 'axios';
// Set the REST API base once so requests below can use relative paths such as '/product'.
axios.defaults.baseURL = 'https://api.ethereal.trade/v1';
/**
 * Fetches the product list via axios, relative to the configured base URL.
 *
 * @returns The `data` field of the JSON response body.
 */
const getProducts = async () => {
  const { data: body } = await axios.get('/product');
  return body.data;
};
/**
 * Connects to the native WebSocket gateway and, once the socket is open,
 * subscribes to the L2Book and Ticker streams for every product returned by
 * `getProducts`. Incoming messages are dispatched on their `e` field and
 * logged to the console.
 */
const main = async () => {
  const url = 'wss://ws2.ethereal.trade/v1/stream';
  const ws = new WebSocket(url);
  console.log(`Connecting to ws gateway ${url}`);

  ws.on('open', async () => {
    console.log(`Connected to ${url}`);
    const products = await getProducts();
    products.forEach((product: { id: string; ticker: string }) => {
      const l2BookSubscriptionMessage = {
        event: 'subscribe',
        data: { type: 'L2Book', symbol: product.ticker },
      };
      ws.send(JSON.stringify(l2BookSubscriptionMessage));
      console.log(`Subscribed L2Book:${product.ticker}`);

      const tickerSubscriptionMessage = {
        event: 'subscribe',
        data: { type: 'Ticker', symbol: product.ticker },
      };
      // Send the Ticker message built just above; no other subscription
      // message is declared in this scope.
      ws.send(JSON.stringify(tickerSubscriptionMessage));
      console.log(`Subscribed Ticker:${product.ticker}`);
    });
  });

  ws.on('close', () => console.log('Disconnected'));
  ws.on('error', (err) => console.log('Error encountered', err));

  // --- Subscription stream handlers --- //
  ws.on('message', (data) => {
    // Every frame is parsed as JSON; the `e` field selects the stream type.
    const message = JSON.parse(data.toString());
    switch (message.e) {
      case 'L2Book':
        console.log(`[L2Book] Received ${data.toString()}`);
        break;
      case 'Ticker':
        console.log(`[Ticker] Received ${data.toString()}`);
        break;
    }
  });
};

void main();
use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use tokio_tungstenite::{connect_async, tungstenite::Message};
/// Base URL of the REST API; `get_products` appends `/product` to it.
const BASE_URL: &str = "https://api.ethereal.trade/v1";
/// WebSocket stream endpoint that `main` connects to.
const WS_URL: &str = "wss://ws2.ethereal.trade/v1/stream";
/// Response envelope of the product list endpoint, as decoded by `get_products`.
#[derive(Debug, Deserialize)]
struct ProductPage {
/// Products contained in the response.
data: Vec<Product>,
}
/// A single product entry; only the field this example needs is declared.
#[derive(Debug, Deserialize)]
struct Product {
/// Ticker of the product; `main` passes it as the `symbol` of each subscription.
ticker: String,
}
/// Outgoing WebSocket message; `subscribe_msg` serializes it to JSON and sends it as text.
#[derive(Serialize)]
struct WsMessage {
/// Event name; `subscribe_msg` always sets this to `"subscribe"`.
event: &'static str,
/// Payload describing the stream being subscribed to.
data: SubscriptionData,
}
/// Payload of a subscription request: which stream type, for which symbol.
#[derive(Serialize)]
struct SubscriptionData {
/// Stream type (`main` uses "L2Book" and "Ticker"). Written as a raw
/// identifier because `type` is a reserved word in Rust.
r#type: &'static str,
/// Symbol to subscribe to; `main` passes the product ticker.
symbol: String,
}
/// Fetches the product list from `{BASE_URL}/product`.
///
/// # Panics
///
/// Panics with a descriptive message if the request cannot be sent, the server
/// answers with an HTTP error status, or the body cannot be decoded as a
/// [`ProductPage`].
async fn get_products() -> Vec<Product> {
    let url = format!("{}/product", BASE_URL);
    reqwest::get(&url)
        .await
        .expect("failed to send product list request")
        // Reject 4xx/5xx here so an error body is not reported as a JSON decode failure.
        .error_for_status()
        .expect("product list request returned an HTTP error status")
        .json::<ProductPage>()
        .await
        .expect("failed to decode product list response")
        .data
}
/// Builds the text frame that subscribes to stream `kind` for `symbol`.
///
/// The frame body is the JSON serialization of a [`WsMessage`] whose event is
/// `"subscribe"`.
fn subscribe_msg(kind: &'static str, symbol: String) -> Message {
    let data = SubscriptionData {
        r#type: kind,
        symbol,
    };
    let request = WsMessage {
        event: "subscribe",
        data,
    };
    let payload = serde_json::to_string(&request).unwrap();
    Message::Text(payload.into())
}
/// Fetches the product list, connects to the WebSocket gateway, subscribes to
/// the L2Book and Ticker streams of every product, then prints every text
/// frame received until the stream ends, errors, or the server closes it.
///
/// # Panics
///
/// Panics if the product list cannot be fetched, the WebSocket connection
/// cannot be established, or a subscription message cannot be sent.
#[tokio::main]
async fn main() {
    let products = get_products().await;
    let (ws_stream, _) = connect_async(WS_URL)
        .await
        .expect("failed to connect to WebSocket gateway");
    let (mut write, mut read) = ws_stream.split();

    for p in &products {
        for kind in ["L2Book", "Ticker"] {
            write
                .send(subscribe_msg(kind, p.ticker.clone()))
                .await
                .expect("failed to send subscription message");
            println!("Subscribed {kind}:{}", p.ticker);
        }
    }

    while let Some(frame) = read.next().await {
        // Report read errors instead of ending the loop silently.
        let msg = match frame {
            Ok(msg) => msg,
            Err(err) => {
                eprintln!("WebSocket read error: {err}");
                break;
            }
        };
        match msg {
            Message::Text(text) => println!("{text}"),
            Message::Ping(data) => {
                // Best effort: a failed pong is logged; the next read will surface a dead connection.
                if let Err(err) = write.send(Message::Pong(data)).await {
                    eprintln!("Failed to send pong: {err}");
                }
            }
            Message::Close(_) => break,
            _ => {}
        }
    }
}