主页 > 下载安卓版imtoken钱包 > Netty+WebSocket获取火币兑换数据项

Netty+WebSocket获取火币兑换数据项

下载安卓版imtoken钱包 2023-03-24 07:36:38

Netty+WebSocket从火币交易所获取数据项

先附上项目项目GitHub地址 spring-boot-netty-websocket-huobi

项目介绍

本项目使用SpringBoot+Netty开发WebSocket服务器,与火币交易所Websocket建立连接,不定时获取火币交易所推送的交易对的最新数据。

本项目可作为项目直接用于实际开发,获取各大交易所最新的交易对相关数据。

项目本身也是我在前公司开发的一个项目,用来获取各大交易所的数据。现在我只是重新组织了代码。现在更像是一个脚手架项目,可以在此基础上轻松添加其他交易所。

技术架构

SpringBoot2.1.5 +Netty4.1.25 + Maven3.5.4 + lombok(插件)

项目测试

通过直接启动Springboot启动类Application.java,可以随时获取火币网推送的交易对数据。

如图所示

1090617-20190731184311639-1781494089.gif

一、项目概览1、项目启动入口

项目启动时,开始连接火币交易所Websocket订阅数据。

   /**
     * 首次启动并订阅火币websocket数据
     */
    @PostConstruct
    public void firstSub() {
        try {
            huobiProMainService.start();
        } catch (Exception e) {
            log.error("huobi 首次启动订阅异常", e);
        }
    }

2、获取交易对数据

我们首先需要获取火币交易所的所有交易对数据,然后告诉火币交易所我需要订阅哪些交易对数据。

是订阅所有交易对数据还是订阅部分交易对数据。

    @Override
    public synchronized List getChannelCache() {
        // 假设这里是从远处拉取交易对数据
        List list = Lists.newArrayList("btcusdt");
        return list;
    }

3、连接火币交易所Websocket从火币网提币显示地址无效,订阅指定交易对。

首先与火币WebSocket建立连接。连接成功后,告诉它我要订阅哪些交易对和主题。成功后,火币交易所会根据我们订阅的话题和交易对,不定期的给我们推送消息。.

 /**
     * 首次订阅交易对数据
     *
     * @param channelList 交易对列表
     * @param topicFormat 交易对订阅主题格式
     */
    private void firstSub(List channelList, String topicFormat) {
        //封装huoBiProWebSocketService对象
        klineClient = new HuoBiProWebSocketClient(huoBiProWebSocketService);
        //启动连接火币网websocket
        klineClient.start();
        for (String channel : channelList) {
            //订阅具体交易对
            klineClient.addSub(formatChannel(topicFormat, channel));
        }
    }

启动连接火币websocket的核心代码

很明显,我们是获取服务端数据的客户端,所以这里的Bootstrap是用来和服务端交互的,而不是使用ServerBootstrap。

还有一点,作为客户端,我们要获取服务端推送的消息,所以我们自定义的handler就是inbound Handler,所以这里选择了SimpleChannelInboundHandler。

   /**
         * 连接WebSocket,
         *
         * @param uri url构造出URI
         * @param handler 处理消息
         */
        protected void connectWebSocket(final URI uri, SimpleChannelInboundHandler handler) {
            try {
                String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
                final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
                final int port;
                if (uri.getPort() == -1) {
                    if ("http".equalsIgnoreCase(scheme) || "ws".equalsIgnoreCase(scheme)) {
                        port = 80;
                    } else if ("wss".equalsIgnoreCase(scheme)) {
                        port = 443;
                    } else {
                        port = -1;
                    }
                } else {
                    port = uri.getPort();
                }
                if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
                    System.out.println("Only WS(S) is supported");
                    throw new UnsupportedAddressTypeException();
                }
                final boolean ssl = "wss".equalsIgnoreCase(scheme);
                final SslContext sslCtx;
                if (ssl) {
                    sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
                } else {
                    sslCtx = null;
                }
                group = new NioEventLoopGroup(2);
                //构建客户端Bootstrap
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        if (sslCtx != null) {
                            pipeline.addLast(sslCtx.newHandler(ch.alloc(), host, port));
                        }
                        //pipeline可以同时放入多个handler,最后一个为自定义hanler
                        pipeline.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), handler);
                    }
                });
                channel = bootstrap.connect(host, port).sync().channel();
            } catch (Exception e) {
                log.error(" webSocketClient start error.", e);
                if (group != null) {
                    group.shutdownGracefully();
                }
            }
    }

4、自定义处理程序

自定义 Handler 是核心。作为入站数据,我们选择继承 SimpleChannelInboundHandler。要继承它,我们必须实现一个名为 channelRead0 的方法。通过这个方法的msg从火币网提币显示地址无效,我们可以得到火币交易所不定时推送的消息。

/**
 * @Description: 火币网WebSocket 消息处理类
 * 自定义入站的handler 这个也是核心类
 */
@Slf4j
public class HuoBiProWebSocketClientHandler extends SimpleChannelInboundHandler {
    private WebSocketClientHandshaker handshaker;
    private HuoBiProWebSocketClient client;
    
    /**
     * 该handel获取消息的方法
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel channel = ctx.channel();
        WebSocketFrame frame = (WebSocketFrame) msg;
        if (frame instanceof BinaryWebSocketFrame) {
            //火币网的数据是压缩过的,所以需要我们进行解压
            BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
            //获取数据、保存数据
            client.onReceive(decodeByteBuf(binaryFrame.content()));
        } else if (frame instanceof TextWebSocketFrame) {
            TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) frame;
            client.onReceive(textWebSocketFrame.text());
        } 
     }
  }

二、项目说明1、服务器问题

一般交易所的服务器都在国外,所以我们无法在本地建立Websocket连接,除非我们在本地翻墙。

同一个项目不能部署到阿里云等国内服务器上。您只能选择香港或外国服务器来部署项目。

这里是火币网专门提供的国内测试地址,可以在本地获取数据。

2、从交易所获取最新交易对数据

当我们订阅交易所的Websocket交易对时,我们首先要知道交易所有哪些交易对。这个数据需要单独获取,一次获取是不够的。

因为交易所可能会增加或删除交易对。因此,我们需要通过定时任务获取和更新最新的交易对数据。

我这里只模拟了一个交易对btcusdt,并没有提供获取最新交易对数据的服务。

3、数据存储问题

这也是一个值得思考的问题。我们已经获得了数据,但是如果我们保存它!

正常合理的开发应该将数据获取为微服务,将获取的数据处理为微服务。然后你只需要获取数据并调整数据处理微服务来保存数据。

但是在这里,如果这不起作用。

由于火币网向我们推送消息的速度比我们传输其他服务保存的数据要快,因此会有数据丢失。

这只是输出一个btcusdt交易对,订阅一个k线主题而已,但实际上交易所会有上百个交易对和几个订阅主题,

这样的消息推送速度比上面要快上百倍。所以你会发现,如果你不做任何改变,对于一些大的交易所,你的数据来不及存储。

补充

之前也写过Netty和Websocket相关的博客文章,可以作为参考。

1、Netty话题(共9篇)

2、Websocket话题(共5篇)

 我相信,无论今后的道路多么坎坷,只要抓住今天,迟早会在奋斗中尝到人生的甘甜。抓住人生中的一分一秒,胜过虚度中的一月一年!(1)

转载于: