设为首页 收藏本站
切换语言
简体中文
繁体中文
首页 > 资讯 > 正文

TradingView js api demo (四)websocket服务代码

2021-11-30 10:44:54
 
1585

websocket服务代码,用的hyperf框架,需要swoole扩展,参考这个,创建websocket服务

下面是对应控制器的代码

<?php
declare(strict_types&#61;1);

namespace App\Controller;

use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Swoole\Http\Request;
use Swoole\Server;
use Swoole\Websocket\Frame;
use Swoole\WebSocket\Server as WebSocketServer;
use Hyperf\DbConnection\Db;
use Hyperf\Utils\ApplicationContext;
use function GuzzleHttp\json_decode;


/**
 * 介绍一下大概思路
 * 1,订阅redis中所有的channel
 * 2,当有链接进来,需要实时获取最新数据时,就注册到类的sublist数组里
 * 3,当redis有数据更新时,遍历sublist数组,对订阅此channel的每个链接返回数据
 * 4,取消订阅就是把这个链接从sublist数组里删除
 * 
 * 5,获取历史数据没啥好说的,查询数据库,把数据发给前端就行了
 */
class BasisModeTestController implements OnMessageInterface,OnOpenInterface,OnCloseInterface
{

    //记录订阅者
    public $sublist &#61; [];

    public function __construct()
    {   
        \Swoole\Runtime::enableCoroutine();

        $this->subscribe();
    }

    //订阅redis中的所有path
    public function subscribe()
    {
        go(function(){
            $container &#61; ApplicationContext::getContainer();
            $redis &#61; $container->get(\Hyperf\Redis\Redis::class);

            //订阅redis
            $redis->psubscribe(array('zl_*'),[$this,'callback']);
        });
    }

    //根据接收到的method参数,执行对应的方法
    public function onMessage(WebSocketServer $server, Frame $frame): void
    {   
        $input &#61; json_decode($frame->data,true);

        if(empty($input['method'])){
            return;
        }

        $method &#61; $input['method'];

        $this->{$method}($server,$frame,$input);
    }

    public function onOpen(WebSocketServer $server, Request $request): void
    {
        echo &#34;有新链接{$request->fd}\n&#34;;
    }

    //返回指定时间内的数据
    public function KlineHistory($server,$frame,$map)
    {
        go(function()use($server,$frame,$map){
            $str  &#61; strtolower($map['data']['market']);
            $path &#61; substr($str,0,strlen($str)-3);
            $mode &#61; substr($str,-2,2);

            $startTime &#61; $map['data']['from'] * 1000;
            $endTime   &#61; $map['data']['to'] * 1000;
            
            $interval  &#61; $this->formatInterval($map['data']['resolution']);
            
            $method    &#61; $map['method'];
            
            $sql &#61; &#34;select &#96;create_time&#96;,&#96;open&#96;,&#96;close&#96;,&#96;tallest&#96;,&#96;lowest&#96; from &#96;hedge_basis_mode&#96; where &#96;interval&#96;&#61;'{$interval}' and &#96;path&#96;&#61;'{$path}' and &#96;mode&#96;&#61;'{$mode}' and (&#96;create_time&#96;>&#61;{$startTime} and &#96;create_time&#96;<&#61;{$endTime})&#34;;
            $list &#61; Db::select($sql);
            echo $sql.&#34;\n&#34;;
            $res &#61; [
                'data'&#61;>$list,
                'method'&#61;>$method,
                'name'&#61;>$map['data']['market'],
                'from'&#61;>$map['data']['from'],
                'to'&#61;>$map['data']['to'],
                'resolution'&#61;>$map['data']['resolution']
            ];

            $server->push($frame->fd,json_encode($res));
        });
    }

    //对前端传过来的时间格式转换成数据库存的时间格式
    public function formatInterval($interval)
    {
        if ($interval &#61;&#61; 5) {
            $interval &#61; '5m';
        }else if($interval &#61;&#61; 60){
            $interval &#61; '1h';
        }else if($interval &#61;&#61; '1D'){
            $interval &#61; '1d';
        }
        return $interval;
    }

    //实时更新数据
    public function KlineUpdata($server,$frame,$map)
    {
        //这里没有直接返回数据,而是添加了一个订阅者
        $this->addSubscriber($server,$frame,$map);
    }

    //移除订阅者
    public function removeSubscriber($server,$frame,$map)
    {
        $map['id'] &#61; strtolower($map['id']);
        echo &#34;取消{$frame->fd}下的{$map['id']}\n&#34;;
        foreach ($this->sublist as $redisKey&#61;>$list) {
            foreach ($list as $index &#61;> $sub) {
                if($sub['id'] &#61;&#61; $map['id'] && ($sub['frame'])->fd &#61;&#61; $frame->fd){
                    echo &#34;取消{$map['id']}成功\n&#34;;
                    unset($this->sublist[$redisKey][$index]);
                }
            }
        }
    }

    //添加订阅者
    public function addSubscriber($server,$frame,$map)
    {
        $str  &#61; strtolower($map['data']['market']);
        $path &#61; substr($str,0,strlen($str)-3);
        $mode &#61; substr($str,-2,2);
        $interval  &#61; $this->formatInterval($map['data']['resolution']);
        //整理redis订阅的key
        $redisKey &#61; &#34;zl_{$mode}_{$interval}_{$path}&#34;;

        $this->sublist[$redisKey][] &#61; [
            'server'&#61;>$server,
            'frame'&#61;>$frame,
            'id'&#61;>strtolower($map['data']['id'])
        ];
    }

    public function callback($redis, $pattern, $channel, $message)
    {
        if(empty($this->sublist[$channel])){
            return;
        }

        $row &#61; json_decode(str_replace('\'','&#34;',trim($message,&#34;'&#34;)),true);

         //实时传送到前端
         $list &#61; [
            'time'    &#61;> $row['time'],
            'open'    &#61;> $row['open'],
            'close'   &#61;> $row['close'],
            'tallest' &#61;> $row['tallest'],
            'lowest'  &#61;> $row['lowest']
        ];
        
        //给每个订阅此path的订阅者发送消息
        foreach ($this->sublist[$channel] as $sub) {
            ($sub['server'])->push(($sub['frame'])->fd,json_encode(
                $res &#61; [
                    'data'&#61;>$list,
                    'method'&#61;>'KlineUpdata',
                    'id'&#61;>$sub['id']
                ]
            ));
        }
    }

    public function onClose(Server $server, int $fd, int $reactorId): void
    {
        echo &#34;{$fd}--链接关闭\n&#34;;
    }
}

 

 

声明:
本文内容不代表斑马投诉网站观点,内容仅供参考,不构成投资建议。投资有风险,选择需谨慎! 如涉及内容、版权等问题,请联系我们,我们会在第一时间作出调整!

相关文章