File: WebClientService.class.php
File: WebClientService.class.php
Role: Class source
Content type: text/plain
Description: Web Client process to handle communication between the web client and the server
Class: Web Socket Service
Handle Web socket accesses using child processes
Author: By
Last change: PHP 5.4 fully deprecated call-time pass-by-reference (passing arguments by reference at the function call site).
Date: 11 years ago
Size: 15,407 bytes



<?php

/**
 * Per-connection WebSocket worker.
 *
 * An instance is created inside a forked child process with the accepted client
 * socket and a socket connected to the parent ("master") process. The constructor
 * does not return until the connection is closed: it multiplexes both sockets,
 * performs the opening handshake (RFC 6455 style or the legacy hixie-76 "version 0"
 * style), decodes incoming frames and hands payloads to dataReceived().
 *
 * Frames sent to the client are never masked: RFC 6455 section 5.1 forbids a
 * server from masking, and compliant clients fail the connection otherwise.
 *
 * Fragmented messages are not reassembled; every non-control frame is delivered
 * to dataReceived() on its own.
 */
abstract class WebClientService
{
    // GUID appended to Sec-WebSocket-Key when computing Sec-WebSocket-Accept.
    const GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';

    // Maximum number of handshake header bytes accepted from a client (0x4000 = 16 KiB).
    const MAX_HEADER_SIZE = 0x4000;

    // How long in seconds after not receiving any data to ping the client.
    const PING_TIMER = 10;

    // Number of seconds after which the connection is closed when no pong arrives.
    const PING_TIMEOUT = 20;

    // How often (seconds) select() is released so zombie children and timers get serviced.
    const BLOCKING_TIMER = 1;

    public $closed = false;
    public $protocol_version = 0;
    public $ip;
    public $port;
    public $socket;
    public $lastMsgTime;
    public $lastPingSent = false;
    public $lastPing;
    public $needsHandShake = true;
    public $master;
    public $currentData = '';

    // Raw, not yet decoded bytes received from the client after the handshake.
    private $frameBuffer = '';

    /**
     * Called whenever a complete message has been received from the client.
     * The payload is available in "$this->currentData" for the duration of the call.
     */
    abstract public function dataReceived();

    /**
     * Called for every packet from the parent process that is not the reserved
     * close command.
     *
     * @param string $type One-character message type ('c' is reserved for closing the
     *                     process, 'i' for relaying to specific processes).
     * @param string $data The data passed from the parent process to the child process.
     */
    abstract public function parentDataRecv($type, $data);

    /**
     * Stores both sockets and then runs the event loop until the connection closes.
     *
     * @param mixed $socket Accepted client socket.
     * @param mixed $master Socket connected to the parent process.
     */
    public function __construct($socket, $master)
    {
        $this->master = $master;
        $this->socket = $socket;
        $this->lastMsgTime = time();
        socket_getpeername($socket, $this->ip, $this->port);

        Console::log("Successfully forked process!");
        Console::log("WebClient: Connection accepted for: $this->ip:$this->port");

        $this->eventLoop();
    }

    /**
     * Dispatches one packet received from the parent process.
     *
     * @param string $data Packet body; the first byte is the message type.
     */
    public function parentRecv($data)
    {
        $type = $data[0];

        if ($type === WebSocket::CHILD_PROCESS_RESPONSE_CLOSE) {
            Console::log("WebSocket: Got close command from parent process");
            $this->close();
            return;
        }

        Console::log("Received parent data (user defined)");
        $this->parentDataRecv($type, substr($data, 1));
    }

    /**
     * Closes the client and parent sockets once; later calls are no-ops.
     */
    public function close()
    {
        if ($this->closed) {
            return;
        }

        Console::log("WebClient: Closing socket " . self::socketLabel($this->socket));
        @socket_close($this->socket);
        @socket_close($this->master);
        $this->closed = true;
    }

    /**
     * Reads from the client socket: performs the handshake if it is still pending,
     * otherwise decodes incoming frames for the negotiated protocol version.
     *
     * @return false|null false when the connection was dropped, null otherwise.
     */
    public function recv()
    {
        if ($this->needsHandShake) {
            return $this->performHandshake();
        }

        if ($this->protocol_version == 0) {
            return $this->recvLegacyFrame();
        }

        return $this->recvFrames();
    }

    /**
     * Sends a ping to the client (a ping control frame, or an empty frame for version 0).
     * The connection is closed when the write fails.
     */
    public function ping()
    {
        $packet = $this->protocol_version != 0 ? self::buildFrame(0x9, '') : "\x00\xFF";

        if (@socket_write($this->socket, $packet) === false) {
            Console::log("Failed to send ping request");
            $this->close();
        }
    }

