Browse Source

Initial commit.

Adam Klvač 4 years ago
commit
aaf480b9c5

+ 4 - 0
.gitattributes

@@ -0,0 +1,4 @@
+.gitattributes export-ignore
+.gitignore export-ignore
+composer.lock export-ignore
+tests/ export-ignore

+ 3 - 0
.gitignore

@@ -0,0 +1,3 @@
+vendor/
+composer.lock
+tests/**/output/

+ 22 - 0
LICENSE.md

@@ -0,0 +1,22 @@
+Copyright (c) 2018 Adawolfa
+
+Permission is hereby granted, free of charge, to any person
+obtaining a copy of this software and associated documentation
+files (the "Software"), to deal in the Software without
+restriction, including without limitation the rights to use,
+copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the
+Software is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+OTHER DEALINGS IN THE SOFTWARE.

+ 184 - 0
README.md

@@ -0,0 +1,184 @@
+# FastCGI server and client
+
+This is asynchronous FastCGI both server and client implementation for PHP, more specifically ReactPHP. CGI requests and responses are converted to standard HTTP interop message implementations developers are already familiar with. That makes writing a server or client very easy and pretty much the same like writing their HTTP counterpart.
+
+## Installing
+
+~~~
+composer install adawolfa/fastcgi
+~~~
+
+## Writing client
+
+Client must be obtained using connector. This is different from HTTP client, since FastCGI isn't limited to one request and response per connection. You can (and should) send multiple requests. Note that their parallel processing is limited by the server you are connecting to (for instance, native PHP CGI doesn't support it, but handles queued requests correctly).
+
+~~~php
+use Adawolfa\FastCGI;
+
+$loop = React\EventLoop\Factory::create();
+$connector = new FastCGI\Connector($loop);
+
+$connector->connect('127.0.0.1:9000')->then(function(FastCGI\Client $client) {
+
+    $request = new FastCGI\Request('GET', '/index.php');
+
+    $client->send($request)->then(function(FastCGI\Response $response) {
+        print (string) $response->getBody();
+    });
+
+});
+
+$loop->run();
+~~~
+
+### Handling errors, aborts and timeouts
+
+You can obtain request wrapper using listener:
+
+~~~php
+$client->on('begin', function(FastCGI\Client\Request $request) {
+
+    $request->on('headers', function(FastCGI\Response $response) {
+        // This is what promise handler is fulfilled with.
+    });
+
+    $request->on('error', function(Throwable $exception) {
+        // Incorrect frame (protocol specific error, you can ignore those).
+    });
+
+    $request->on('abort', function(Throwable $exception) {
+        // Called when server aborts the request.
+    });
+
+    $request->on('end', function() {
+        // Called on response body end, always.
+    });
+
+});
+
+$client->on('error', function(Throwable $exception) {
+    // Protocol error, usually not important.
+});
+
+$client->on('close', function() {
+    // This happens if server or client closes the connection.
+});
+~~~
+
+If you need some sort of timeout, listen for `begin` event, set a timer and cancel it once `headers` occur.
+
+~~~php
+$client->on('begin', function(FastCGI\Client\Request $request) use($loop) {
+
+    $timer = $loop->addTimer(10, function() use($request) {
+        $request->abort();
+    });
+
+    $request->on('headers', function(FastCGI\Response $response) use($loop, $timer) {
+        $loop->cancelTimer($timer);
+    });
+
+});
+~~~
+
+## Writing server
+
+Writing the FastCGI server is very similar to writing its HTTP counterpart. Your handler is expected to return a HTTP response or a promise evaluating into one.
+
+~~~php
+use Adawolfa\FastCGI;
+
+$loop = React\EventLoop\Factory::create();
+
+$server = new FastCGI\Server(function(FastCGI\Request $request) {
+    return new FastCGI\Response(200);
+});
+
+$socket = new React\Socket\Server('127.0.0.1:9000', $loop);
+$server->listen($socket);
+
+$loop->run();
+~~~
+
+### Advanced usage
+
+Server emits a `connection` whenever a client connects. You can use this for writing a limiter or further request handling.
+
+~~~php
+$connected = 0;
+
+$server->on('connection', function(FastCGI\Server\Connection $connection) use(&$connected) {
+
+    $connection->on('close', function() use(&$connected) {
+        $connected--;
+    });
+
+    if (++$connected > 10) {
+        $connection->close();
+    }
+
+});
+~~~
+
+FastCGI request is wrapped in `FastCGI\Server\Connection\Request`, which emits various events you might be interested in.
+
+~~~php
+$server->on('connection', function(FastCGI\Server\Connection $connection) use(&$connected) {
+
+    $connection->on('begin', function(FastCGI\Server\Connection\Request $request) {
+
+        $request->on('abort', function() {
+            // Called when request is aborted by server.
+        });
+
+        $request->on('error', function(Throwable $exception) {
+            // Called on protocol violations, ...
+        });
+
+        $request->on('end', function() {
+            // Called on response body end.
+        });
+
+    });
+
+});
+~~~
+
+Detecting client abortion is not necessary, but encouraged. For ease of use, `FastCGI\Request` emits this event as well:
+
+~~~php
+$server = new FastCGI\Server(function(FastCGI\Request $request) {
+
+    $deferred = new Deferred;
+
+    $request->on('abort', function() use($deferred) {
+        $deferred->reject();
+    });
+
+    // You should handle request here and call resolve() on deferred here.
+
+    return $deferred->promise();
+
+});
+~~~
+
+If you, for whatever reason, need to reject a server request, return `null` or `false` from your handler. Alternatively, return a promise and reject it. Keep in mind that this isn't meant to be used for dropping malformed client requests and your web server will probably return a gateway error.
+
+~~~php
+$server = new FastCGI\Server(function(FastCGI\Request $request) {
+
+    // Not talking with anyone.
+    return false;
+
+});
+~~~
+
+## Tests
+
+~~~
+vendor/bin/tester tests
+~~~
+
+## License
+
+Package is licensed as MIT, see [LICENSE.md](LICENSE.md).

+ 32 - 0
composer.json

@@ -0,0 +1,32 @@
+{
+	"name": "adawolfa/fastcgi",
+	"description": "Asynchronous FastCGI server and client for ReactPHP.",
+	"license": "MIT",
+	"authors": [
+		{
+			"name": "Adam Klvač",
+			"homepage": "https://klva.cz"
+		}
+	],
+	"require": {
+		"php": "^7.1",
+		"lisachenko/protocol-fcgi": "2.x@dev",
+		"react/socket": "^1.0",
+		"react/http": "^0.8",
+		"react/http-client": "^0.5"
+	},
+	"require-dev": {
+		"nette/tester": "^2.0"
+	},
+	"minimum-stability": "dev",
+	"autoload": {
+		"psr-4": {
+			"Adawolfa\\FastCGI\\": "src/"
+		}
+	},
+	"extra": {
+		"branch-alias": {
+			"dev-master": "1.0-dev"
+		}
+	}
+}

+ 149 - 0
src/Client.php

@@ -0,0 +1,149 @@
+<?php
+
+declare(strict_types=1);
+namespace Adawolfa\FastCGI;
+use Evenement\EventEmitter;
+use Lisachenko\Protocol\FCGI;
+use Psr\Http\Message\ServerRequestInterface;
+use React;
+use Exception;
+
+/**
+ * Asynchronous FastCGI client.
+ *
+ * @author Adam Klvač <adam@klva.cz>
+ */
+class Client extends EventEmitter
+{
+
+	/** @var React\Socket\ConnectionInterface */
+	private $connection;
+
+	/** @var int */
+	private $id = 1;
+
+	/** @var string */
+	private $buffer = '';
+
+	/** @var Client\Request[] */
+	private $requests = [];
+
+	/**
+	 * Client constructor.
+	 * @param React\Socket\ConnectionInterface $connection
+	 */
+	public function __construct(React\Socket\ConnectionInterface $connection)
+	{
+		$this->connection = $connection;
+
+		$this->connection->on('data', function(string $buffer): void {
+			$this->buffer .= $buffer;
+			while (FCGI\FrameParser::hasFrame($this->buffer)) {
+				$this->handle(FCGI\FrameParser::parseFrame($this->buffer));
+			}
+		});
+
+		$this->connection->on('close', function(): void {
+			$this->emit('close');
+		});
+	}
+
+	/**
+	 * Sends a server request to FastCGI server.
+	 * @param ServerRequestInterface $serverRequest
+	 * @return React\Promise\PromiseInterface
+	 */
+	public function send(ServerRequestInterface $serverRequest): React\Promise\PromiseInterface
+	{
+		$id = $this->id++;
+
+		$frame = [
+			new FCGI\Record\BeginRequest(FCGI::RESPONDER, FCGI::KEEP_CONN),
+			new FCGI\Record\Params($this->buildParameters($serverRequest)),
+			new FCGI\Record\Params,
+			new FCGI\Record\Stdin,
+		];
+
+		foreach ($frame as $packet) {
+			$packet->setRequestId($id);
+		}
+
+		$this->connection->write(implode('', $frame));
+
+		$request = new Client\Request($id, $this->connection);
+		$request->on('end', function() use($id): void {
+			unset($this->requests[$id]);
+		});
+
+		$this->requests[$id] = $request;
+		$this->emit('begin', [$request]);
+		return $request->promise();
+	}
+
+	/**
+	 * Builds FastCGI parameters.
+	 * @param ServerRequestInterface $serverRequest
+	 * @return array
+	 */
+	private function buildParameters(ServerRequestInterface $serverRequest): array
+	{
+		$parameters = $serverRequest->getServerParams();
+
+		if (!isset($parameters['QUERY_STRING'])) {
+			$parameters['QUERY_STRING'] = $serverRequest->getUri()->getQuery();
+		}
+
+		if (!isset($parameters['REQUEST_URI'])) {
+			$parameters['REQUEST_URI'] = (string) $serverRequest->getUri();
+		}
+
+		if (!isset($parameters['REQUEST_METHOD'])) {
+			$parameters['REQUEST_METHOD'] = $serverRequest->getMethod();
+		}
+
+		if (!isset($parameters['SERVER_PROTOCOL'])) {
+			$parameters['SERVER_PROTOCOL'] = $serverRequest->getProtocolVersion();
+		}
+
+		// TODO: Split duplicate headers into their own packets? FCGI is not capable of this.
+		foreach ($serverRequest->getHeaders() as $header => $_) {
+			$key = 'HTTP_' . strtoupper(str_replace('-', '_', $header));
+			$parameters[$key] = $serverRequest->getHeaderLine($header);
+		}
+
+		return $parameters;
+	}
+
+	/**
+	 * Handles response frame.
+	 * @param FCGI\Record $record
+	 * @return void
+	 */
+	private function handle(FCGI\Record $record): void
+	{
+		$id = $record->getRequestId();
+
+		switch (true) {
+
+			case $record instanceof FCGI\Record\Stdout:
+				$this->requests[$id]->writeBuffer($record->getContentData());
+				break;
+
+			case $record instanceof FCGI\Record\EndRequest:
+				$this->requests[$id]->writeEnd($record->getProtocolStatus());
+				break;
+
+			default:
+
+				$exception = new Exception('Unknown frame.');
+
+				if (isset($this->requests[$id])) {
+					$this->requests[$id]->emit('error', [$exception]);
+				} else {
+					$this->emit('error', [$exception]);
+				}
+
+		}
+	}
+
+}

+ 142 - 0
src/Client/Request.php

