Add messages indexing

This commit is contained in:
Dmitriy Simushev 2013-04-30 09:42:45 +00:00
parent 3594485267
commit fa33342dbc
8 changed files with 239 additions and 10 deletions

View File

@ -15,7 +15,10 @@
* limitations under the License. * limitations under the License.
*/ */
// Initialize librariess
require_once('libs/init.php'); require_once('libs/init.php');
require_once('libs/classes/thread.php');
require_once('libs/cron.php');
$cron_key = empty($_GET['cron_key']) ? '' : $_GET['cron_key']; $cron_key = empty($_GET['cron_key']) ? '' : $_GET['cron_key'];
@ -24,6 +27,11 @@ if ($cron_key != Settings::get('cron_key')) {
die(); die();
} }
set_time_limit(0);
// Run cron jobs of the core
cron_index_messages();
// Trigger cron event // Trigger cron event
$dispatcher = EventDispatcher::getInstance(); $dispatcher = EventDispatcher::getInstance();
$dispatcher->triggerEvent('cronRun'); $dispatcher->triggerEvent('cronRun');

View File

@ -71,6 +71,10 @@ $dbtables = array(
"arguments" => "varchar(1024)" "arguments" => "varchar(1024)"
), ),
// Temporal message storage. Contain only messages for for open threads.
// There is cron job that clean up this table and move messages of the core
// kinds to ${mysqlprefix}indexedchatmessage. Plugins messages must be
// converted to one of the core kinds before move.
"${mysqlprefix}chatmessage" => array( "${mysqlprefix}chatmessage" => array(
"messageid" => "int NOT NULL auto_increment PRIMARY KEY", "messageid" => "int NOT NULL auto_increment PRIMARY KEY",
"threadid" => "int NOT NULL references ${mysqlprefix}chatthread(threadid)", "threadid" => "int NOT NULL references ${mysqlprefix}chatthread(threadid)",
@ -81,6 +85,18 @@ $dbtables = array(
"tname" => "varchar(64)" "tname" => "varchar(64)"
), ),
// Indexed messages used for search, history and statistics.
// Contain messages only of the Core kinds.
"${mysqlprefix}indexedchatmessage" => array(
"messageid" => "int NOT NULL auto_increment PRIMARY KEY",
"threadid" => "int NOT NULL references ${mysqlprefix}chatthread(threadid)",
"ikind" => "int NOT NULL",
"agentId" => "int NOT NULL DEFAULT 0",
"tmessage" => "text NOT NULL",
"dtmcreated" => "int NOT NULL DEFAULT 0",
"tname" => "varchar(64)"
),
"${mysqlprefix}chatoperator" => array( "${mysqlprefix}chatoperator" => array(
"operatorid" => "int NOT NULL auto_increment PRIMARY KEY", "operatorid" => "int NOT NULL auto_increment PRIMARY KEY",
"vclogin" => "varchar(64) NOT NULL", "vclogin" => "varchar(64) NOT NULL",
@ -174,6 +190,9 @@ $dbtables_indexes = array(
"${mysqlprefix}chatmessage" => array( "${mysqlprefix}chatmessage" => array(
"idx_agentid" => "agentid" "idx_agentid" => "agentid"
), ),
"${mysqlprefix}indexedchatmessage" => array(
"agentid" => "agentid"
),
"${mysqlprefix}chatsitevisitor" => array( "${mysqlprefix}chatsitevisitor" => array(
"threadid" => "threadid" "threadid" => "threadid"
), ),
@ -197,6 +216,7 @@ $dbtables_can_update = array(
"${mysqlprefix}chatthread" => array("agentId", "userTyping", "agentTyping", "messageCount", "nextagent", "shownmessageid", "userid", "userAgent", "groupid", "dtmchatstarted"), "${mysqlprefix}chatthread" => array("agentId", "userTyping", "agentTyping", "messageCount", "nextagent", "shownmessageid", "userid", "userAgent", "groupid", "dtmchatstarted"),
"${mysqlprefix}requestbuffer" => array("requestid", "requestkey", "request"), "${mysqlprefix}requestbuffer" => array("requestid", "requestkey", "request"),
"${mysqlprefix}chatmessage" => array("agentId"), "${mysqlprefix}chatmessage" => array("agentId"),
"${mysqlprefix}indexedchatmessage" => array(),
"${mysqlprefix}chatoperator" => array("vcavatar", "vcjabbername", "iperm", "istatus", "idisabled", "vcemail", "dtmrestore", "vcrestoretoken"), "${mysqlprefix}chatoperator" => array("vcavatar", "vcjabbername", "iperm", "istatus", "idisabled", "vcemail", "dtmrestore", "vcrestoretoken"),
"${mysqlprefix}chatban" => array(), "${mysqlprefix}chatban" => array(),
"${mysqlprefix}chatgroup" => array("vcemail", "iweight", "parent", "vctitle", "vcchattitle", "vclogo", "vchosturl"), "${mysqlprefix}chatgroup" => array("vcemail", "iweight", "parent", "vctitle", "vcchattitle", "vclogo", "vchosturl"),

View File

@ -225,6 +225,10 @@ if ($act == "silentcreateall") {
runsql("ALTER TABLE ${mysqlprefix}chatmessage ADD INDEX idx_agentid (agentid)", $link); runsql("ALTER TABLE ${mysqlprefix}chatmessage ADD INDEX idx_agentid (agentid)", $link);
} }
if (in_array("${mysqlprefix}indexedchatmessage.agentid", $absent_indexes)) {
runsql("ALTER TABLE ${mysqlprefix}indexedchatmessage ADD INDEX (agentid)", $link);
}
if (in_array("${mysqlprefix}chatsitevisitor.threadid", $absent_indexes)) { if (in_array("${mysqlprefix}chatsitevisitor.threadid", $absent_indexes)) {
runsql("ALTER TABLE ${mysqlprefix}chatsitevisitor ADD INDEX (threadid)", $link); runsql("ALTER TABLE ${mysqlprefix}chatsitevisitor ADD INDEX (threadid)", $link);
} }

View File

@ -656,6 +656,8 @@ Class Thread {
* @param boolean $is_user Boolean TRUE if messages loads for user * @param boolean $is_user Boolean TRUE if messages loads for user
* and boolean FALSE if they loads for operator. * and boolean FALSE if they loads for operator.
* @param int $lastid ID of the last loaded message. * @param int $lastid ID of the last loaded message.
* @param boolean $indexed Indicates if indexed messages be should returned.
* Default value is false.
* @return array Array of messages. Every message is associative array with * @return array Array of messages. Every message is associative array with
* following keys: * following keys:
* - 'id': int, message id; * - 'id': int, message id;
@ -666,16 +668,18 @@ Class Thread {
* Thread::KIND_PLUGIN and message text string otherwise. * Thread::KIND_PLUGIN and message text string otherwise.
* @see Thread::postMessage() * @see Thread::postMessage()
*/ */
public function getMessages($is_user, &$last_id) { public function getMessages($is_user, &$last_id, $indexed = false) {
global $webim_encoding; global $webim_encoding;
$db = Database::getInstance(); $db = Database::getInstance();
$messages_table = $indexed ? '{indexedchatmessage}' : '{chatmessage}';
// Load messages // Load messages
$messages = $db->query( $messages = $db->query(
"select messageid as id, ikind as kind, dtmcreated as created, " . "select messageid as id, ikind as kind, dtmcreated as created, " .
" tname as name, tmessage as message " . " tname as name, tmessage as message " .
"from {chatmessage} " . "from " . $messages_table . " " .
"where threadid = :threadid and messageid > :lastid " . "where threadid = :threadid and messageid > :lastid " .
($is_user ? "and ikind <> " . self::KIND_FOR_AGENT : "") . ($is_user ? "and ikind <> " . self::KIND_FOR_AGENT : "") .
" order by messageid", " order by messageid",

View File

@ -0,0 +1,193 @@
<?php
/*
* Copyright 2005-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Index messages for closed threads
*
* History and serarch works only for indexed messages.
*
* Trigger 'pluginMessageIndex' event. Plugins can use this event to
* index their own messages.
*
* Event listener receives as argument an associative array with following keys:
* - 'message': associative array with message data;
* - 'result': array of indexed message data. Default value is boolean false.
*
* The 'message' element contains following keys:
* - 'plugin': string, name of the plugin which sent the message;
* - 'data': array, data sent by the plugin.
*
* Plugin can use the 'result' element to return indexed message data. By default it
* equals to boolean false. To index message plugin should set this element to
* an associative array with following keys:
* - 'kind': int, indexed message kind. It must be one of the core messages
* kinds, not Thread::KIND_PLUGIN.
* - 'message': string, text of indexed message.
*
* If the 'result' element equals to boolean false message will be skipped.
*
* Example of event listener is listed below:
* <code>
* public function pluginMessageIndexListener(&$args) {
* // Create shortcut for stored data
* $data = $args['message']['data'];
* // Check if message was sent by current plugin
* if ($args['message']['plugin'] == 'example') {
* $args['result'] = array(
* // Plugin should set one of the core message kinds to indexed message
* 'kind' => Thread::KIND_INFO,
* // Plugin should set arbitrary text for indexed message
* 'message' => $data['title'] . ': ' . $data['body']
* );
* }
* }
* </code>
*/
function cron_index_messages() {
$db = Database::getInstance();
$db->throwExeptions(true);
try {
// Start transaction
$db->query('START TRANSACTION');
// Select messages from closed threads
$messages = $db->query(
"SELECT {chatmessage}.* FROM {chatmessage}, {chatthread} " .
"WHERE {chatmessage}.threadid = {chatthread}.threadid AND " .
"({chatthread}.istate = :closed OR {chatthread}.istate = :left)",
array(
':closed' => Thread::STATE_CLOSED,
':left' => Thread::STATE_LEFT
),
array('return_rows' => Database::RETURN_ALL_ROWS)
);
// Prevent race condition. Remove only moved to {indexedchatmessage}
// messages. Get last loaded message ID for that.
$last_id = 0;
$dispatcher = EventDispatcher::getInstance();
foreach($messages as $key => $message) {
// Find last message ID
if ($message['messageid'] > $last_id) {
$last_id = $message['messageid'];
}
// Leave core messages kind as is
if ($message['ikind'] != Thread::KIND_PLUGIN) {
continue;
}
// Provide an ability for plugins to index own messages
$event_args = array(
'message' => unserialize($message['tmessage']),
'result' => false
);
$dispatcher->triggerEvent('pluginMessageIndex', $event_args);
// Check if message was processed by a plugin correctly
$update_message = true;
if (empty($event_args['result']['message'])) {
$update_message = false;
}
// Check if kind set and correct
if (empty($event_args['result']['kind'])
|| $event_args['result']['kind'] == Thread::KIND_PLUGIN) {
$update_message = false;
}
// Check if message should be updated
if (! $update_message) {
unset($messages[$key]);
continue;
}
// Update message
$messages[$key]['ikind'] = $event_args['result']['kind'];
$messages[$key]['tmessage'] = $event_args['result']['message'];
}
// Check is there some messages that should be saved
if (count($messages) != 0) {
// Reindex messages array
$messages = array_values($messages);
// Prepare SQL query template
$message_fields = array_keys($messages[0]);
$placeholders = '(' .
implode(', ', array_fill(0, count($message_fields), '?')) . ')';
$sql_template = 'INSERT INTO {indexedchatmessage} (' .
implode(', ', $message_fields) . ') VALUES ';
// Insert indexed messages into database by $block_size messages per
// sql query
$block_size = 20;
$iteration_count = ceil(count($messages) / $block_size);
for($i = 0; $i < $iteration_count; $i++) {
// Get messages block
$messages_block = array_slice(
$messages,
$i * $block_size,
$block_size
);
// Count of $messages_block can be less than $block_size for
// the last block of messages.
$real_block_size = count($messages_block);
// Build array of inserted values
$fields_to_insert = array();
foreach($messages_block as $message) {
foreach($message_fields as $field_name) {
$fields_to_insert[] = $message[$field_name];
}
}
// Build query
$sql = $sql_template . implode(
', ',
array_fill(0, $real_block_size, $placeholders)
);
// Run query
$db->query($sql, $fields_to_insert);
}
}
// Check is there some processed messages that should be deleted
if ($last_id != 0) {
// Delete indexed messages
$db->query(
'DELETE FROM {chatmessage} where messageid <= :last_id',
array(':last_id' => $last_id)
);
}
} catch (Exception $e) {
// Something went wrong: warn and rollback transaction.
trigger_error(
'Messages indexing faild: ' . $e->getMessage(),
E_USER_WARNING
);
$db->query('ROLLBACK');
return;
}
// Commit transaction
$db->query('COMMIT');
}
?>

View File

@ -53,8 +53,8 @@ if ($query !== false) {
$searchConditions = array(); $searchConditions = array();
if ($searchType == 'message' || $searchType == 'all') { if ($searchType == 'message' || $searchType == 'all') {
$searchConditions[] = "({chatmessage}.tmessage LIKE :query" . $searchConditions[] = "({indexedchatmessage}.tmessage LIKE :query" .
($searchInSystemMessages?'':" AND ({chatmessage}.ikind = :kind_user OR {chatmessage}.ikind = :kind_agent)") . ($searchInSystemMessages?'':" AND ({indexedchatmessage}.ikind = :kind_user OR {indexedchatmessage}.ikind = :kind_agent)") .
")"; ")";
if (! $searchInSystemMessages) { if (! $searchInSystemMessages) {
$values[':kind_user'] = Thread::KIND_USER; $values[':kind_user'] = Thread::KIND_USER;
@ -71,9 +71,9 @@ if ($query !== false) {
// Load threads // Load threads
select_with_pagintation("DISTINCT {chatthread}.*", select_with_pagintation("DISTINCT {chatthread}.*",
"{chatthread}, {chatmessage}", "{chatthread}, {indexedchatmessage}",
array( array(
"{chatmessage}.threadid = {chatthread}.threadid", "{indexedchatmessage}.threadid = {chatthread}.threadid",
"(" . implode(' or ', $searchConditions) . ")" "(" . implode(' or ', $searchConditions) . ")"
), ),
"order by {chatthread}.dtmcreated DESC", "order by {chatthread}.dtmcreated DESC",

View File

@ -75,7 +75,7 @@ $db = Database::getInstance();
if ($statisticstype == 'bydate') { if ($statisticstype == 'bydate') {
$page['reportByDate'] = $db->query( $page['reportByDate'] = $db->query(
"select DATE(FROM_UNIXTIME(t.dtmcreated)) as date, COUNT(distinct t.threadid) as threads, SUM(m.ikind = :kind_agent) as agents, SUM(m.ikind = :kind_user) as users, ROUND(AVG(t.dtmchatstarted-t.dtmcreated),1) as avgwaitingtime, ROUND(AVG(tmp.lastmsgtime - t.dtmchatstarted),1) as avgchattime " . "select DATE(FROM_UNIXTIME(t.dtmcreated)) as date, COUNT(distinct t.threadid) as threads, SUM(m.ikind = :kind_agent) as agents, SUM(m.ikind = :kind_user) as users, ROUND(AVG(t.dtmchatstarted-t.dtmcreated),1) as avgwaitingtime, ROUND(AVG(tmp.lastmsgtime - t.dtmchatstarted),1) as avgchattime " .
"from {chatmessage} m, {chatthread} t, (SELECT i.threadid, MAX(i.dtmcreated) AS lastmsgtime FROM {chatmessage} i WHERE (ikind = :kind_user OR ikind = :kind_agent) GROUP BY i.threadid) tmp " . "from {indexedchatmessage} m, {chatthread} t, (SELECT i.threadid, MAX(i.dtmcreated) AS lastmsgtime FROM {indexedchatmessage} i WHERE (ikind = :kind_user OR ikind = :kind_agent) GROUP BY i.threadid) tmp " .
"where m.threadid = t.threadid AND tmp.threadid = t.threadid AND t.dtmchatstarted <> 0 AND m.dtmcreated >= :start AND m.dtmcreated < :end group by DATE(FROM_UNIXTIME(m.dtmcreated)) order by m.dtmcreated desc", "where m.threadid = t.threadid AND tmp.threadid = t.threadid AND t.dtmchatstarted <> 0 AND m.dtmcreated >= :start AND m.dtmcreated < :end group by DATE(FROM_UNIXTIME(m.dtmcreated)) order by m.dtmcreated desc",
array( array(
':kind_agent' => Thread::KIND_AGENT, ':kind_agent' => Thread::KIND_AGENT,
@ -88,7 +88,7 @@ if ($statisticstype == 'bydate') {
$page['reportByDateTotal'] = $db->query( $page['reportByDateTotal'] = $db->query(
"select DATE(FROM_UNIXTIME(t.dtmcreated)) as date, COUNT(distinct t.threadid) as threads, SUM(m.ikind = :kind_agent) as agents, SUM(m.ikind = :kind_user) as users, ROUND(AVG(t.dtmchatstarted-t.dtmcreated),1) as avgwaitingtime, ROUND(AVG(tmp.lastmsgtime - t.dtmchatstarted),1) as avgchattime " . "select DATE(FROM_UNIXTIME(t.dtmcreated)) as date, COUNT(distinct t.threadid) as threads, SUM(m.ikind = :kind_agent) as agents, SUM(m.ikind = :kind_user) as users, ROUND(AVG(t.dtmchatstarted-t.dtmcreated),1) as avgwaitingtime, ROUND(AVG(tmp.lastmsgtime - t.dtmchatstarted),1) as avgchattime " .
"from {chatmessage} m, {chatthread} t, (SELECT i.threadid, MAX(i.dtmcreated) AS lastmsgtime FROM {chatmessage} i WHERE (ikind = :kind_user OR ikind = :kind_agent) GROUP BY i.threadid) tmp " . "from {indexedchatmessage} m, {chatthread} t, (SELECT i.threadid, MAX(i.dtmcreated) AS lastmsgtime FROM {indexedchatmessage} i WHERE (ikind = :kind_user OR ikind = :kind_agent) GROUP BY i.threadid) tmp " .
"where m.threadid = t.threadid AND tmp.threadid = t.threadid AND t.dtmchatstarted <> 0 AND m.dtmcreated >= :start AND m.dtmcreated < :end", "where m.threadid = t.threadid AND tmp.threadid = t.threadid AND t.dtmchatstarted <> 0 AND m.dtmcreated >= :start AND m.dtmcreated < :end",
array( array(
':kind_agent' => Thread::KIND_AGENT, ':kind_agent' => Thread::KIND_AGENT,
@ -103,7 +103,7 @@ if ($statisticstype == 'bydate') {
$page['reportByAgent'] = $db->query( $page['reportByAgent'] = $db->query(
"select vclocalename as name, COUNT(distinct threadid) as threads, " . "select vclocalename as name, COUNT(distinct threadid) as threads, " .
"SUM(ikind = :kind_agent) as msgs, AVG(CHAR_LENGTH(tmessage)) as avglen " . "SUM(ikind = :kind_agent) as msgs, AVG(CHAR_LENGTH(tmessage)) as avglen " .
"from {chatmessage}, {chatoperator} " . "from {indexedchatmessage}, {chatoperator} " .
"where agentId = operatorid AND dtmcreated >= :start " . "where agentId = operatorid AND dtmcreated >= :start " .
"AND dtmcreated < :end group by operatorid", "AND dtmcreated < :end group by operatorid",
array( array(

View File

@ -52,7 +52,7 @@ if (isset($_GET['threadid'])) {
// Build messages list // Build messages list
$lastid = -1; $lastid = -1;
$messages = $thread_info['thread']->getMessages(false, $lastid); $messages = $thread_info['thread']->getMessages(false, $lastid, true);
$page['threadMessages'] = json_encode($messages); $page['threadMessages'] = json_encode($messages);
} }