    /**
     * Keep-alive bookkeeping: pings an idle client and closes the connection when
     * nothing has been received for PING_TIMEOUT seconds after a ping.
     */
    public function check()
    {
        $now = time();

        if (!$this->lastPingSent) {
            if ($this->lastMsgTime + static::PING_TIMER < $now) {
                $this->lastPingSent = time();
                $this->ping();
            }
            return;
        }

        $pingExpired = $this->lastPingSent + static::PING_TIMEOUT < $now;
        $silentTooLong = $this->lastMsgTime + static::PING_TIMEOUT < $now;
        if ($pingExpired && $silentTooLong) {
            Console::log("WebSocket: Ping not received for " . static::PING_TIMEOUT . " seconds on " . self::socketLabel($this->socket));
            $this->close();
        }
    }

    /**
     * Forks the current process.
     *
     * @return bool true in the child process; false in the parent or when forking failed.
     */
    public function fork()
    {
        $pid = pcntl_fork();

        if ($pid == -1) {
            Console::log("WebSocket: Failed to fork process");
            return false;
        }
        if ($pid) {
            // Parent process.
            return false;
        }

        // Child process.
        set_time_limit(30);
        return true;
    }

    /**
     * Sends a packet to the parent process, prefixed with a 3-byte big-endian length.
     *
     * @param string $type One-character message type.
     * @param string $data Message body.
     * @param array  $pids When non-empty the packet is wrapped in an 'i' relay envelope
     *                     listing these process ids.
     *
     * @return bool true when the whole packet was written, false otherwise.
     */
    public function sendToParent($type, $data, array $pids = array())
    {
        $payload = $pids
            ? 'i' . implode(',', $pids) . ':' . $type . $data
            : $type . $data;

        $length = strlen($payload);
        if ($length > 0xFFFFFF) {
            Console::log("Failed to send packet to master, 16MB limit hit");
            return false;
        }

        $packet = chr(($length >> 16) & 0xFF) . chr(($length >> 8) & 0xFF) . chr($length & 0xFF) . $payload;
        $total = strlen($packet);
        $sent = 0;

        while ($sent < $total) {
            // Only the unsent tail is offered again after a partial write.
            $chunk = $sent === 0 ? $packet : substr($packet, $sent);
            $written = @socket_send($this->master, $chunk, strlen($chunk), 0);
            if ($written === false) {
                Console::log("WebSocket: Failed to send data to: " . self::socketLabel($this->master));
                $this->close();
                return false;
            }
            if (!$written) {
                usleep(25);
            }
            $sent += $written;
        }

        return true;
    }

    /**
     * Sends data to the client from a forked child process so a slow client cannot
     * block this service. The calling (parent) process returns immediately; the
     * forked child exits after writing.
     *
     * @param string $data   Payload, or an already encoded frame when $encode is false.
     * @param bool   $encode Whether to wrap $data with encodeData() first.
     */
    public function send($data, $encode = true)
    {
        if (!$this->fork()) {
            return;
        }

        if ($encode) {
            $data = $this->encodeData($data);
        }

        $total = strlen($data);
        $sent = 0;
        do {
            $chunk = substr($data, $sent);
            $written = @socket_write($this->socket, $chunk, strlen($chunk));
            if ($written === false) {
                Console::log("WebSocket: Failed to send data to: " . self::socketLabel($this->socket));
                $this->close();
                break;
            }
            if (!$written) {
                usleep(25);
            }
            $sent += $written;
        } while ($total > $sent);

        exit;
    }

    /**
     * Parses an HTTP request head into an associative array.
     *
     * @param string $header Raw request head (LF or CRLF line endings).
     *
     * @return array 'uri' and 'http-version' from the GET request line (null when there
     *               is no such line) plus one entry per header field, keyed by the
     *               field name exactly as it was sent.
     */
    public static function getheaders($header)
    {
        $requestLine = array();
        preg_match("/GET (.+) HTTP\\/([0-9]+\\.[0-9]+)[\r\n]+/", $header, $requestLine);

        $return = array(
            'uri' => isset($requestLine[1]) ? $requestLine[1] : null,
            'http-version' => isset($requestLine[2]) ? $requestLine[2] : null,
        );

        // Field names are anchored to the start of a line and may not contain
        // whitespace, so the request line can never be swallowed into a field name.
        $fields = array();
        preg_match_all('/^([^:\s]+):[ \t]*([^\r\n]*)/m', $header, $fields, PREG_SET_ORDER);
        foreach ($fields as $field) {
            $return[$field[1]] = $field[2];
        }

        return $return;
    }

    /**
     * Wraps a payload for transmission to the client.
     *
     * @param string $data Payload to send.
     *
     * @return string An unmasked text frame for handshaked versions other than 0, or
     *                the 0x00 ... 0xFF framing for version 0.
     */
    public function encodeData($data)
    {
        if ($this->protocol_version != 0) {
            return self::buildFrame(0x1, (string) $data);
        }

        return "\x00$data\xFF";
    }

    /**
     * Main loop: reaps finished child processes, waits for activity on either socket
     * and services keep-alive timers until the connection is closed.
     */
    private function eventLoop()
    {
        while (true) {
            // Removes zombie children if they exist.
            while (($childId = pcntl_wait($status, WNOHANG)) > 0) {
                Console::log("Removed zombie: $childId");
            }

            if ($this->closed) {
                return;
            }

            $readable = array($this->master, $this->socket);
            $write = null;
            $except = null;
            $ready = socket_select($readable, $write, $except, static::BLOCKING_TIMER);

            if ($ready === false) {
                $errorCode = socket_last_error();
                socket_clear_error();
                if ($errorCode !== SOCKET_EINTR) {
                    Console::log("WebClient: socket_select failed: " . socket_strerror($errorCode));
                    $this->close();
                    return;
                }
                // Interrupted by a signal: nothing is known to be readable.
                $readable = array();
            }

            foreach ($readable as $connection) {
                if ($connection === $this->socket) {
                    $this->recv();
                } else {
                    $this->readParentPacket($connection);
                }

                if ($this->closed) {
                    return;
                }
            }

            $this->check();
        }
    }

    /**
     * Reads one length-prefixed packet from the parent process and dispatches it.
     * The connection is closed when the parent went away or the packet is truncated.
     *
     * @param mixed $connection Socket connected to the parent process.
     */
    private function readParentPacket($connection)
    {
        Console::log("Receiving master data...");

        $received = socket_recv($connection, $prefix, 3, MSG_WAITALL);
        if (!$received) {
            Console::log("WebSocket: Parent process connection closed");
            $this->close();
            return;
        }
        if ($received < 3) {
            Console::log("WebSocket: Error not enough data received!");
            $this->close();
            return;
        }

        $length = (ord($prefix[0]) << 16) | (ord($prefix[1]) << 8) | ord($prefix[2]);
        if (!$length) {
            return;
        }

        if (socket_recv($connection, $body, $length, MSG_WAITALL) == $length) {
            Console::log("Got master's data...");
            $this->parentRecv($body);
            return;
        }

        Console::log("WebSocket: Error not enough data received!");
        $this->close();
    }

    /**
     * Reads the HTTP upgrade request byte by byte and answers it.
     *
     * @return false|null false when the client was dropped, null on success.
     */
    private function performHandshake()
    {
        $label = self::socketLabel($this->socket);
        $previous = '';

        while (true) {
            if (!socket_recv($this->socket, $byte, 1, MSG_WAITALL) || $byte === null) {
                return $this->dropClient();
            }
            if ($byte === "\r") {
                continue;
            }

            $this->currentData .= $byte;
            if ($byte === "\n" && $previous === "\n") {
                break;
            }
            $previous = $byte;

            if (strlen($this->currentData) >= static::MAX_HEADER_SIZE) {
                Console::log("WebClient: Handshake header from client '$label' exceeds " . static::MAX_HEADER_SIZE . " bytes, closing");
                $this->close();
                return false;
            }
        }

        $headers = self::getheaders($this->currentData);

        if (!isset($headers['Connection'])) {
            Console::log("WebClient: Looks like client '$label' is not a websocket, closing");
            $this->close();
            return false;
        }

        if (isset($headers['Sec-WebSocket-Version'])) {
            if (!isset($headers['Sec-WebSocket-Key'])) {
                Console::log("WebClient: Client '$label' did not send a Sec-WebSocket-Key, closing");
                $this->close();
                return false;
            }

            $this->protocol_version = $headers['Sec-WebSocket-Version'];
            Console::log("WebClient: Client is version {$headers['Sec-WebSocket-Version']}, proceeding to handshake...");

            $accept = base64_encode(sha1(trim($headers['Sec-WebSocket-Key']) . static::GUID, true));
            $response = "HTTP/1.1 101 Switching Protocols\r\n"
                . "Upgrade: websocket\r\n"
                . "Connection: Upgrade\r\n"
                . "Sec-WebSocket-Accept: " . $accept . "\r\n\r\n";
        } elseif (isset($headers['Sec-WebSocket-Key1']) && isset($headers['Sec-WebSocket-Key2'])) {
            $this->protocol_version = 0;
            Console::log("WebClient: Client is version 00, proceeding to handshake...");

            // Eight challenge bytes follow the header block in this legacy handshake.
            if (!socket_recv($this->socket, $code, 8, MSG_WAITALL) || $code === null) {
                return $this->dropClient();
            }

            $key1 = self::legacyKeyNumber($headers['Sec-WebSocket-Key1']);
            $key2 = self::legacyKeyNumber($headers['Sec-WebSocket-Key2']);
            $hash = md5(pack('N', $key1) . pack('N', $key2) . $code, true);

            $origin = isset($headers['Origin']) ? $headers['Origin'] : '';
            $host = isset($headers['Host']) ? $headers['Host'] : '';

            $response = "HTTP/1.1 101 WebSocket Protocol Handshake\r\n"
                . "Upgrade: WebSocket\r\n"
                . "Connection: Upgrade\r\n"
                . "Sec-WebSocket-Origin: {$origin}\r\n"
                . "Sec-WebSocket-Location: ws://{$host}{$headers['uri']}\r\n"
                . "\r\n"
                . $hash;
        } else {
            Console::log("WebClient: Looks like client '$label' did not request a supported websocket version, closing");
            $this->close();
            return false;
        }

        socket_write($this->socket, $response);
        $this->needsHandShake = false;
        $this->currentData = '';

        return null;
    }