@@ -0,0 +1,142 @@
+<?php
+
+declare(strict_types=1);
+namespace Adawolfa\FastCGI\Client;
+use Adawolfa\FastCGI\Response;
+use Evenement\EventEmitter;
+use Lisachenko\Protocol\FCGI;
+use React;
+use function RingCentral\Psr7\parse_response;
+
+/**
+ * Client request.
+ *
+ * @author Adam Klvač <adam@klva.cz>
+ */
+class Request extends EventEmitter
+{
+
+	/** @var int */
+	private $id;
+
+	/** @var React\Socket\ConnectionInterface */
+	private $connection;
+
+	/** @var React\Promise\Deferred */
+	private $deferred;
+
+	/** @var string */
+	private $buffer = '';
+
+	/** @var React\Stream\ThroughStream */
+	private $stream;
+
+	/** @var Response|null */
+	private $response;
+
+	/** @var bool */
+	private $aborted = false;
+
+	/** @var bool */
+	private $completed = false;
+
+	/**
+	 * Request constructor.
+	 * @param int $id
+	 * @param React\Socket\ConnectionInterface $connection
+	 */
+	public function __construct(int $id, React\Socket\ConnectionInterface $connection)
+	{
+		$this->id = $id;
+		$this->connection = $connection;
+		$this->deferred = new React\Promise\Deferred;
+	}
+
+	/**
+	 * Writes response buffer.
+	 * @param string $buffer
+	 * @return void
+	 * @internal
+	 */
+	public function writeBuffer(string $buffer): void
+	{
+		if ($this->response === null) {
+
+			$this->buffer .= $buffer;
+
+			if (($pos = strpos($this->buffer, "\r\n\r\n")) !== false) {
+
+				$response = parse_response(substr($this->buffer, 0, $pos));
+
+				$this->stream = new React\Stream\ThroughStream;
+				$this->response = new Response(
+					$response->getStatusCode(),
+					$response->getHeaders(),
+					$this->stream,
+					$response->getProtocolVersion(),
+					$response->getReasonPhrase()
+				);
+
+				$buffer = substr($this->buffer, $pos + 4);
+				$this->buffer = '';
+
+				$this->emit('headers', [$this->response]);
+				$this->deferred->resolve($this->response);
+
+			}
+
+		}
+
+		if (strlen($buffer) > 0) {
+			$this->stream->write($buffer);
+		}
+	}
+
+	/**
+	 * Writes end of buffer.
+	 * @param int $status
+	 * @return void
+	 * @internal
+	 */
+	public function writeEnd(int $status): void
+	{
+		// Request might be aborted.
+		if ($this->stream !== null) {
+			$this->stream->end();
+		}
+
+		if (!$this->completed) {
+			$this->emit('end');
+			$this->completed = true;
+
+			if ($status === FCGI::ABORT_REQUEST) {
+				$this->aborted = true;
+				$this->emit('abort');
+			}
+		}
+	}
+
+	/**
+	 * Aborts the request.
+	 * @return void
+	 */
+	public function abort(): void
+	{
+		if (!$this->aborted) {
+			$abort = new FCGI\Record\EndRequest(FCGI::ABORT_REQUEST);
+			$abort->setRequestId($this->id);
+			$this->connection->write((string) $abort);
+			$this->writeEnd(FCGI::ABORT_REQUEST);
+		}
+	}
+
+	/**
+	 * Returns request promise.
+	 * @return React\Promise\PromiseInterface
+	 */
+	public function promise(): React\Promise\PromiseInterface
+	{
+		return $this->deferred->promise();
+	}
+
+}

+ 48 - 0
src/Connector.php

@@ -0,0 +1,48 @@
+<?php
+
+declare(strict_types=1);
+namespace Adawolfa\FastCGI;
+use Psr\Http\Message\ServerRequestInterface;
+use React;
+
+/**
+ * Asynchronous FastCGI client.
+ *
+ * @author Adam Klvač <adam@klva.cz>
+ */
+class Connector
+{
+
+	/** @var React\EventLoop\LoopInterface */
+	private $loop;
+
+	/**
+	 * Client constructor.
+	 * @param React\EventLoop\LoopInterface $loop
+	 */
+	public function __construct(React\EventLoop\LoopInterface $loop)
+	{
+		$this->loop = $loop;
+	}
+
+	/**
+	 * Connects to a FastCGI server.
+	 * @param string $uri
+	 * @return React\Promise\PromiseInterface
+	 */
+	public function connect(string $uri): React\Promise\PromiseInterface
+	{
+		$connector = new React\Socket\Connector($this->loop);
+		$deferred = new React\Promise\Deferred;
+
+		$connector->connect($uri)->then(
+			function(React\Socket\ConnectionInterface $connection) use($deferred): void {
+				$deferred->resolve(new Client($connection));
+			},
+			[$deferred, 'reject']
+		);
+
+		return $deferred->promise();
+	}
+
+}

+ 18 - 0
src/Request.php

@@ -0,0 +1,18 @@
+<?php
+
+declare(strict_types=1);
+namespace Adawolfa\FastCGI;
+use Evenement\EventEmitterTrait;
+use React\Http\Io\ServerRequest;
+
+/**
+ * Server request (exposes React HTTP implementation).
+ *
+ * @author Adam Klvač <adam@klva.cz>
+ */
+class Request extends ServerRequest
+{
+
+	use EventEmitterTrait;
+
+}

+ 14 - 0
src/Response.php

@@ -0,0 +1,14 @@
+<?php
+
+declare(strict_types=1);
+namespace Adawolfa\FastCGI;
+use React\Http;
+
+/**
+ * HTTP response.
+ *
+ * @author Adam Klvač <adam@klva.cz>
+ */
+class Response extends Http\Response
+{
+}

+ 41 - 0
src/Server.php

@@ -0,0 +1,41 @@
+<?php
+
+declare(strict_types=1);
+namespace Adawolfa\FastCGI;
+use Evenement\EventEmitter;
+use React;
+
+/**
+ * Asynchronous FastCGI server.
+ *
+ * @author Adam Klvač <adam@klva.cz>
+ */
+class Server extends EventEmitter
+{
+
+	/** @var callable */
+	private $handler;
+
+	/**
+	 * Server constructor.
+	 * @param callable $handler
+	 */
+	public function __construct(callable $handler)
+	{
+		$this->handler = $handler;
+	}
+
+	/**
+	 * Listens on socket.
+	 * @param React\Socket\Server $socket
+	 * @return void
+	 */
+	public function listen(React\Socket\Server $socket): void
+	{
+		$socket->on('connection', function(React\Socket\ConnectionInterface $socket): void {
+			$connection = new Server\Connection($socket, $this->handler);
+			$this->emit('connection', [$connection]);
+		});
+	}
+
+}

+ 185 - 0
src/Server/Connection.php

@@ -0,0 +1,185 @@
+<?php
+
+declare(strict_types=1);
+namespace Adawolfa\FastCGI\Server;
+use Evenement\EventEmitter;
+use Lisachenko\Protocol\FCGI;
+use React;
+use Exception;
+
+/**
+ * Server connection.
+ *
+ * @author Adam Klvač <adam@klva.cz>
+ */
+class Connection extends EventEmitter
+{
+
+	/** @var React\Socket\ConnectionInterface */
+	private $connection;
+
+	/** @var callable */
+	private $handler;
+
+	/** @var string */
+	private $buffer = '';
+
+	/** @var Connection\Request[] */
+	private $requests = [];
+
+	/**
+	 * Connection constructor.
+	 * @param React\Socket\ConnectionInterface $connection
+	 * @param callable $handler
+	 */
+	public function __construct(React\Socket\ConnectionInterface $connection, callable $handler)
+	{
+		$this->connection = $connection;
+		$this->handler = $handler;
+
+		$this->connection->on('data', function(string $data): void {
+
+			$this->buffer .= $data;
+
+			try {
+				// Frame parser ensures buffer won't exhaust memory.
+				while (FCGI\FrameParser::hasFrame($this->buffer)) {
+					$this->handle(FCGI\FrameParser::parseFrame($this->buffer));
+				}
+			} catch (Exception $exception) {
+				$this->emit('error', [$exception]);
+				$this->connection->close();
+			}
+
+		});
+
+		$this->connection->on('close', function(): void {
+			$this->emit('close');
+		});
+	}
+
+	/**
+	 * Closes connection.
+	 * @return void
+	 */
+	public function close(): void
+	{
+		$this->connection->close();
+	}
+
+	/**
+	 * Handles incoming frame.
+	 * @param FCGI\Record $frame
+	 * @throws Exception
+	 */
+	private function handle(FCGI\Record $frame): void
+	{
+		switch (true) {
+
+			case $frame instanceof FCGI\Record\BeginRequest:
+				$this->begin($frame);
+				break;
+
+			case $frame instanceof FCGI\Record\Params:
+				$this->writeParameters($frame);
+				break;
+
+			case $frame instanceof FCGI\Record\Stdin:
+				$this->writeBuffer($frame);
+				break;
+
+			case $frame instanceof FCGI\Record\AbortRequest:
+				$this->writeAbort($frame);
+				break;
+
+			default:
+				$this->emit('error', [new Exception('Unknown frame.')]);
+
+		}
+	}
+
+	/**
+	 * Begins a request.
+	 * @param FCGI\Record\BeginRequest $frame
+	 * @return void
+	 * @throws Exception
+	 */
+	private function begin(FCGI\Record\BeginRequest $frame): void
+	{
+		$id = $frame->getRequestId();
+
+		if (isset($this->requests[$id])) {
+			$this->emit('error', [new Exception('Reused request ID.')]);
+		}
+
+		$request = new Connection\Request($id, $this->connection, $this->handler);
+		$request->on('end', function() use($id): void {
+			$this->emit('end', [$this->requests[$id]]);
+			unset($this->requests[$id]);
+		});
+
+		$this->requests[$id] = $request;
+
+		$this->emit('begin', [$request]);
+	}
+
+	/**
+	 * Writes parameters to request.
+	 * @param FCGI\Record\Params $parameters
+	 * @return void
+	 */
+	private function writeParameters(FCGI\Record\Params $parameters): void
+	{
+		if (!$this->exists($parameters)) {
+			return;
+		}
+
+		$this->requests[$parameters->getRequestId()]->writeParameters($parameters->getValues());
+	}
+
+	/**
+	 * Writes buffer to request.
+	 * @param FCGI\Record\Stdin $buffer
+	 * @return void
+	 */
+	private function writeBuffer(FCGI\Record\Stdin $buffer): void
+	{
+		if (!$this->exists($buffer)) {
+			return;
+		}
+
+		$this->requests[$buffer->getRequestId()]->writeBuffer($buffer->getContentData());
+	}
+
+	/**
+	 * Writes abort to request.
+	 * @param FCGI\Record\AbortRequest $abort
+	 * @return void
+	 */
+	private function writeAbort(FCGI\Record\AbortRequest $abort): void
+	{
+		if (!$this->exists($abort)) {
+			return;
+		}
+
+		$this->requests[$abort->getRequestId()]->writeAbort();
+	}
+
+	/**
+	 * Validates request ID.
+	 * @param FCGI\Record $record
+	 * @return bool
+	 */
+	private function exists(FCGI\Record $record): bool
+	{
+		$id = $record->getRequestId();
+
+		if (isset($this->requests[$id])) {
+			return true;
+		}
+
+		$this->emit('error', [new Exception('Request ID does not exist.')]);
+		return false;
+	}
+
+}

+ 238 - 0
src/Server/Connection/Request.php

