Source for file sThread.php
Documentation is available at sThread.php
* Project: sThread :: Single Thread Monitoring Agent API<br>
* sThread package는 async 방식의 L7 layer 모니터링 에이전트
* sThread package는 libevent를 이용하여 Multi session과
* 동시에 여러가지 protocol을 처리할 수 있도록 설계가 있다.
* 모듈 구조를 지원하므로, 필요한 모듈을 직접 만들어 추가할 수
* @author JoungKyun.Kim <http://oops.org>
* @copyright (c) 2018, OOPS.ORG
* @link http://pear.oops.org/package/sThread
* @since File available since relase 1.0.0
// {{{ 기본 include package
require_once 'ePrint.php';
* sThread에서 사용하는 공통 변수 관리 Class
require_once 'sThread/Vari.php';
* sThread에서 사용하는 모듈 관리 Class
require_once 'sThread/Module.php';
* sThread에서 사용하는 주소 관리 Class
require_once 'sThread/Addr.php';
require_once 'sThread/Log.php';
* sThread package는 async 방식의 L7 layer 모니터링 에이전트
* sThread package는 libevent를 이용하여 Multi session과
* 동시에 여러가지 protocol을 처리할 수 있도록 설계가 있다.
* 모듈 구조를 지원하므로, 필요한 모듈을 직접 만들어 추가할 수
* @author JoungKyun.Kim <http://oops.org>
* @copyright (c) 2018, OOPS.ORG
* @link http://pear.oops.org/package/sThread
* sThread Major version 번호
* sThread Minor version 번호
* sThread_Module 패키지에서 등록한 모듈 object
* libevent 동작을 sync로 할지 async로 할지 여부.
* data function의 format으로 지정해야 한다.
* <li>0 -> 실패한 로그만 저장</li>
* <li>-1 -> 로그 저장 안함</li>
static private $timer = 200;
// {{{ (sThread) sThread::__construct (void)
* OOP 스타일의 sThread Class 초기화
$this->mod = &self::$mod;
$this->tmout = &self::$tmout;
$this->async = &self::$async;
$this->result = &self::$result;
$this->logfile = &sThread_Log::$fname;
$this->logtype = &sThread_Log::$type;
// {{{ (void) sThread::init ($mod_no_init = false)
* @param bool (optional) true로 설정을 하면 sThread_Module
* class를 호출 하지 않는다. 기본값 false.
function init ($mod_no_init = false) {
if ( $mod_no_init === false ) {
self::$mod = &sThread_Module::$obj;
Vari::$result = &self::$result;
self::$logfile = &sThread_Log::$fname;
self::$logformat = &sThread_Log::$format;
self::$logtype = &sThread_Log::$type;
// {{{ (void) sThread::execute ($host, $tmout = 1, $protocol = 'tcp')
* 실행 후 결과값은 Vari::$res 와 Vari::$sess 의 값을 확인하면
* @param mixed 연결 정보.<br>
* host의 형식은 기본적으로 문자열 또는 배열을 사용할 수 있다.
* 주소 형식은 기본적으로 도메인이름 또는 IP주소와 PORT 번호로
* $host = 'test.domaint.com:80';
* sThread package의 장점은 여러개의 실행을 동시에 할 수 있다는
* 것이므로, 여러개의 주소를 한번에 넘길 경우에는 배열을 이용한다.
* @param int 소켓 연결/읽기/쓰기 타임아웃
* @param string tcp 또는 udp. 기본값 tcp.
function execute ($hosts, $tmout = 1, $protocol = null) {
foreach ( $hosts as $line ) {
list ($host, $port) = explode (':', $line);
$sess->status[$key] = Vari::EVENT_ERROR_CLOSE;
$res->status[$key] = array ("{ $host}:{$port}", false, "Address parsing error");
$sess->addr[$key] = explode (':', $newline);
self::explodeAddr ($host, $port, $type, $newline);
if ( $protocol == null ) {
$sess->proto[$key] = $protocol;
$addr = "{ $protocol}://{$host}:{$port}";
$async = (self::$async === true) ?
STREAM_CLIENT_CONNECT| STREAM_CLIENT_ASYNC_CONNECT : STREAM_CLIENT_CONNECT;
$addr, $errno, $errstr, self::$tmout, $async
if ( self::$async !== true && ! is_resource ($sess->sock[$key]) ) {
if ( ePrint::$debugLevel >= Vari::DEBUG1 ) {
"%s:%d (%02d) Failed socket create: %s on %s:%d[%s::%s]",
$host, $port, $key, $errstr,
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
self::$mod->$type->set_last_status ($sess, $key);
$res->status[$key] = array ("{ $host}:{$port}", false, "Failed socket create: {$errstr}");
Vari::DEBUG1, "[%-12s #%d] %s:%d (%d) Socket create success on %s:%d[%s::%s]\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
$sess->send[$key] = Vari::EVENT_READY_SEND;
stream_set_blocking ($sess->sock[$key], 0);
Vari::DEBUG1, "[%-12s #%02d] Make event construct on %s:%d[%s::%s]\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
foreach ( $sess->sock as $key => $val ) {
'self::exceptionCallback', array ($key)
Vari::DEBUG1, "[%-12s #%02d] %s:%d (%d) make event buffer on %s:%d[%s::%s]\n",
get_resource_type ($sess->event[$key]), $sess->event[$key], $host_all[$key], $port_all[$key], $key,
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
Vari::DEBUG1, "[%-12s #%02d] set event buffer .. ",
Vari::DEBUG1, "%s #%02d on %s:%d[%s::%s]\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
if ( self::currentStatus ($key) === Vari::EVENT_READY_RECV )
Vari::DEBUG1, "[%-12s #%02d] regist event loop on %s:%d[%s::%s]\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
Vari::DEBUG1, "[%-15s] destruct event construct on %s:%d[%s::%s]\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
* libevent에서 exception callback
* @param resource libevent resource
if ( ($key = array_search ($buf, Vari::$sess->event, true)) !== false ) {
if ( $err == (EVBUFFER_READ| EVBUFFER_EOF) ) {
Vari::DEBUG1, "[%-12s #%02d] free %s on execption callback on %s:%d[%s::%s] {$err}\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
case (EVBUFFER_READ| EVBUFFER_TIMEOUT) :
$errstr = 'Event buffer read timeout';
case (EVBUFFER_WRITE| EVBUFFER_TIMEOUT) :
$errstr = 'Event buffer write timeout';
case (EVBUFFER_READ| EVBUFFER_ERROR) :
$errstr = 'Event buffer read error';
case (EVBUFFER_WRITE| EVBUFFER_ERROR) :
$errstr = 'Event buffer write error';
case (EVBUFFER_CONNECTED) :
$errstr = 'Event buffer connected error';
$errstr = 'Event buffer unknown error';
$res->status[$key] = array (
sprintf ('%s:%d', $sess->addr[$key][0], $sess->addr[$key][1]),
Vari::DEBUG1, "[%-12s #%02d] free %s on execption callback on %s:%d[%s::%s]\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
// {{{ (bool) sThread::readCallback ($buf, $arg)
* 이 메소드는 외부에서 사용할 필요가 없다. private로
* 지정이 되어야 하나, event_buffer_new api가 class method를
* callback 함수로 사용을 하지 못하게 되어 있어, 외부 wrapper
* 를 통하여 호출을 하기 때문에 public으로 선언이 되었다.
* @param resource libevent resource
* @param array callback argument
self::explodeAddr ($host, $port, $type, $sess->addr[$key]);
Vari::DEBUG3, "[%-12s #%02d] Calling %s::%s %s:%d on %s:%d\n",
$host, $port, self::__f(__FILE__ ), __LINE__
if ( self::currentStatus ($key) !== Vari::EVENT_READY_RECV )
if ( ($handler = self::getCallname ($key)) === false ) {
self::socketClose ($key);
Vari::DEBUG1, "[%-12s #%02d] free event buffer on %s:%d[%s::%s]\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
Vari::DEBUG2, "[%-12s #%02d] %s:%d Recieve %s call on %s:%d[%s::%s]\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
Vari::DEBUG3, "[%-12s #%02d] %s:%d Recieved data on %s:%d[%s::%s]\n",
$host, $port, self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
if ( ePrint::$debugLevel >= Vari::DEBUG3 ) {
ePrint::echoi ($msg ? $msg . "\n" : "NONE\n", 8);
$recvR = self::$mod->$type->$handler ($sess, $key, $buffer);
self::$mod->$type->set_last_status ($sess, $key);
self::socketClose ($key);
Vari::DEBUG1, "[%-12s #%02d] free event buffer on %s:%d[%s::%s]\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
Vari::DEBUG2, "[%-12s #%02d] %s:%d Complete %s call on %s:%d[%s::%s]\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
* Unknown status or Regular connection end
if ( ($is_rw = self::nextStatus ($key)) === false ) {
self::socketClose ($key);
Vari::DEBUG1, "[%-12s #%02d] free event buffer on %s:%d[%s::%s]\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
// {{{ (bool) sThread::writeCallback ($buf, $arg)
* libevent write callback
* 이 메소드는 외부에서 사용할 필요가 없다. private로
* 지정이 되어야 하나, event_buffer_new api가 class method를
* callback 함수로 사용을 하지 못하게 되어 있어, 외부 wrapper
* 를 통하여 호출을 하기 때문에 public으로 선언이 되었다.
* @param resource libevent resource
* @param array callback argument
list ($host, $port, $type) = $sess->addr[$key];
self::explodeAddr ($host, $port, $type, $sess->addr[$key]);
if ( $sess->send[$key] === Vari::EVENT_SEND_DONE ) {
$sess->send[$key] = Vari::EVENT_READY_SEND;
* Unknown status or Regular connection end
if ( ($is_rw = self::nextStatus ($key)) === false ) {
self::socketClose ($key);
Vari::DEBUG1, "[%-12s #%02d] free event buffer on %s:%d[%s::%s]\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
Vari::DEBUG3, "[%-12s #%02d] Calling %s::%s %s:%d on %s:%d\n",
$host, $port, self::__f(__FILE__ ), __LINE__
if ( self::currentStatus ($key) !== Vari::EVENT_READY_SEND )
if ( ($handler = self::getCallname ($key)) === false ) {
self::socketClose ($key);
Vari::DEBUG1, "[%-12s #%02d] free event buffer on %s:%d[%s::%s]\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
Vari::DEBUG2, "[%-12s #%02d] %s:%d Send %s call on %s:%d[%s::%s]\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
$send = self::$mod->$type->$handler ($sess, $key);
// make error for send packet
self::$mod->$type->set_last_status ($sess, $key);
self::socketClose ($key);
Vari::DEBUG1, "[%-12s #%02d] free event buffer on %s:%d[%s::%s]\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
if ( ePrint::$debugLevel >= Vari::DEBUG1 ) {
"[%-12s #%02d] Error: %s:%d Send error on %s:%d[%s::%s]",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
self::$mod->$type->set_last_status ($sess, $key);
$res->status[$key] = array ("{ $host}:{ $port}", false, "{ $handler} Send error ");
self::socketClose ($key);
Vari::DEBUG1, "[%-12s #%02d] free event buffer on %s:%d[%s::%s]\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
Vari::DEBUG3, "[%-12s #%02d] %s:%d Send data on %s:%d[%s::%s]\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
if ( ePrint::$debugLevel >= Vari::DEBUG3 ) {
ePrint::echoi ($send . "\n", 8);
Vari::DEBUG2, "[%-12s #%02d] %s:%d Complete %s call on %s:%d[%s::%s]\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
$sess->send[$key] = Vari::EVENT_SEND_DONE;
// {{{ private (string) sThread::getCallname ($key)
* 현재 상태의 세션 handler 이름을 반환
private function getCallname ($key) {
self::explodeAddr ($host, $port, $type, $sess->addr[$key]);
$event = self::$mod->$type->call_status ($sess->status[$key], true);
if ( $event === false ) {
if ( ePrint::$debugLevel >= Vari::DEBUG1 ) {
"[%-12s #%02d] Error: %s:%d 'Unknown status => %d (%s)",
$host, $port, $sess->status[$key], $type
self::$mod->$type->set_last_status ($sess, $key);
$res->status[$key] = array (
"Unknown Status: {$sess->status[$key]} ({ $type}) "
// {{{ private (int) sThread::currentStatus ($key, $next = false)
* @param boolean true로 설정이 되면 처리 단계를 다음으로 전환한다.
private function currentStatus ($key, $next = false) {
self::explodeAddr ($host, $port, $type, $sess->addr[$key]);
self::$mod->$type->change_status ($sess, $key);
$is_rw = self::$mod->$type->check_buf_status ($sess->status[$key]);
* $is_wr가 sThread::EVENT_READY_CLOSE 상태가 되면, 모든 작업이
* 완료된 case 이므로 socket을 닫는다. $is_rw 가 false일 경우,
if ( $is_rw === Vari::EVENT_READY_CLOSE || $is_rw === false ) {
if ( $is_rw === false ) {
if ( ePrint::$debugLevel >= Vari::DEBUG1 ) {
"[%-12s #%02d] Error: %s:%d 'Unknown status => %d (%s)' on %s:%d[%s::%s]",
$host, $port, $sess->status[$key], $type,
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
self::$mod->$type->set_last_status ($sess, $key);
$res->status[$key] = array (
"Unknown Status: {$sess->status[$key]} ({ $type}) "
Vari::DEBUG1, "[%-12s #%02d] %s:%d Socket close on %s:%d[%s::%s]\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
$res->status[$key] = array ("{ $host}:{ $port}", true, 'Success');
case Vari::EVENT_READY_SEND :
Vari::DEBUG3, "[%-12s #%02d] %s:%d Enable write event on %s:%d[%s::%s]\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
Vari::DEBUG3, "[%-12s #%02d] %s:%d Enable read event on %s:%d[%s::%s]\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
// {{{ private (void) sThread::explodeAddr (&$host, &$port, &$type, $v)
* 주소 형식 값에서 주소, 분리, 모듈 형식을 분리해 낸다.
* @param reference 주소 값을 저장할 변수 reference
* @param reference 포트 값을 저장할 변수 reference
* @param reference 모듈 이름을 저장할 변수 reference
* @param array|string 분리되지 않은 문자열 or 배열
private function explodeAddr (&$host, &$port, &$type, $v) {
//$type = preg_replace ('/\|.*/', '', $type);
// {{{ private (int) sThread::nextStatus ($key)
private function nextStatus ($key) {
return self::currentStatus ($key, true);
// {{{ private (void) sThread::clearEvent (void)
* 각 세션에서 사용한 변수들 정리 및 소켓 close
private function clearEvent () {
foreach ( $sess->status as $key => $val ) {
self::explodeAddr ($host, $port, $type, $sess->addr[$key]);
if ( $val !== Vari::EVENT_ERROR_CLOSE &&
self::$mod->$type->check_buf_status ($val) !== Vari::EVENT_READY_CLOSE ) {
list ($host, $port, $type) = Vari::$sess->addr[$key];
if ( ! Vari::$res->status[$key] ) {
Vari::$res->status[$key] = array ("{ $host}:{ $port}", false, 'Connection timeout');
Vari::$res->status[$key] = array ("{ $host}:{ $port}", false, 'Protocol timeout');
if ( self::$mod->$type->clearsession === true )
self::$mod->$type->clear_session ($key);
self::socketClose ($key);
if ( ! empty (self::$mod->$type) )
self::$mod->$type->init();
// {{{ private (void) sThread::socketClose ($key)
private function socketClose ($key) {
list ($host, $port, $type) = $sess->addr[$key];
if ( Vari::$res->status[$key][1] === false ) {
$handler = $type . '_quit';
Vari::DEBUG2, "[%-12s #%02d] %s:%d Quit call on %s:%d[%s::%s]\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
$send = self::$mod->$type->$handler ($sess, $key);
@fwrite ($sess->sock[$key], $send, strlen ($send));
"[%-12s #%02d] %s:%d Send data on %s:%d[%s::%s]\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
if ( ePrint::$debugLevel >= Vari::DEBUG3 ) {
ePrint::echoi ($send . "\n", 8);
Vari::DEBUG2, "[%-12s #%02d] close socket call on %s:%d[%s::%s]\n",
self::__f(__FILE__ ), __LINE__ , __CLASS__ , __FUNCTION__
// {{{ private sThread::timeResult ($key)
private function timeResult ($key) {
if ( isset ($time->pstart[$key]) )
$time->pend[$key] = microtime ();
$sess->ctime[$key] = Vari::chkTime ($time->cstart[$key], $time->cend[$key]);
$sess->ptime[$key] = Vari::chkTime ($time->pstart[$key], $time->pend[$key]);
$sess->time[$key] = Vari::timeCalc (array ($sess->ctime[$key], $sess->ptime[$key]));
unset (Vari::$time->cstart[$key]);
unset (Vari::$time->cend[$key]);
unset (Vari::$time->pstart[$key]);
unset (Vari::$time->pend[$key]);
// {{{ +-- private (string) sThread::__f($f)
private function __f($f) {
return preg_replace ('!.*/!', '', $f);
|