用PHP语言实现Redis的客户端

2019-08-04 18:16栏目:编程学习

为了更好地了解redis协议,我们用php来实现一个支持大部分命令的客户端类.

大家都知道redis是用C来实现的,现在我用php来实现一个简单的仅支持SET和GET命令的redis服务端,主要是为了更好的了解redis的服务端和php的网络编程.

redis的协议可参考这个文章

代码如下:

代码如下:

<?php
/**
 * Multi-process, blocking server that speaks a subset of the Redis protocol.
 */
class Xtgxiso_server
{
    // Listening stream created by stream_socket_server() in the constructor.
    private $socket = false;
    // Number of worker processes that run() forks.
    private $process_num = 100;
    // PIDs of the forked workers, keyed by PID. Declared explicitly so that
    // run() never iterates an undefined (dynamic) property.
    private $pids = array();
    // In-memory key/value store filled by the onMessage handler.
    public $redis_kv_data = array();
    // Callable invoked as ($conn, $request) for every parsed request.
    public $onMessage = null;

<?php
namespace xtgxiso;
/**
 * Minimal Redis client that speaks the RESP protocol over a TCP stream.
 *
 * Every method called on an instance is forwarded to the server as a command
 * through __call(), e.g. $redis->set('key', 'value') sends "set key value".
 */
class Redis {
    // Connected stream, or false when the connection could not be opened.
    private $redis_socket = false;
    // Currently unused; kept so the class layout stays unchanged.
    private $cmd = '';

    /**
     * Open the TCP connection to the server.
     *
     * @param string $host    Server host name or IP address.
     * @param int    $port    Server TCP port.
     * @param int    $timeout Connection timeout in seconds.
     * @throws \Exception When the connection cannot be established.
     */
    public function __construct($host='127.0.0.1',$port=6379,$timeout = 3) {
        $this->redis_socket = stream_socket_client("tcp://".$host.":".$port, $errno, $errstr,  $timeout);
        if ( !$this->redis_socket) {
            throw new \Exception("{$errno} - {$errstr}");
        }
    }

    /**
     * Close the connection, if one was ever opened.
     */
    public function __destruct() {
        // The constructor may have thrown before a stream existed.
        if (is_resource($this->redis_socket)) {
            fclose($this->redis_socket);
        }
    }

    /**
     * Send an arbitrary command and return the decoded reply.
     *
     * The request is encoded as a RESP array of bulk strings:
     * "*<argc>\r\n" followed by "$<len>\r\n<arg>\r\n" for each argument.
     *
     * @param string $name Command name (the method that was called).
     * @param array  $args Command arguments.
     * @return mixed Decoded reply, see readResponse().
     * @throws \Exception On write failure or when the server replies with an error.
     */
    public function __call($name, $args) {
        $crlf = "\r\n";
        array_unshift($args,$name);
        $command = '*' . count($args) . $crlf;
        foreach ($args as $arg) {
            // Lengths are byte counts, so measure the string form of the argument.
            $arg = (string) $arg;
            $command .= '$' . strlen($arg) . $crlf . $arg . $crlf;
        }

        // fwrite() on a network stream may write fewer bytes than requested,
        // so keep going until the whole command has been sent.
        $total = strlen($command);
        $sent = 0;
        while ($sent < $total) {
            $written = fwrite($this->redis_socket, substr($command, $sent));
            if ($written === FALSE || $written <= 0) {
                throw new \Exception('Failed to write entire command to stream');
            }
            $sent += $written;
        }
        return $this->readResponse();
    }

    /**
     * Read and decode one RESP reply from the stream.
     *
     * @return mixed TRUE for "+OK", string for other status and bulk replies,
     *               NULL for nil replies, int for integer replies, array for
     *               multi-bulk replies (decoded recursively).
     * @throws \Exception On read failure, an error reply, or an unknown reply type.
     */
    private function readResponse() {
        $line = fgets($this->redis_socket, 1024);
        if ($line === FALSE) {
            throw new \Exception('Failed to read response from stream');
        }
        $reply = trim($line);
        switch (substr($reply, 0, 1)) {
            /* Error reply */
            case '-':
                // Drop the leading '-' and, when present, the generic "ERR" prefix;
                // other prefixes (WRONGTYPE, NOAUTH, ...) are kept intact.
                $message = substr($reply, 1);
                if (strncmp($message, 'ERR', 3) === 0) {
                    $message = substr($message, 3);
                }
                throw new \Exception(trim($message));
            /* Status reply */
            case '+':
                $response = substr($reply, 1);
                if ($response === 'OK') {
                    $response = TRUE;
                }
                break;
            /* Bulk reply */
            case '$':
                if ($reply == '$-1') {
                    $response = NULL;
                    break;
                }
                $size = intval(substr($reply, 1));
                // A zero-length bulk string is an empty string, not nil.
                $response = '';
                $read = 0;
                while ($read < $size) {
                    $block_size = ($size - $read) > 1024 ? 1024 : ($size - $read);
                    $r = fread($this->redis_socket, $block_size);
                    // An empty read means the peer closed the connection mid-payload.
                    if ($r === FALSE || $r === '') {
                        throw new \Exception('Failed to read response from stream');
                    }
                    $read += strlen($r);
                    $response .= $r;
                }
                fread($this->redis_socket, 2); /* discard crlf */
                break;
            /* Multi-bulk reply */
            case '*':
                $count = intval(substr($reply, 1));
                if ($count === -1) {
                    return NULL;
                }
                $response = array();
                for ($i = 0; $i < $count; $i++) {
                    $response[] = $this->readResponse();
                }
                break;
            /* Integer reply */
            case ':':
                $response = intval(substr($reply, 1));
                break;
            default:
                throw new \Exception("Unknown response: {$reply}");
        }
        return $response;
    }
}
/*
$redis = new \xtgxiso\Redis();
var_dump($redis->auth("123456"));
var_dump($redis->set("xtgxiso",'abc'));
var_dump($redis->get("xtgxiso"));
*/
通过实现,我们基本了解redis的协议。