@@ -0,0 +1,238 @@
+<?php
+
+declare(strict_types=1);
+namespace Adawolfa\FastCGI\Server\Connection;
+use Adawolfa\FastCGI;
+use Evenement\EventEmitter;
+use Lisachenko\Protocol\FCGI;
+use Psr\Http\Message\ResponseInterface;
+use React;
+use RingCentral\Psr7\BufferStream;
+use Exception;
+use RingCentral\Psr7\Uri;
+
+/**
+ * Server request.
+ *
+ * @author Adam Klvač <adam@klva.cz>
+ */
+class Request extends EventEmitter
+{
+
+	/** @var int */
+	private $id;
+
+	/** @var React\Socket\ConnectionInterface */
+	private $connection;
+
+	/** @var callable */
+	private $handler;
+
+	/** @var array|null */
+	private $parameters = [];
+
+	/** @var FastCGI\Request|null */
+	private $serverRequest;
+
+	/**
+	 * Request constructor.
+	 * @param int $id
+	 * @param React\Socket\ConnectionInterface $connection
+	 * @param callable $handler
+	 */
+	public function __construct(int $id, React\Socket\ConnectionInterface $connection, callable $handler)
+	{
+		$this->id = $id;
+		$this->connection = $connection;
+		$this->handler = $handler;
+	}
+
+	/**
+	 * Writes parameters.
+	 * @param array $parameters
+	 */
+	public function writeParameters(array $parameters): void
+	{
+		try {
+
+			if ($this->serverRequest !== null) {
+				throw new Exception('Parameters have already been set.');
+			}
+
+			if (!empty($parameters)) {
+				$this->parameters = array_merge($this->parameters, $parameters);
+			} else {
+				$this->createServerRequest();
+			}
+
+		} catch (Exception $exception) {
+			$this->emit('error', [$exception]);
+		}
+	}
+
+	/**
+	 * Writes request buffer.
+	 * @param string $buffer
+	 * @return void
+	 */
+	public function writeBuffer(string $buffer): void
+	{
+		try {
+
+			if ($this->serverRequest === null) {
+				throw new Exception('Parameters not set.');
+			}
+
+			if ($buffer !== '') {
+
+				if ($this->serverRequest->getBody()->isWritable()) {
+					$this->serverRequest->getBody()->write($buffer);
+				}
+
+			} else {
+				$this->serverRequest->getBody()->close();
+			}
+
+		} catch (Exception $exception) {
+			$this->emit('error', [$exception]);
+		}
+	}
+
+	/**
+	 * Aborts request processing.
+	 * @return void
+	 */
+	public function writeAbort(): void
+	{
+		$this->emit('abort');
+
+		if ($this->serverRequest !== null) {
+			$this->serverRequest->emit('abort');
+		}
+	}
+
+	/**
+	 * Creates server request.
+	 * @return void
+	 * @throws Exception
+	 */
+	private function createServerRequest(): void
+	{
+		$headers = [];
+
+		foreach ($this->parameters as $parameter => $value) {
+			if (strpos($parameter, 'HTTP_') === 0) {
+				$header = str_replace(' ', '-', ucwords(str_replace('_', ' ', substr($parameter, 5))));
+				$headers[$header] = [$value];
+			}
+		}
+
+		$this->serverRequest = new FastCGI\Request(
+			$this->parameters['REQUEST_METHOD'] ?? null,
+			$this->parameters['REQUEST_URI'] ?? new Uri,
+			$headers,
+			new BufferStream,
+			$this->parameters['SERVER_PROTOCOL'] ?? '1.1',
+			$this->parameters
+		);
+
+		$this->emit('headers', [$this->serverRequest]);
+
+		$return = call_user_func($this->handler, $this->serverRequest);
+
+		if ($return instanceof ResponseInterface) {
+			$this->respond($return);
+		} elseif ($return instanceof React\Promise\PromiseInterface) {
+			$return->then(
+				function (ResponseInterface $response) {
+					$this->respond($response);
+				},
+				function ($error = null) {
+					$this->abort();
+				}
+			);
+		} elseif ($return === null || $return === false) {
+			$this->abort();
+		} else {
+			throw new Exception('Server request handler should return ' . ResponseInterface::class . ' or promise.');
+		}
+	}
+
+	/**
+	 * Sends a response.
+	 * @param ResponseInterface $response
+	 * @return void
+	 * @throws Exception
+	 */
+	private function respond(ResponseInterface $response): void
+	{
+		$this->emit('response', [$response]);
+
+		$headers = 'HTTP/' . $response->getProtocolVersion();
+		$headers .= ' ' . $response->getStatusCode();
+		$headers .= ' ' . $response->getReasonPhrase();
+
+		foreach ($response->getHeaders() as $header => $values) {
+			foreach ($values as $value) {
+				$headers .= "\r\n$header: $value";
+			}
+		}
+
+		$headers .= "\r\n\r\n";
+
+		$frame = new FCGI\Record\Stdout($headers);
+		$frame->setRequestId($this->id);
+
+		$this->connection->write((string) $frame);
+
+		$stream = $response->getBody();
+
+		if (!$stream instanceof React\Stream\ReadableStreamInterface) {
+
+			if ($stream->getSize() === null) {
+				throw new Exception('Response body should be instance of ' . React\Stream\ReadableStreamInterface::class . ' or should have content present.');
+			}
+
+			$stream = new React\Stream\ThroughStream;
+
+		}
+
+		$stream->on('data', function(string $data): void {
+			$frame = new FCGI\Record\Stdout($data);
+			$frame->setRequestId($this->id);
+			$this->connection->write((string) $frame);
+		});
+
+		$stream->on('end', function(): void {
+
+			$frame = new FCGI\Record\Stdout;
+			$frame->setRequestId($this->id);
+
+			$this->connection->write((string) $frame);
+
+			$frame = new FCGI\Record\EndRequest;
+			$frame->setRequestId($this->id);
+
+			$this->connection->write((string) $frame);
+
+			$this->emit('end');
+
+		});
+
+		if (!$response->getBody() instanceof React\Stream\ReadableStreamInterface) {
+			$stream->end($response->getBody()->getContents());
+		}
+	}
+
+	/**
+	 * Aborts rejected response.
+	 */
+	private function abort(): void
+	{
+		$end = new FCGI\Record\EndRequest(FCGI::UNKNOWN_ROLE);
+		$end->setRequestId($this->id);
+		$this->connection->write((string) $end);
+		$this->emit('end');
+	}
+
+}

+ 155 - 0
tests/FastCGI/Client.phpt