    /**
     * Receives one version-0 frame (0x00 payload 0xFF) and delivers it.
     * An empty frame is treated as the reply to a ping.
     *
     * @return false|null false when the client was dropped, null otherwise.
     */
    private function recvLegacyFrame()
    {
        $lead = $this->readLegacyByte();
        if ($lead === false) {
            return false;
        }
        if ($lead !== "\x00") {
            Console::log("WebClient: Client did not prepend a 0x00 byte to head of request");
            $this->close();
            return false;
        }

        $this->currentData = '';
        while (true) {
            $byte = $this->readLegacyByte();
            if ($byte === false) {
                return false;
            }

            if ($byte === "\xFF") {
                if ($this->currentData === '') {
                    Console::log("WebClient: Received pong from: " . self::socketLabel($this->socket));
                    return null;
                }
                $this->dataReceived();
                $this->currentData = '';
                return null;
            }

            $this->currentData .= $byte;
        }
    }

    /**
     * Blocks for a single byte from the client and refreshes the keep-alive state.
     *
     * @return string|false The byte, or false after the client was dropped.
     */
    private function readLegacyByte()
    {
        $received = socket_recv($this->socket, $byte, 1, MSG_WAITALL);
        $lastError = socket_last_error($this->socket);

        // Error 105 (ENOBUFS on Linux) is tolerated, as is "no error".
        if (!$received || !($lastError == 105 || $lastError == 0)) {
            socket_clear_error($this->socket);
            return $this->dropClient();
        }

        $this->lastMsgTime = time();
        $this->lastPingSent = null;

        return $byte;
    }

    /**
     * Receives whatever the client sent and processes every complete frame in the
     * buffer; an incomplete trailing frame is kept for the next call.
     *
     * @return false|null false when the connection was dropped, null otherwise.
     */
    private function recvFrames()
    {
        $received = socket_recv($this->socket, $chunk, 2048, MSG_DONTWAIT);
        $lastError = socket_last_error($this->socket);

        // Error 105 (ENOBUFS on Linux) is tolerated, as is "no error".
        if (!$received || !($lastError == 105 || $lastError == 0)) {
            socket_clear_error($this->socket);
            return $this->dropClient();
        }

        $this->lastMsgTime = time();
        $this->lastPingSent = null;
        $this->frameBuffer .= $chunk;

        while (!$this->closed) {
            $frame = $this->extractFrame();
            if ($frame === null) {
                // Not enough data to do anything with yet.
                return null;
            }
            if ($frame === false) {
                Console::log("WebClient: Received a frame with an invalid length from: " . self::socketLabel($this->socket));
                $this->close();
                return false;
            }

            $opcode = $frame['opcode'];
            if ($opcode == 0xA) {
                // Pong: receiving it already refreshed the keep-alive state above.
                continue;
            }
            if ($opcode == 0x9) {
                // A pong carries the same application data as the ping it answers.
                socket_write($this->socket, self::buildFrame(0xA, $frame['payload']));
                Console::log("WebClient: Received ping request responding with pong from: " . self::socketLabel($this->socket));
                continue;
            }
            if ($opcode == 0x8) {
                Console::log("WebClient:: Received close request from: " . self::socketLabel($this->socket));
                $this->frameBuffer = '';
                $this->close();
                return null;
            }

            $this->currentData = $frame['payload'];
            $this->dataReceived();
            $this->currentData = '';
        }

        return null;
    }

