From fa33342dbcea5e4c2261d9c7dbaeb34ef85f503e Mon Sep 17 00:00:00 2001 From: Dmitriy Simushev Date: Tue, 30 Apr 2013 09:42:45 +0000 Subject: [PATCH] Add messages indexing --- src/messenger/webim/cron.php | 8 + src/messenger/webim/install/dbinfo.php | 20 ++ src/messenger/webim/install/dbperform.php | 4 + src/messenger/webim/libs/classes/thread.php | 8 +- src/messenger/webim/libs/cron.php | 193 ++++++++++++++++++ src/messenger/webim/operator/history.php | 8 +- src/messenger/webim/operator/statistics.php | 6 +- .../webim/operator/threadprocessor.php | 2 +- 8 files changed, 239 insertions(+), 10 deletions(-) create mode 100644 src/messenger/webim/libs/cron.php diff --git a/src/messenger/webim/cron.php b/src/messenger/webim/cron.php index eb0942bb..2715a157 100644 --- a/src/messenger/webim/cron.php +++ b/src/messenger/webim/cron.php @@ -15,7 +15,10 @@ * limitations under the License. */ +// Initialize librariess require_once('libs/init.php'); +require_once('libs/classes/thread.php'); +require_once('libs/cron.php'); $cron_key = empty($_GET['cron_key']) ? '' : $_GET['cron_key']; @@ -24,6 +27,11 @@ if ($cron_key != Settings::get('cron_key')) { die(); } +set_time_limit(0); + +// Run cron jobs of the core +cron_index_messages(); + // Trigger cron event $dispatcher = EventDispatcher::getInstance(); $dispatcher->triggerEvent('cronRun'); diff --git a/src/messenger/webim/install/dbinfo.php b/src/messenger/webim/install/dbinfo.php index 9f52b8f8..9b6a1e1e 100644 --- a/src/messenger/webim/install/dbinfo.php +++ b/src/messenger/webim/install/dbinfo.php @@ -71,6 +71,10 @@ $dbtables = array( "arguments" => "varchar(1024)" ), + // Temporal message storage. Contain only messages for for open threads. + // There is cron job that clean up this table and move messages of the core + // kinds to ${mysqlprefix}indexedchatmessage. Plugins messages must be + // converted to one of the core kinds before move. "${mysqlprefix}chatmessage" => array( "messageid" => "int NOT NULL auto_increment PRIMARY KEY", "threadid" => "int NOT NULL references ${mysqlprefix}chatthread(threadid)", @@ -81,6 +85,18 @@ $dbtables = array( "tname" => "varchar(64)" ), + // Indexed messages used for search, history and statistics. + // Contain messages only of the Core kinds. + "${mysqlprefix}indexedchatmessage" => array( + "messageid" => "int NOT NULL auto_increment PRIMARY KEY", + "threadid" => "int NOT NULL references ${mysqlprefix}chatthread(threadid)", + "ikind" => "int NOT NULL", + "agentId" => "int NOT NULL DEFAULT 0", + "tmessage" => "text NOT NULL", + "dtmcreated" => "int NOT NULL DEFAULT 0", + "tname" => "varchar(64)" + ), + "${mysqlprefix}chatoperator" => array( "operatorid" => "int NOT NULL auto_increment PRIMARY KEY", "vclogin" => "varchar(64) NOT NULL", @@ -174,6 +190,9 @@ $dbtables_indexes = array( "${mysqlprefix}chatmessage" => array( "idx_agentid" => "agentid" ), + "${mysqlprefix}indexedchatmessage" => array( + "agentid" => "agentid" + ), "${mysqlprefix}chatsitevisitor" => array( "threadid" => "threadid" ), @@ -197,6 +216,7 @@ $dbtables_can_update = array( "${mysqlprefix}chatthread" => array("agentId", "userTyping", "agentTyping", "messageCount", "nextagent", "shownmessageid", "userid", "userAgent", "groupid", "dtmchatstarted"), "${mysqlprefix}requestbuffer" => array("requestid", "requestkey", "request"), "${mysqlprefix}chatmessage" => array("agentId"), + "${mysqlprefix}indexedchatmessage" => array(), "${mysqlprefix}chatoperator" => array("vcavatar", "vcjabbername", "iperm", "istatus", "idisabled", "vcemail", "dtmrestore", "vcrestoretoken"), "${mysqlprefix}chatban" => array(), "${mysqlprefix}chatgroup" => array("vcemail", "iweight", "parent", "vctitle", "vcchattitle", "vclogo", "vchosturl"), diff --git a/src/messenger/webim/install/dbperform.php b/src/messenger/webim/install/dbperform.php index 1e150343..8de2431a 100644 --- a/src/messenger/webim/install/dbperform.php +++ b/src/messenger/webim/install/dbperform.php @@ -225,6 +225,10 @@ if ($act == "silentcreateall") { runsql("ALTER TABLE ${mysqlprefix}chatmessage ADD INDEX idx_agentid (agentid)", $link); } + if (in_array("${mysqlprefix}indexedchatmessage.agentid", $absent_indexes)) { + runsql("ALTER TABLE ${mysqlprefix}indexedchatmessage ADD INDEX (agentid)", $link); + } + if (in_array("${mysqlprefix}chatsitevisitor.threadid", $absent_indexes)) { runsql("ALTER TABLE ${mysqlprefix}chatsitevisitor ADD INDEX (threadid)", $link); } diff --git a/src/messenger/webim/libs/classes/thread.php b/src/messenger/webim/libs/classes/thread.php index e0d88eba..679c6e06 100644 --- a/src/messenger/webim/libs/classes/thread.php +++ b/src/messenger/webim/libs/classes/thread.php @@ -656,6 +656,8 @@ Class Thread { * @param boolean $is_user Boolean TRUE if messages loads for user * and boolean FALSE if they loads for operator. * @param int $lastid ID of the last loaded message. + * @param boolean $indexed Indicates if indexed messages be should returned. + * Default value is false. * @return array Array of messages. Every message is associative array with * following keys: * - 'id': int, message id; @@ -666,16 +668,18 @@ Class Thread { * Thread::KIND_PLUGIN and message text string otherwise. * @see Thread::postMessage() */ - public function getMessages($is_user, &$last_id) { + public function getMessages($is_user, &$last_id, $indexed = false) { global $webim_encoding; $db = Database::getInstance(); + $messages_table = $indexed ? '{indexedchatmessage}' : '{chatmessage}'; + // Load messages $messages = $db->query( "select messageid as id, ikind as kind, dtmcreated as created, " . " tname as name, tmessage as message " . - "from {chatmessage} " . + "from " . $messages_table . " " . "where threadid = :threadid and messageid > :lastid " . ($is_user ? "and ikind <> " . self::KIND_FOR_AGENT : "") . " order by messageid", diff --git a/src/messenger/webim/libs/cron.php b/src/messenger/webim/libs/cron.php new file mode 100644 index 00000000..7e7d3978 --- /dev/null +++ b/src/messenger/webim/libs/cron.php @@ -0,0 +1,193 @@ + + * public function pluginMessageIndexListener(&$args) { + * // Create shortcut for stored data + * $data = $args['message']['data']; + * // Check if message was sent by current plugin + * if ($args['message']['plugin'] == 'example') { + * $args['result'] = array( + * // Plugin should set one of the core message kinds to indexed message + * 'kind' => Thread::KIND_INFO, + * // Plugin should set arbitrary text for indexed message + * 'message' => $data['title'] . ': ' . $data['body'] + * ); + * } + * } + * + */ +function cron_index_messages() { + $db = Database::getInstance(); + $db->throwExeptions(true); + + try { + // Start transaction + $db->query('START TRANSACTION'); + + // Select messages from closed threads + $messages = $db->query( + "SELECT {chatmessage}.* FROM {chatmessage}, {chatthread} " . + "WHERE {chatmessage}.threadid = {chatthread}.threadid AND " . + "({chatthread}.istate = :closed OR {chatthread}.istate = :left)", + array( + ':closed' => Thread::STATE_CLOSED, + ':left' => Thread::STATE_LEFT + ), + array('return_rows' => Database::RETURN_ALL_ROWS) + ); + + // Prevent race condition. Remove only moved to {indexedchatmessage} + // messages. Get last loaded message ID for that. + $last_id = 0; + $dispatcher = EventDispatcher::getInstance(); + foreach($messages as $key => $message) { + // Find last message ID + if ($message['messageid'] > $last_id) { + $last_id = $message['messageid']; + } + + // Leave core messages kind as is + if ($message['ikind'] != Thread::KIND_PLUGIN) { + continue; + } + + // Provide an ability for plugins to index own messages + $event_args = array( + 'message' => unserialize($message['tmessage']), + 'result' => false + ); + $dispatcher->triggerEvent('pluginMessageIndex', $event_args); + + // Check if message was processed by a plugin correctly + $update_message = true; + if (empty($event_args['result']['message'])) { + $update_message = false; + } + + // Check if kind set and correct + if (empty($event_args['result']['kind']) + || $event_args['result']['kind'] == Thread::KIND_PLUGIN) { + $update_message = false; + } + + // Check if message should be updated + if (! $update_message) { + unset($messages[$key]); + continue; + } + + // Update message + $messages[$key]['ikind'] = $event_args['result']['kind']; + $messages[$key]['tmessage'] = $event_args['result']['message']; + } + + // Check is there some messages that should be saved + if (count($messages) != 0) { + // Reindex messages array + $messages = array_values($messages); + // Prepare SQL query template + $message_fields = array_keys($messages[0]); + $placeholders = '(' . + implode(', ', array_fill(0, count($message_fields), '?')) . ')'; + $sql_template = 'INSERT INTO {indexedchatmessage} (' . + implode(', ', $message_fields) . ') VALUES '; + + // Insert indexed messages into database by $block_size messages per + // sql query + $block_size = 20; + $iteration_count = ceil(count($messages) / $block_size); + for($i = 0; $i < $iteration_count; $i++) { + // Get messages block + $messages_block = array_slice( + $messages, + $i * $block_size, + $block_size + ); + + // Count of $messages_block can be less than $block_size for + // the last block of messages. + $real_block_size = count($messages_block); + + // Build array of inserted values + $fields_to_insert = array(); + foreach($messages_block as $message) { + foreach($message_fields as $field_name) { + $fields_to_insert[] = $message[$field_name]; + } + } + + // Build query + $sql = $sql_template . implode( + ', ', + array_fill(0, $real_block_size, $placeholders) + ); + + // Run query + $db->query($sql, $fields_to_insert); + } + } + + // Check is there some processed messages that should be deleted + if ($last_id != 0) { + // Delete indexed messages + $db->query( + 'DELETE FROM {chatmessage} where messageid <= :last_id', + array(':last_id' => $last_id) + ); + } + } catch (Exception $e) { + // Something went wrong: warn and rollback transaction. + trigger_error( + 'Messages indexing faild: ' . $e->getMessage(), + E_USER_WARNING + ); + $db->query('ROLLBACK'); + return; + } + + // Commit transaction + $db->query('COMMIT'); +} + +?> \ No newline at end of file diff --git a/src/messenger/webim/operator/history.php b/src/messenger/webim/operator/history.php index c5ac9e21..acada596 100644 --- a/src/messenger/webim/operator/history.php +++ b/src/messenger/webim/operator/history.php @@ -53,8 +53,8 @@ if ($query !== false) { $searchConditions = array(); if ($searchType == 'message' || $searchType == 'all') { - $searchConditions[] = "({chatmessage}.tmessage LIKE :query" . - ($searchInSystemMessages?'':" AND ({chatmessage}.ikind = :kind_user OR {chatmessage}.ikind = :kind_agent)") . + $searchConditions[] = "({indexedchatmessage}.tmessage LIKE :query" . + ($searchInSystemMessages?'':" AND ({indexedchatmessage}.ikind = :kind_user OR {indexedchatmessage}.ikind = :kind_agent)") . ")"; if (! $searchInSystemMessages) { $values[':kind_user'] = Thread::KIND_USER; @@ -71,9 +71,9 @@ if ($query !== false) { // Load threads select_with_pagintation("DISTINCT {chatthread}.*", - "{chatthread}, {chatmessage}", + "{chatthread}, {indexedchatmessage}", array( - "{chatmessage}.threadid = {chatthread}.threadid", + "{indexedchatmessage}.threadid = {chatthread}.threadid", "(" . implode(' or ', $searchConditions) . ")" ), "order by {chatthread}.dtmcreated DESC", diff --git a/src/messenger/webim/operator/statistics.php b/src/messenger/webim/operator/statistics.php index a664c036..0f6b41e6 100644 --- a/src/messenger/webim/operator/statistics.php +++ b/src/messenger/webim/operator/statistics.php @@ -75,7 +75,7 @@ $db = Database::getInstance(); if ($statisticstype == 'bydate') { $page['reportByDate'] = $db->query( "select DATE(FROM_UNIXTIME(t.dtmcreated)) as date, COUNT(distinct t.threadid) as threads, SUM(m.ikind = :kind_agent) as agents, SUM(m.ikind = :kind_user) as users, ROUND(AVG(t.dtmchatstarted-t.dtmcreated),1) as avgwaitingtime, ROUND(AVG(tmp.lastmsgtime - t.dtmchatstarted),1) as avgchattime " . - "from {chatmessage} m, {chatthread} t, (SELECT i.threadid, MAX(i.dtmcreated) AS lastmsgtime FROM {chatmessage} i WHERE (ikind = :kind_user OR ikind = :kind_agent) GROUP BY i.threadid) tmp " . + "from {indexedchatmessage} m, {chatthread} t, (SELECT i.threadid, MAX(i.dtmcreated) AS lastmsgtime FROM {indexedchatmessage} i WHERE (ikind = :kind_user OR ikind = :kind_agent) GROUP BY i.threadid) tmp " . "where m.threadid = t.threadid AND tmp.threadid = t.threadid AND t.dtmchatstarted <> 0 AND m.dtmcreated >= :start AND m.dtmcreated < :end group by DATE(FROM_UNIXTIME(m.dtmcreated)) order by m.dtmcreated desc", array( ':kind_agent' => Thread::KIND_AGENT, @@ -88,7 +88,7 @@ if ($statisticstype == 'bydate') { $page['reportByDateTotal'] = $db->query( "select DATE(FROM_UNIXTIME(t.dtmcreated)) as date, COUNT(distinct t.threadid) as threads, SUM(m.ikind = :kind_agent) as agents, SUM(m.ikind = :kind_user) as users, ROUND(AVG(t.dtmchatstarted-t.dtmcreated),1) as avgwaitingtime, ROUND(AVG(tmp.lastmsgtime - t.dtmchatstarted),1) as avgchattime " . - "from {chatmessage} m, {chatthread} t, (SELECT i.threadid, MAX(i.dtmcreated) AS lastmsgtime FROM {chatmessage} i WHERE (ikind = :kind_user OR ikind = :kind_agent) GROUP BY i.threadid) tmp " . + "from {indexedchatmessage} m, {chatthread} t, (SELECT i.threadid, MAX(i.dtmcreated) AS lastmsgtime FROM {indexedchatmessage} i WHERE (ikind = :kind_user OR ikind = :kind_agent) GROUP BY i.threadid) tmp " . "where m.threadid = t.threadid AND tmp.threadid = t.threadid AND t.dtmchatstarted <> 0 AND m.dtmcreated >= :start AND m.dtmcreated < :end", array( ':kind_agent' => Thread::KIND_AGENT, @@ -103,7 +103,7 @@ if ($statisticstype == 'bydate') { $page['reportByAgent'] = $db->query( "select vclocalename as name, COUNT(distinct threadid) as threads, " . "SUM(ikind = :kind_agent) as msgs, AVG(CHAR_LENGTH(tmessage)) as avglen " . - "from {chatmessage}, {chatoperator} " . + "from {indexedchatmessage}, {chatoperator} " . "where agentId = operatorid AND dtmcreated >= :start " . "AND dtmcreated < :end group by operatorid", array( diff --git a/src/messenger/webim/operator/threadprocessor.php b/src/messenger/webim/operator/threadprocessor.php index e3fd2272..5e301276 100644 --- a/src/messenger/webim/operator/threadprocessor.php +++ b/src/messenger/webim/operator/threadprocessor.php @@ -52,7 +52,7 @@ if (isset($_GET['threadid'])) { // Build messages list $lastid = -1; - $messages = $thread_info['thread']->getMessages(false, $lastid); + $messages = $thread_info['thread']->getMessages(false, $lastid, true); $page['threadMessages'] = json_encode($messages); }