@@ -0,0 +1,155 @@
+<?php
+
+/**
+ * TEST: Client test.
+ */
+
+use Adawolfa\FastCGI;
+use Tester\Assert;
+use Lisachenko\Protocol\FCGI;
+use Psr\Http\Message\ResponseInterface;
+
+require(__DIR__ . '/../bootstrap.php');
+
+$connection = new class extends Evenement\EventEmitter implements React\Socket\ConnectionInterface
+{
+
+	public function getRemoteAddress()
+	{
+		return '127.0.0.1';
+	}
+
+	public function getLocalAddress()
+	{
+		return '127.0.0.1';
+	}
+
+	public function isReadable()
+	{
+		return true;
+	}
+
+	public function pause()
+	{
+	}
+
+	public function resume()
+	{
+	}
+
+	public function pipe(React\Stream\WritableStreamInterface $dest, array $options = [])
+	{
+	}
+
+	public function close()
+	{
+	}
+
+	public function isWritable()
+	{
+		return true;
+	}
+
+	public function write($data)
+	{
+		$this->emit('write', [$data]);
+		return true;
+	}
+
+	public function end($data = null)
+	{
+	}
+
+};
+
+$writeBuffer = '';
+
+$connection->on('write', function($data) use(&$writeBuffer): void {
+	$writeBuffer .= $data;
+});
+
+$client = new FastCGI\Client($connection);
+
+$response = null;
+
+$client->on('begin', function(FastCGI\Client\Request $v) use(&$request) {
+	$request = $v;
+});
+
+$client->send(new React\Http\Io\ServerRequest('GET', '/', ['Content-Type' => 'text/plain']))
+	->then(function(React\Http\Response $v) use(&$response, &$body): void {
+		$response = $v;
+		$stream = $v->getBody();
+		assert($stream instanceof React\Stream\ReadableStreamInterface);
+		$stream->on('data', function(string $data) use(&$body): void {
+			$body = $data;
+		});
+	});
+
+$written = [];
+
+while (FCGI\FrameParser::hasFrame($writeBuffer)) {
+	$written[] = FCGI\FrameParser::parseFrame($writeBuffer);
+}
+
+Assert::count(4, $written);
+
+Assert::true($written[0] instanceof FCGI\Record\BeginRequest);
+Assert::same(1, $written[0]->getRequestId());
+
+Assert::true($written[1] instanceof FCGI\Record\Params);
+Assert::same([
+	'QUERY_STRING' => '',
+	'REQUEST_URI' => '/',
+	'REQUEST_METHOD' => 'GET',
+	'SERVER_PROTOCOL' => '1.1',
+	'HTTP_CONTENT_TYPE' => 'text/plain',
+], $written[1]->getValues());
+
+Assert::null($response);
+
+$stdout = new FCGI\Record\Stdout("HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World!");
+$stdout->setRequestId(1);
+
+$connection->emit('data', [(string) $stdout]);
+
+$stdout = new FCGI\Record\Stdout;
+$stdout->setRequestId(1);
+
+$connection->emit('data', [(string) $stdout]);
+
+$begin = new FCGI\Record\Stdin;
+$begin->setRequestId(1);
+
+$request->on('error', function(Throwable $e) use(&$exception) {
+	$exception = $e;
+});
+
+$connection->emit('data', [(string) $begin]);
+Assert::same('Unknown frame.', $exception->getMessage());
+
+$end = new FCGI\Record\EndRequest;
+$end->setRequestId(1);
+
+$connection->emit('data', [(string) $end]);
+
+/** @var $response ResponseInterface */
+Assert::true($response instanceof ResponseInterface);
+Assert::same('12', $response->getHeaderLine('Content-Length'));
+Assert::same('Hello World!', $body);
+
+$client->on('error', function(Throwable $e) use(&$exception) {
+	$exception = $e;
+});
+
+$exception = null;
+$connection->emit('data', [(string) new FCGI\Record\BeginRequest]);
+Assert::same('Unknown frame.', $exception->getMessage());
+
+$client->on('close', function() use(&$closed): void {
+	$closed = true;
+});
+
+$connection->emit('close');
+
+Assert::true($closed);

+ 100 - 0
tests/FastCGI/Client/Request.phpt

@@ -0,0 +1,100 @@
+<?php
+
+/**
+ * TEST: Client request test.
+ */
+
+use Adawolfa\FastCGI;
+use Tester\Assert;
+
+require(__DIR__ . '/../../bootstrap.php');
+
+$connection = new class extends Evenement\EventEmitter implements React\Socket\ConnectionInterface
+{
+
+	public function getRemoteAddress()
+	{
+		return '127.0.0.1';
+	}
+
+	public function getLocalAddress()
+	{
+		return '127.0.0.1';
+	}
+
+	public function isReadable()
+	{
+		return true;
+	}
+
+	public function pause()
+	{
+	}
+
+	public function resume()
+	{
+	}
+
+	public function pipe(React\Stream\WritableStreamInterface $dest, array $options = [])
+	{
+	}
+
+	public function close()
+	{
+	}
+
+	public function isWritable()
+	{
+		return true;
+	}
+
+	public function write($data)
+	{
+		$this->emit('write', [$data]);
+		return true;
+	}
+
+	public function end($data = null)
+	{
+	}
+
+};
+
+$request = new FastCGI\Client\Request(10, $connection);
+
+$request->promise()->then(function(React\Http\Response $v) use(&$response): void {
+	$response = $v;
+});
+
+$finished = false;
+$request->on('end', function() use(&$finished): void {
+	$finished = true;
+});
+
+$request->writeBuffer("HTTP/1.1 200 OK\r\n\r\n");
+
+/** @var $response React\Http\Response */
+$body = $response->getBody();
+
+assert($body instanceof React\Http\Io\HttpBodyStream);
+$body->on('data', function($data) use(&$contents): void {
+	$contents = $data;
+});
+
+$request->writeBuffer('Hello World!');
+
+Assert::same(200, $response->getStatusCode());
+Assert::same('Hello World!', $contents);
+
+Assert::false($finished);
+$request->writeEnd(Lisachenko\Protocol\FCGI::REQUEST_COMPLETE);
+Assert::true($finished);
+
+$request = new FastCGI\Client\Request(10, $connection);
+
+$connection->on('write', function(string $data) use(&$end): void {
+	$end = Lisachenko\Protocol\FCGI\FrameParser::parseFrame($data);
+});
+
+$request->abort();
+Assert::true($end instanceof Lisachenko\Protocol\FCGI\Record\EndRequest);

