TradingView js api demo (四)websocket服务代码
websocket服务代码,用的hyperf框架,需要swoole扩展,参考这个,创建websocket服务
下面是对应控制器的代码
<?php
declare(strict_types=1);
namespace App\Controller;
use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Swoole\Http\Request;
use Swoole\Server;
use Swoole\Websocket\Frame;
use Swoole\WebSocket\Server as WebSocketServer;
use Hyperf\DbConnection\Db;
use Hyperf\Utils\ApplicationContext;
use function GuzzleHttp\json_decode;
/**
* 介绍一下大概思路
* 1,订阅redis中所有的channel
* 2,当有链接进来,需要实时获取最新数据时,就注册到类的sublist数组里
* 3,当redis有数据更新时,遍历sublist数组,对订阅此channel的每个链接返回数据
* 4,取消订阅就是把这个链接从sublist数组里删除
*
* 5,获取历史数据没啥好说的,查询数据库,把数据发给前端就行了
*/
class BasisModeTestController implements OnMessageInterface,OnOpenInterface,OnCloseInterface
{
//记录订阅者
public $sublist = [];
public function __construct()
{
\Swoole\Runtime::enableCoroutine();
$this->subscribe();
}
//订阅redis中的所有path
public function subscribe()
{
go(function(){
$container = ApplicationContext::getContainer();
$redis = $container->get(\Hyperf\Redis\Redis::class);
//订阅redis
$redis->psubscribe(array('zl_*'),[$this,'callback']);
});
}
//根据接收到的method参数,执行对应的方法
public function onMessage(WebSocketServer $server, Frame $frame): void
{
$input = json_decode($frame->data,true);
if(empty($input['method'])){
return;
}
$method = $input['method'];
$this->{$method}($server,$frame,$input);
}
public function onOpen(WebSocketServer $server, Request $request): void
{
echo "有新链接{$request->fd}\n";
}
//返回指定时间内的数据
public function KlineHistory($server,$frame,$map)
{
go(function()use($server,$frame,$map){
$str = strtolower($map['data']['market']);
$path = substr($str,0,strlen($str)-3);
$mode = substr($str,-2,2);
$startTime = $map['data']['from'] * 1000;
$endTime = $map['data']['to'] * 1000;
$interval = $this->formatInterval($map['data']['resolution']);
$method = $map['method'];
$sql = "select `create_time`,`open`,`close`,`tallest`,`lowest` from `hedge_basis_mode` where `interval`='{$interval}' and `path`='{$path}' and `mode`='{$mode}' and (`create_time`>={$startTime} and `create_time`<={$endTime})";
$list = Db::select($sql);
echo $sql."\n";
$res = [
'data'=>$list,
'method'=>$method,
'name'=>$map['data']['market'],
'from'=>$map['data']['from'],
'to'=>$map['data']['to'],
'resolution'=>$map['data']['resolution']
];
$server->push($frame->fd,json_encode($res));
});
}
//对前端传过来的时间格式转换成数据库存的时间格式
public function formatInterval($interval)
{
if ($interval == 5) {
$interval = '5m';
}else if($interval == 60){
$interval = '1h';
}else if($interval == '1D'){
$interval = '1d';
}
return $interval;
}
//实时更新数据
public function KlineUpdata($server,$frame,$map)
{
//这里没有直接返回数据,而是添加了一个订阅者
$this->addSubscriber($server,$frame,$map);
}
//移除订阅者
public function removeSubscriber($server,$frame,$map)
{
$map['id'] = strtolower($map['id']);
echo "取消{$frame->fd}下的{$map['id']}\n";
foreach ($this->sublist as $redisKey=>$list) {
foreach ($list as $index => $sub) {
if($sub['id'] == $map['id'] && ($sub['frame'])->fd == $frame->fd){
echo "取消{$map['id']}成功\n";
unset($this->sublist[$redisKey][$index]);
}
}
}
}
//添加订阅者
public function addSubscriber($server,$frame,$map)
{
$str = strtolower($map['data']['market']);
$path = substr($str,0,strlen($str)-3);
$mode = substr($str,-2,2);
$interval = $this->formatInterval($map['data']['resolution']);
//整理redis订阅的key
$redisKey = "zl_{$mode}_{$interval}_{$path}";
$this->sublist[$redisKey][] = [
'server'=>$server,
'frame'=>$frame,
'id'=>strtolower($map['data']['id'])
];
}
public function callback($redis, $pattern, $channel, $message)
{
if(empty($this->sublist[$channel])){
return;
}
$row = json_decode(str_replace('\'','"',trim($message,"'")),true);
//实时传送到前端
$list = [
'time' => $row['time'],
'open' => $row['open'],
'close' => $row['close'],
'tallest' => $row['tallest'],
'lowest' => $row['lowest']
];
//给每个订阅此path的订阅者发送消息
foreach ($this->sublist[$channel] as $sub) {
($sub['server'])->push(($sub['frame'])->fd,json_encode(
$res = [
'data'=>$list,
'method'=>'KlineUpdata',
'id'=>$sub['id']
]
));
}
}
public function onClose(Server $server, int $fd, int $reactorId): void
{
echo "{$fd}--链接关闭\n";
}
}
声明:
本文内容不代表斑马投诉网站观点,内容仅供参考,不构成投资建议。投资有风险,选择需谨慎! 如涉及内容、版权等问题,请联系我们,我们会在第一时间作出调整!