handle_connections_sockets()
while (!abort_loop) {
retval = poll(fds, socket_count, -1); // fds: ip_sock, unix_sock
new_sock = accept(sock, ...);
thd = new THD;
vio_tmp = vio_new(new_sock, VIO_TYPE_SOCKET/TCPIP...);
my_net_init(&thd->net, vio_tmp);
create_new_thread(thd);
}
create_new_thread(THD *thd)
// 首先检测当前的connection_count是否超出了max_connections,如果超出,close_connection(thd, ER_CON_COUNT_ERROR);
close_connection(THD *thd, uint sql_errno);
if (sql_errno)
net_send_error(thd, sql_errno, ER_DEFAULT(sql_errno), NULL);
net_send_error_packet(thd, sql_errno, errmsg, sqlstate);
mysql_mutex_lock(&LOCK_thread_count);
thd->thread_id = thread_id++;
thread_count++;
// MYSQL_CALLBACK(thread_scheduler, add_connection, (thd));
if (thread_scheduler && thread_scheduler->add_connection) {
thread_scheduler->add_connection(thd);
}
// thread_scheduler = &one_thread_per_connection_scheduler_functions;
// add_connection = create_thread_to_handle_connection
create_thread_to_handle_connection(THD *thd);
if (cached_thread_count > wake_thread) {
thread_cache.push_back(thd);
wake_thread++;
mysql_cond_signal(&COND_thread_cache);
} else {
threads.append(thd);
mysql_thread_create(
key_thread_one_connection,
&thd->real_id,
&connection_attrib,
handle_one_connection,
thd
);
// mysql_thread_create() => pthread_create()
pthread_create(&thd->real_id, &connection_attrib, handle_one_connection, thd);
}
mysql_mutex_unlock(&LOCK_thread_count);
handle_one_connection(thd) => do_handle_one_connection(thd)
// MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0)
thread_scheduler && thread_scheduler->init_new_connection_thread
? thread_scheduler->init_new_connection_thread()
: 0
// thread_scheduler->init_new_connection_thread = init_new_connection_handler_thread()
pthread_detach_this_thread();
my_thread_init();
for (;;) {
thd_prepare_connection(thd);
while (thd_is_connection_alive(thd)) {
if (do_command(thd)) {
break;
}
}
end_connection(thd);
close_connection(thd);
// MYSQL_CALLBACK_ELSE(thread_scheduler, end_thread, (thd, 1), 0)
// thread_scheduler->end_thread = one_thread_per_connection_end;
if (one_thread_per_connection_end(thd, 1)) {
return;
}
thd = current_thd;
}
one_thread_per_connection_end(THD *thd, bool put_in_cache)
if (put_in_cache) {
put_in_cache = cache_thread();
}
if (put_in_cache) {
return 0;
}
cache_thread()
while (!abort_loop && !wake_thread && !kill_cached_threads) {
mysql_cond_wait(&COND_thread_cache, &LOCK_thread_count);
}
cached_thread_count--;
if (wake_thread) {
wake_thread--;
THD *thd = thread_cache.get();
threads.append(thd);
}
thd_prepare_connection(THD *thd)
login_connection(thd);
check_connection(thd);
acl_authenticate(); // Perform the handshake, authorize the client and update thd sctx variables.
do_auth_once();
prepare_new_connection_state(thd);
if (thd->client_capabilities & CLIENT_COMPRESS) {
thd->net.compress = 1;
}
thd->init_for_queries();
do_command(THD *thd)
// 读取第一个字节,判断command是否超出范围,如果超出范围,就设command = COM_END.
// 然后 dispatch_command()
dispatch_command(enum enum_server_command command, THD *thd, char *packet, uint packet_length);