+ 63 - 0
tests/FastCGI/Connector.phpt

@@ -0,0 +1,63 @@
+<?php
+
+/**
+ * TEST: Connector test.
+ */
+
+use Adawolfa\FastCGI;
+use Tester\Assert;
+
+require(__DIR__ . '/../bootstrap.php');
+
+$loop = React\EventLoop\Factory::create();
+
+$server = new FastCGI\Server(function(Psr\Http\Message\ServerRequestInterface $serverRequest) use($loop): React\Promise\PromiseInterface {
+
+	$deferred = new React\Promise\Deferred;
+
+	$loop->addTimer(.1, function() use($deferred, $loop): void {
+		$stream = new React\Stream\ThroughStream;
+		$response = new React\Http\Response(200, ['Content-Type' => 'text/plain'], $stream);
+		$deferred->resolve($response);
+		$stream->end(file_get_contents(__FILE__));
+	});
+
+	return $deferred->promise();
+
+});
+
+$socket = new React\Socket\Server('127.0.0.1:9009', $loop);
+$server->listen($socket);
+
+$connector = new FastCGI\Connector($loop);
+$connector->connect('127.0.0.1:9009')->then(function(FastCGI\Client $client) use($loop, &$response, &$body): void {
+	$loop->addTimer(.5, function() use($client, $loop, &$response, &$body): void {
+
+		$serverRequest = new React\Http\Io\ServerRequest('GET', '/');
+
+		$client->send($serverRequest)->then(function(React\Http\Response $v) use($loop, &$response, &$body): void {
+
+			$response = $v;
+			$body = '';
+
+			$stream = $response->getBody();
+			assert($stream instanceof React\Http\Io\HttpBodyStream);
+
+			$stream->on('data', function(string $data) use(&$body) {
+				$body .= $data;
+			});
+
+			$stream->on('end', function() use($loop) {
+				$loop->stop();
+			});
+
+		});
+
+	});
+});
+
+$loop->run();
+
+/** @var $response Psr\Http\Message\ResponseInterface */
+Assert::same('text/plain', $response->getHeaderLine('Content-Type'));
+Assert::same(file_get_contents(__FILE__), $body);

+ 151 - 0
tests/FastCGI/Server/Connection.phpt

@@ -0,0 +1,151 @@
+<?php
+
+/**
+ * TEST: Server connection test.
+ */
+
+use Adawolfa\FastCGI;
+use Tester\Assert;
+use Lisachenko\Protocol\FCGI;
+use Psr\Http\Message\ResponseInterface;
+
+require(__DIR__ . '/../../bootstrap.php');
+
+$connection = new class extends Evenement\EventEmitter implements React\Socket\ConnectionInterface
+{
+
+	public function getRemoteAddress()
+	{
+		return '127.0.0.1';
+	}
+
+	public function getLocalAddress()
+	{
+		return '127.0.0.1';
+	}
+
+	public function isReadable()
+	{
+		return true;
+	}
+
+	public function pause()
+	{
+	}
+
+	public function resume()
+	{
+	}
+
+	public function pipe(React\Stream\WritableStreamInterface $dest, array $options = [])
+	{
+	}
+
+	public function close()
+	{
+		$this->emit('close');
+	}
+
+	public function isWritable()
+	{
+		return true;
+	}
+
+	public function write($data)
+	{
+		$this->emit('write', [$data]);
+		return true;
+	}
+
+	public function end($data = null)
+	{
+	}
+
+};
+
+$request = new FastCGI\Server\Connection($connection, function(React\Http\Io\ServerRequest $v) use(&$serverRequest): React\Http\Response {
+	$serverRequest = $v;
+	return new React\Http\Response(200, ['Content-Type' => 'text/plain']);
+});
+
+$begin = new FCGI\Record\BeginRequest(FCGI::RESPONDER, FCGI::KEEP_CONN);
+$begin->setRequestId(5);
+
+$connection->emit('data', [(string) $begin]);
+
+$params = new FCGI\Record\Params([
+	'REQUEST_METHOD' => 'GET',
+	'REQUEST_URI' => '/',
+	'SERVER_PROTOCOL' => '1.1',
+	'HTTP_CONTENT_TYPE' => 'text/plain',
+]);
+$params->setRequestId(5);
+
+$connection->emit('data', [(string) $params]);
+
+$params = new FCGI\Record\Params;
+$params->setRequestId(5);
+
+$connection->emit('data', [(string) $params]);
+
+$stdin = new FCGI\Record\Stdin;
+$stdin->setRequestId(5);
+
+$connection->emit('data', [(string) $stdin]);
+
+/** @var $serverRequest RingCentral\Psr7\ServerRequest */
+Assert::same('text/plain', $serverRequest->getHeaderLine('Content-Type'));
+
+$errors = [];
+
+$request->on('error', function(Throwable $e) use(&$errors): void {
+	$errors[] = $e;
+});
+
+$connection->emit('data', [(string) new FCGI\Record\Stdout]);
+$connection->emit('data', [(string) new FCGI\Record\Stdin]);
+$connection->emit('data', [(string) new FCGI\Record\Params]);
+$connection->emit('data', [(string) new FCGI\Record\AbortRequest]);
+$connection->emit('data', [(string) new FCGI\Record\BeginRequest]);
+$connection->emit('data', [(string) new FCGI\Record\BeginRequest]);
+
+Assert::count(5, $errors);
+Assert::same('Unknown frame.', $errors[0]->getMessage());
+Assert::same('Request ID does not exist.', $errors[1]->getMessage());
+Assert::same('Request ID does not exist.', $errors[2]->getMessage());
+Assert::same('Request ID does not exist.', $errors[3]->getMessage());
+Assert::same('Reused request ID.', $errors[4]->getMessage());
+
+$request->on('close', function() use(&$closed): void {
+	$closed = true;
+});
+
+$request->close();
+
+Assert::true($closed);
+
+$request = new FastCGI\Server\Connection($connection, function(){});
+
+$request->on('error', function(Throwable $e) use(&$exception): void {
+	$exception = $e;
+});
+
+for ($i = 0; $i < 5; $i++) {
+	$connection->emit('data', [str_repeat('long cat is long', 1000)]);
+}
+
+Assert::same('Invalid FCGI record type 111 received', $exception->getMessage());
+
+$request = new FastCGI\Server\Connection($connection, function(FastCGI\Request $request) use(&$aborted): React\Promise\PromiseInterface {
+	$request->on('abort', function() use(&$aborted): void {
+		$aborted = true;
+	});
+	return (new React\Promise\Deferred)->promise();
+});
+
+$connection->emit('data', [(string) new FCGI\Record\BeginRequest]);
+$connection->emit('data', [(string) new FCGI\Record\Params]);
+$connection->emit('data', [(string) new FCGI\Record\Stdin]);
+$connection->emit('data', [(string) new FCGI\Record\AbortRequest]);
+
+Assert::true($aborted);