    /**
     * Removes the first complete frame from the receive buffer.
     *
     * @return array|false|null array('opcode' => int, 'payload' => string) for a complete
     *                          frame, null when more bytes are needed, false when the
     *                          declared length is invalid or not representable.
     */
    private function extractFrame()
    {
        $buffer = $this->frameBuffer;
        $available = strlen($buffer);
        if ($available < 2) {
            return null;
        }

        $first = ord($buffer[0]);
        $second = ord($buffer[1]);
        $masked = ($second & 0x80) !== 0;
        $length = $second & 0x7F;
        $offset = 2;

        if ($length === 126) {
            // 16-bit extended payload length.
            if ($available < 4) {
                return null;
            }
            $length = (ord($buffer[2]) << 8) | ord($buffer[3]);
            $offset = 4;
        } elseif ($length === 127) {
            // 64-bit extended payload length, most significant byte first.
            if ($available < 10) {
                return null;
            }
            $parts = unpack('Nhigh/Nlow', substr($buffer, 2, 8));
            if (PHP_INT_SIZE > 4) {
                if ($parts['high'] & 0x80000000) {
                    return false;
                }
                $length = ($parts['high'] << 32) | $parts['low'];
            } else {
                if ($parts['high'] != 0 || $parts['low'] < 0) {
                    return false;
                }
                $length = $parts['low'];
            }
            $offset = 10;
        }

        $mask = '';
        if ($masked) {
            if ($available < $offset + 4) {
                return null;
            }
            $mask = substr($buffer, $offset, 4);
            $offset += 4;
        }

        if ($available - $offset < $length) {
            return null;
        }

        $payload = $length > 0 ? substr($buffer, $offset, $length) : '';
        if ($masked && $length > 0) {
            // String XOR works byte by byte; the mask is repeated to cover the payload.
            $payload = $payload ^ str_repeat($mask, (int) ceil($length / 4));
        }

        // Keep whatever follows this frame for the next round.
        $this->frameBuffer = (string) substr($buffer, $offset + $length);

        return array('opcode' => $first & 0x0F, 'payload' => $payload);
    }

    /**
     * Builds a single unmasked frame with the FIN bit set.
     *
     * @param int    $opcode  Frame opcode (low four bits are used).
     * @param string $payload Frame payload.
     *
     * @return string The encoded frame.
     */
    private static function buildFrame($opcode, $payload)
    {
        $length = strlen($payload);
        $frame = chr(0x80 | ($opcode & 0x0F));

        if ($length <= 125) {
            $frame .= chr($length);
        } elseif ($length <= 0xFFFF) {
            $frame .= chr(126) . pack('n', $length);
        } else {
            $high = PHP_INT_SIZE > 4 ? ($length >> 32) : 0;
            $low = PHP_INT_SIZE > 4 ? ($length & 0xFFFFFFFF) : $length;
            $frame .= chr(127) . pack('NN', $high, $low);
        }

        return $frame . $payload;
    }

    /**
     * Computes the number encoded in a version-0 handshake key: the digits of the
     * key read as one number, divided by the number of spaces in the key.
     *
     * @param string $key Value of Sec-WebSocket-Key1 or Sec-WebSocket-Key2.
     *
     * @return int The key number, or 0 when the key has no digits or no spaces.
     */
    private static function legacyKeyNumber($key)
    {
        $digits = preg_replace('/[^0-9]/', '', $key);
        $spaces = substr_count($key, ' ');

        if ($digits === '' || $spaces === 0) {
            return 0;
        }

        return (int) ($digits / $spaces);
    }

    /**
     * Logs that the client disconnected and closes the connection.
     *
     * @return false Always false, so callers can return the result directly.
     */
    private function dropClient()
    {
        Console::log("WebClient: Looks like client '" . self::socketLabel($this->socket) . "' disconnected");
        $this->close();

        return false;
    }

    /**
     * Returns a printable label for a socket. Resources keep their usual string form;
     * objects (sockets are objects as of PHP 8) cannot be cast to string and get an
     * id-based label instead.
     *
     * @param mixed $socket Socket resource or object.
     *
     * @return string
     */
    private static function socketLabel($socket)
    {
        return is_object($socket) ? 'Socket#' . spl_object_id($socket) : (string) $socket;
    }
}