    function __construct($host="0.0.0.0",$port=1215)
    {
        $this->socket = stream_socket_server("tcp://".$host.":".$port,$errno, $errstr);
        if (!$this->socket) die($errstr."--".$errno);
        echo "listen $host $port rn";
        ini_set("memory_limit", "128M");
    }

    private function parseRESP(&$conn){
        $line = fgets($conn);
        if($line === '' || $line === false)
        {
            return null;
        }
        $type = $line[0];
        $line = mb_substr($line,1,-2);
        switch ( $type ){
            case "*":
                $count = (int) $line;
                $data = array();
                for ($i = 1; $i <= $count; $i ) {
                    $data[] = $this->parseRESP($conn);
                }
                return $data;
            case "$":
                if ($line == '-1') {
                    return null;
                }
                $length = $line 2;
                $data = '';
                while ($length > 0) {
                    $block = fread($conn, $length);
                    if ($length !== strlen($block)) {
                        throw new Exception('RECEIVING');
                    }
                    $data .= $block;
                    $length -= mb_strlen($block);
                }
                return mb_substr($data, 0, -2);
        }
        return $line;
    }

    private function start_worker_process(){
        $pid = pcntl_fork();
        switch ($pid) {
            case -1:
                echo "fork error : {$i} rn";
                exit;
            case 0:
                while ( 1 ) {
                    echo  "waiting...n";
                    $conn = stream_socket_accept($this->socket, -1);
                    if ( !$conn ){
                        continue;
                    }
                    //"*3rn$3rnSETrn$5rnmykeyrn$7rnmyvaluern"
                    while(1){
                        $arr = $this->parseRESP($conn);
                        if ( is_array($arr) ) {
                            if ($this->onMessage) {
                                call_user_func($this->onMessage, $conn, $arr);
                            }
                        }else if ( $arr ){
                            if ($this->onMessage) {
                                call_user_func($this->onMessage, $conn, $arr);
                            }
                        }else{
                            fclose($conn);
                            break;
                        }
                    }
                }
            default:
                $this->pids[$pid] = $pid;
                break;
        }
    }

    public function run(){
        for($i = 1; $i <= $this->process_num; $i ){
            $this->start_worker_process();
        }

        while(1){
            foreach ($this->pids as $i => $pid) {
                if($pid) {
                    $res = pcntl_waitpid($pid, $status,WNOHANG);

                    if ( $res == -1 || $res > 0 ){
                        $this->start_worker_process();
                        unset($this->pids[$pid]);
                    }
                }
            }
            sleep(1);
        }
    }

}
$server =  new Xtgxiso_server();

// Request handler: implements SET and GET and answers "+OK" to everything else.
// NOTE(review): workers are created with pcntl_fork(), so each worker appears
// to hold its own copy of $server->redis_kv_data; a key stored through one
// worker would then not be visible to connections served by another. Confirm.
$server->onMessage = function($conn,$info) use($server){
    if ( !is_array($info) || !isset($info[0]) ){
        fwrite($conn,"+OK\r\n");
        return;
    }

    // Command names are case-insensitive in the Redis protocol.
    $command = strtoupper((string) $info[0]);

    if ( $command == "SET" ) {
        if ( !isset($info[1], $info[2]) ) {
            fwrite($conn, "-ERR wrong number of arguments for 'set' command\r\n");
            return;
        }
        $key = $info[1];
        $val = $info[2];
        $server->redis_kv_data[$key] = $val;
        fwrite($conn, "+OK\r\n");
    }else if ( $command == "GET" ){
        if ( !isset($info[1]) ) {
            fwrite($conn, "-ERR wrong number of arguments for 'get' command\r\n");
            return;
        }
        $key = $info[1];
        if ( isset($server->redis_kv_data[$key]) ) {
            $val = $server->redis_kv_data[$key];
            fwrite($conn, "$".strlen($val)."\r\n".$val."\r\n");
        } else {
            // Missing key: reply with a nil bulk string.
            fwrite($conn, "$-1\r\n");
        }
    }else{
        fwrite($conn,"+OK\r\n");
    }
};

$server->run();
通过如下命令来测试PHP实现的性能:

redis-benchmark -h 10.170.233.221 -p 1215 -t set -n 80000 -q

图片 1

 

看来还是不错的,大家有兴趣可以再实现其他命令!

版权声明:本文由威尼斯人app发布于编程学习,转载请注明出处:php语言落成redis的客户端