+ 145 - 0
tests/FastCGI/Server/Connection/Request.phpt

@@ -0,0 +1,145 @@
+<?php
+
+/**
+ * TEST: Server request test.
+ */
+
+use Adawolfa\FastCGI;
+use Tester\Assert;
+use Lisachenko\Protocol\FCGI;
+use Psr\Http\Message\ResponseInterface;
+
+require(__DIR__ . '/../../../bootstrap.php');
+
+$connection = new class extends Evenement\EventEmitter implements React\Socket\ConnectionInterface
+{
+
+	public function getRemoteAddress()
+	{
+		return '127.0.0.1';
+	}
+
+	public function getLocalAddress()
+	{
+		return '127.0.0.1';
+	}
+
+	public function isReadable()
+	{
+		return true;
+	}
+
+	public function pause()
+	{
+	}
+
+	public function resume()
+	{
+	}
+
+	public function pipe(React\Stream\WritableStreamInterface $dest, array $options = [])
+	{
+	}
+
+	public function close()
+	{
+	}
+
+	public function isWritable()
+	{
+		return true;
+	}
+
+	public function write($data)
+	{
+		$this->emit('write', [$data]);
+		return true;
+	}
+
+	public function end($data = null)
+	{
+	}
+
+};
+
+$request = new FastCGI\Server\Connection\Request(5, $connection, function(React\Http\Io\ServerRequest $v) use(&$serverRequest): React\Http\Response {
+	$serverRequest = $v;
+	return new React\Http\Response(200, ['Content-Type' => 'text/plain']);
+});
+
+$request->on('error', function(Throwable $e) use(&$exception): void {
+	$exception = $e;
+});
+
+$request->writeBuffer('');
+Assert::same('Parameters not set.', $exception->getMessage());
+
+$request->writeParameters([
+	'REQUEST_METHOD' => 'GET',
+	'REQUEST_URI' => '/',
+	'SERVER_PROTOCOL' => '1.1',
+	'HTTP_CONTENT_TYPE' => 'text/plain',
+]);
+
+$request->writeParameters([]);
+
+$request->writeParameters([]);
+Assert::same('Parameters have already been set.', $exception->getMessage());
+
+$request->writeBuffer('Hello World!');
+$request->writeBuffer('');
+
+/** @var $serverRequest RingCentral\Psr7\ServerRequest */
+Assert::same('text/plain', $serverRequest->getHeaderLine('Content-Type'));
+
+$request = new FastCGI\Server\Connection\Request(5, $connection, function(React\Http\Io\ServerRequest $v) use(&$serverRequest): string {
+	return 'Hello!';
+});
+
+$request->on('error', function(Throwable $e) use(&$exception): void {
+	$exception = $e;
+});
+
+$request->writeParameters([]);
+
+Assert::same('Server request handler should return ' . ResponseInterface::class . ' or promise.', $exception->getMessage());
+
+$request = new FastCGI\Server\Connection\Request(5, $connection, function(React\Http\Io\ServerRequest $v) use(&$serverRequest): ?ResponseInterface {
+	return null;
+});
+
+$request->on('end', function() use(&$aborted) {
+	$aborted = true;
+});
+
+$request->writeParameters([]);
+
+Assert::true($aborted);
+
+$request = new FastCGI\Server\Connection\Request(5, $connection, function(React\Http\Io\ServerRequest $v) use(&$serverRequest): ?React\Promise\PromiseInterface {
+	return new React\Promise\RejectedPromise;
+});
+
+$aborted = false;
+$request->on('end', function() use(&$aborted) {
+	$aborted = true;
+});
+
+$request->writeParameters([]);
+
+Assert::true($aborted);
+
+$request = new FastCGI\Server\Connection\Request(5, $connection, function(React\Http\Io\ServerRequest $v) use(&$serverRequest): ?React\Http\Response {
+	$stream = RingCentral\Psr7\stream_for(function() {
+		yield '';
+	});
+	return new React\Http\Response('GET', [], $stream);
+});
+
+$request->on('error', function(Throwable $e) use(&$exception): void {
+	$exception = $e;
+});
+
+$request->writeParameters([]);
+
+Assert::same('Response body should be instance of ' . React\Stream\ReadableStreamInterface::class . ' or should have content present.', $exception->getMessage());

+ 4 - 0
tests/bootstrap.php

@@ -0,0 +1,4 @@
+<?php
+
+require(__DIR__ . '/../vendor/autoload.php');
+Tester\Environment::setup();