php pheanstalk,vendors/Pheanstalk/Pheanstalk.php · sxyunfeng/fcms - Gitee.com

论坛 期权论坛 编程之家     
选择匿名的用户   2021-6-1 06:04   11   0

namespace vendors\Pheanstalk;

/**

* Pheanstalk is a PHP client for the beanstalkd workqueue.

*

* The Pheanstalk class is a simple facade for the various underlying components.

*

* @see http://github.com/kr/beanstalkd

* @see http://xph.us/software/beanstalkd/

*

* @author Paul Annesley

* @package Pheanstalk

* @license http://www.opensource.org/licenses/mit-license.php

*/

class Pheanstalk implements PheanstalkInterface

{

const VERSION = '3.1.1';

private $_connection;

private $_using = PheanstalkInterface::DEFAULT_TUBE;

private $_watching = array(PheanstalkInterface::DEFAULT_TUBE => true);

/**

* @param string $host

* @param int $port

* @param int $connectTimeout

* @param bool $connectPersistent

*/

public function __construct($host, $port = PheanstalkInterface::DEFAULT_PORT, $connectTimeout = null, $connectPersistent = false)

{

$this->setConnection(new Connection($host, $port, $connectTimeout, $connectPersistent));

}

/**

* {@inheritdoc}

*/

public function setConnection(Connection $connection)

{

$this->_connection = $connection;

return $this;

}

/**

* {@inheritdoc}

*/

public function getConnection()

{

return $this->_connection;

}

// ----------------------------------------

/**

* {@inheritdoc}

*/

public function bury($job, $priority = PheanstalkInterface::DEFAULT_PRIORITY)

{

$this->_dispatch(new Command\BuryCommand($job, $priority));

}

/**

* {@inheritdoc}

*/

public function delete($job)

{

$this->_dispatch(new Command\DeleteCommand($job));

return $this;

}

/**

* {@inheritdoc}

*/

public function ignore($tube)

{

if (isset($this->_watching[$tube])) {

$this->_dispatch(new Command\IgnoreCommand($tube));

unset($this->_watching[$tube]);

}

return $this;

}

/**

* {@inheritdoc}

*/

public function kick($max)

{

$response = $this->_dispatch(new Command\KickCommand($max));

return $response['kicked'];

}

/**

* {@inheritdoc}

*/

public function kickJob($job)

{

$this->_dispatch(new Command\KickJobCommand($job));

return $this;

}

/**

* {@inheritdoc}

*/

public function listTubes()

{

return (array) $this->_dispatch(

new Command\ListTubesCommand()

);

}

/**

* {@inheritdoc}

*/

public function listTubesWatched($askServer = false)

{

if ($askServer) {

$response = (array) $this->_dispatch(

new Command\ListTubesWatchedCommand()

);

$this->_watching = array_fill_keys($response, true);

}

return array_keys($this->_watching);

}

/**

* {@inheritdoc}

*/

public function listTubeUsed($askServer = false)

{

if ($askServer) {

$response = $this->_dispatch(

new Command\ListTubeUsedCommand()

);

$this->_using = $response['tube'];

}

return $this->_using;

}

/**

* {@inheritdoc}

*/

public function pauseTube($tube, $delay)

{

$this->_dispatch(new Command\PauseTubeCommand($tube, $delay));

return $this;

}

/**

* {@inheritdoc}

*/

public function resumeTube($tube)

{

// Pause a tube with zero delay will resume the tube

$this->pauseTube($tube, 0);

return $this;

}

/**

* {@inheritdoc}

*/

public function peek($jobId)

{

$response = $this->_dispatch(

new Command\PeekCommand($jobId)

);

return new Job($response['id'], $response['jobdata']);

}

/**

* {@inheritdoc}

*/

public function peekReady($tube = null)

{

if ($tube !== null) {

$this->useTube($tube);

}

$response = $this->_dispatch(

new Command\PeekCommand(Command\PeekCommand::TYPE_READY)

);

return new Job($response['id'], $response['jobdata']);

}

/**

* {@inheritdoc}

*/

public function peekDelayed($tube = null)

{

if ($tube !== null) {

$this->useTube($tube);

}

$response = $this->_dispatch(

new Command\PeekCommand(Command\PeekCommand::TYPE_DELAYED)

);

return new Job($response['id'], $response['jobdata']);

}

/**

* {@inheritdoc}

*/

public function peekBuried($tube = null)

{

if ($tube !== null) {

$this->useTube($tube);

}

$response = $this->_dispatch(

new Command\PeekCommand(Command\PeekCommand::TYPE_BURIED)

);

return new Job($response['id'], $response['jobdata']);

}

/**

* {@inheritdoc}

*/

public function put(

$data,

$priority = PheanstalkInterface::DEFAULT_PRIORITY,

$delay = PheanstalkInterface::DEFAULT_DELAY,

$ttr = PheanstalkInterface::DEFAULT_TTR

) {

$response = $this->_dispatch(

new Command\PutCommand($data, $priority, $delay, $ttr)

);

return $response['id'];

}

/**

* {@inheritdoc}

*/

public function putInTube(

$tube,

$data,

$priority = PheanstalkInterface::DEFAULT_PRIORITY,

$delay = PheanstalkInterface::DEFAULT_DELAY,

$ttr = PheanstalkInterface::DEFAULT_TTR

) {

$this->useTube($tube);

return $this->put($data, $priority, $delay, $ttr);

}

/**

* {@inheritdoc}

*/

public function release(

$job,

$priority = PheanstalkInterface::DEFAULT_PRIORITY,

$delay = PheanstalkInterface::DEFAULT_DELAY

) {

$this->_dispatch(

new Command\ReleaseCommand($job, $priority, $delay)

);

return $this;

}

/**

* {@inheritdoc}

*/

public function reserve($timeout = null)

{

$response = $this->_dispatch(

new Command\ReserveCommand($timeout)

);

$falseResponses = array(

Response::RESPONSE_DEADLINE_SOON,

Response::RESPONSE_TIMED_OUT,

);

if (in_array($response->getResponseName(), $falseResponses)) {

return false;

} else {

return new Job($response['id'], $response['jobdata']);

}

}

/**

* {@inheritdoc}

*/

public function reserveFromTube($tube, $timeout = null)

{

$this->watchOnly($tube);

return $this->reserve($timeout);

}

/**

* {@inheritdoc}

*/

public function statsJob($job)

{

return $this->_dispatch(new Command\StatsJobCommand($job));

}

/**

* {@inheritdoc}

*/

public function statsTube($tube)

{

return $this->_dispatch(new Command\StatsTubeCommand($tube));

}

/**

* {@inheritdoc}

*/

public function stats()

{

return $this->_dispatch(new Command\StatsCommand());

}

/**

* {@inheritdoc}

*/

public function touch($job)

{

$this->_dispatch(new Command\TouchCommand($job));

return $this;

}

/**

* {@inheritdoc}

*/

public function useTube($tube)

{

if ($this->_using != $tube) {

$this->_dispatch(new Command\UseCommand($tube));

$this->_using = $tube;

}

return $this;

}

/**

* {@inheritdoc}

*/

public function watch($tube)

{

if (!isset($this->_watching[$tube])) {

$this->_dispatch(new Command\WatchCommand($tube));

$this->_watching[$tube] = true;

}

return $this;

}

/**

* {@inheritdoc}

*/

public function watchOnly($tube)

{

$this->watch($tube);

$ignoreTubes = array_diff_key($this->_watching, array($tube => true));

foreach ($ignoreTubes as $ignoreTube => $true) {

$this->ignore($ignoreTube);

}

return $this;

}

// ----------------------------------------

/**

* Dispatches the specified command to the connection object.

*

* If a SocketException occurs, the connection is reset, and the command is

* re-attempted once.

*

* @param Command $command

*

* @return Response

*/

private function _dispatch($command)

{

try {

$response = $this->_connection->dispatchCommand($command);

} catch (Exception\SocketException $e) {

$this->_reconnect();

$response = $this->_connection->dispatchCommand($command);

}

return $response;

}

/**

* Creates a new connection object, based on the existing connection object,

* and re-establishes the used tube and watchlist.

*/

private function _reconnect()

{

$new_connection = new Connection(

$this->_connection->getHost(),

$this->_connection->getPort(),

$this->_connection->getConnectTimeout()

);

$this->setConnection($new_connection);

if ($this->_using != PheanstalkInterface::DEFAULT_TUBE) {

$tube = $this->_using;

$this->_using = null;

$this->useTube($tube);

}

foreach ($this->_watching as $tube => $true) {

if ($tube != PheanstalkInterface::DEFAULT_TUBE) {

unset($this->_watching[$tube]);

$this->watch($tube);

}

}

if (!isset($this->_watching[PheanstalkInterface::DEFAULT_TUBE])) {

$this->ignore(PheanstalkInterface::DEFAULT_TUBE);

}

}

}

一键复制

编辑

Web IDE

原始数据

按行查看

历史

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:3875789
帖子:775174
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP