【google源码版】【隐私雷达源码】【彩虹2源码】pthread_mutex_lock 源码

1.如何让 Qt 程序 Sleep
2.LockSupport的park等待的底层实现
3.多线程死锁检测的源码分析与实现(linux c)-有向图的应用
4.如何让 Qt 的程序使用 Sleep
5.如何杀掉空闲事务

pthread_mutex_lock 源码

如何让 Qt 程序 Sleep

       ä½¿ç”¨å¹³å°ç›¸å…³çš„ Sleep 或 nanosleep 以后,界面为什么没有反应?QThread 中提供了protected 权限的 sleep 函数,如何用到主线程中?使用QTest 中的 qSleep,在windows下如何隐藏控制台?这些问题其实归结为一点:在主线程中使用这些函数是一种错误,这会直接导致界面无法刷新,用户与程序无法交互。Qt不提供,是因为你不需要在主线程中使用 sleep 函数。如何让程序等待一段时间QTimeQTime t; t.start(); while(t.elapsed()<);这种死循环也是一种常见错误用法。但改成正确的还是比较简单的:QTime t; t.start(); while(t.elapsed()<) QCoreApplication::processEvents();不停地处理事件,以使得程序保持响应。QElapsedTimer这是Qt4.7引入的新的类,和QTime相比,它提供了更快的计算 elapsed 时间的方法。QElapsedTimer t; t.start(); while(t.elapsed()<) QCoreApplication::processEvents();QTest::qWait这是QTest模块提供的等待函数下面是其源代码(和我们前面的代码很像吧?):namespace QTest { inline static void qWait(int ms) { Q_ASSERT(QCoreApplication::instance()); QElapsedTimer timer; timer.start(); do { QCoreApplication::processEvents(QEventLoop::AllEvents, ms); QTest::qSleep(); } while (timer.elapsed() < ms); } ...其实没什么魔力,对吧?但是因为它QTest模块,所以在程序中我们不要使用它。QEventLoop配合QTimer使用局部的 eventLoop 也是一个不错的选择。例子: QEventLoop eventloop; QTimer::singleShot(, &eventloop, SLOT(quit())); eventloop.exec();QTimer 和 QBasicTimer这两个和本文没有什么直接关系,QTimer估计大家都很熟了。而QBasicTimer估计很少有人用。与QTimer相比,QBasicTimer更快速、轻量、底层。与QTimer相比,它不是QObject的派生类。跨平台的sleep尽管一开始我们就说了,不需要这个东西。但不排除某种场合下,你确实需要这个东西。橹械暮��芗虻ィ╳indows下调用Sleep,其他平台调用 nanosleep):void QTest::qSleep(int ms) { QTEST_ASSERT(ms > 0); #ifdef Q_OS_WIN Sleep(uint(ms)); #else struct timespec ts = { ms / , (ms % ) * * }; nanosleep(&ts, NULL); #endif }看QThread的源码,windows下同样直接调用Sleep,但非windows的实现比这个就复杂多了:/* \internal helper function to do thread sleeps, since usleep()/nanosleep() aren't reliable enough (in terms of behavior and availability)*/staticvoidthread_sleep(structtimespec *ti){ pthread_mutex_tmtx;pthread_cond_tcnd;pthread_mutex_init(&mtx, 0);pthread_cond_init(&cnd, 0);pthread_mutex_lock(&mtx); (void) pthread_cond_timedwait(&cnd, &mtx, ti);pthread_mutex_unlock(&mtx);pthread_cond_destroy(&cnd);pthread_mutex_destroy(&mtx);}voidQThread::sleep(unsignedlongsecs){ structtimevaltv;gettimeofday(&tv, 0);structtimespecti;ti.tv_sec = tv.tv_sec + secs;ti.tv_nsec = (tv.tv_usec * );thread_sleep(&ti);}

LockSupport的park等待的底层实现

       ä»Žä¸Šä¸€ç¯‡æ–‡ç« ä¸­çš„JDK的延迟队列中,最终是通过LockSupport.park实现线程的等待,那么底层是如何实现等待和超时等待的?本文我们来探讨一下。

LockSupport的park和unpark的方法publicstaticvoidpark(){ UNSAFE.park(false,0L);}publicstaticvoidparkNanos(longnanos){ if(nanos>0)UNSAFE.park(false,nanos);}publicstaticvoidunpark(Threadthread){ if(thread!=null)UNSAFE.unpark(thread);}

       ä»Žä¸Šé¢å¯ä»¥çœ‹åˆ°å®žé™…LockSupport.park是通过Unsafe的的park方法实现,从下面的方法可以看出这个是一个native方法.

/***Blockscurrentthread,returningwhenabalancing*{ @codeunpark}occurs,orabalancing{ @codeunpark}has*alreadyoccurred,orthethreadisinterrupted,or,ifnot*absoluteandtimeisnotzero,thegiventimenanosecondshave*elapsed,orifabsolute,thegivendeadlineinmilliseconds*sinceEpochhaspassed,orspuriously(i.e.,returningforno*"reason").Note:ThisoperationisintheUnsafeclassonly*because{ @codeunpark}is,soitwouldbestrangetoplaceit*elsewhere.*/publicnativevoidpark(booleanisAbsolute,longtime);JVM的Unsafe的park方法

       ä»Žä¸‹é¢JDK中代码中可以thread的Parker的对象的park方法进行一段时间的等待。

UNSAFE_ENTRY(void,Unsafe_Park(JNIEnv*env,jobjectunsafe,jbooleanisAbsolute,jlongtime)){ HOTSPOT_THREAD_PARK_BEGIN((uintptr_t)thread->parker(),(int)isAbsolute,time);EventThreadParkevent;JavaThreadParkedStatejtps(thread,time!=0);thread->parker()->park(isAbsolute!=0,time);if(event.should_commit()){ constoopobj=thread->current_park_blocker();if(time==0){ post_thread_park_event(&event,obj,min_jlong,min_jlong);}else{ if(isAbsolute!=0){ post_thread_park_event(&event,obj,min_jlong,time);}else{ post_thread_park_event(&event,obj,time,min_jlong);}}}HOTSPOT_THREAD_PARK_END((uintptr_t)thread->parker());}UNSAFE_END

       Thread.hpp的文件中内部定义的Park对象

private:Parker_parker;public:Parker*parker(){ return&_parker;}

       ä¸‹é¢æ˜¯Os_posix.cpp中是Linux中实现的Park的park的实现方式

       é¦–先将_counter的变量通过CAS设置为0,返回就旧的值,如果之前是大于0,则说明是允许访问,不用阻塞,直接返回。

       èŽ·å–当前线程。

       åˆ¤æ–­çº¿ç¨‹æ˜¯å¦æ˜¯ä¸­æ–­ä¸­ï¼Œå¦‚果是,则直接返回,(也就是说线程处于中断状态下会忽略park,不会阻塞等待)

       åˆ¤æ–­å¦‚果传入的time参数小于0 或者 是绝对时间并且time是0,则直接返回,(上面的Unsafe调用park传入的参数是 false、0,所以不满足这种情况)

       å¦‚æžœtime大于0,则转换成绝对时间。

       åˆ›å»ºThreadBlockInVM对象,并且调用pthread_mutex_trylock获取线程互斥锁,如果没有获取到锁,则直接返回,

       åˆ¤æ–­_counter变量是否大于0,如果是,则重置_counter为0,释放线程锁,直接返回。

       è°ƒç”¨ OrderAccess::fence(); 加入内存屏障,禁止指令重排序,确保加锁和释放锁的指令的顺序。

       åˆ›å»ºOSThreadWaitState对象,

       åˆ¤æ–­time是否大于0,如果是0,则调用pthread_cond_wait进行等待,如果不是0,然后调用pthread_cond_timedwait进行时间参数为absTime的等待,

       è°ƒç”¨pthread_mutex_unlock进行释放_mutex锁,

       å†æ¬¡è°ƒç”¨OrderAccess::fence()禁止指令重排序。

//Parker::parkdecrementscountif>0,elsedoesacondvarwait.Unpark//setscountto1andsignalscondvar.Onlyonethreadeverwaits//onthecondvar.Contentionseenwhentryingtoparkimpliesthatsomeone//isunparkingyou,sodon'twait.Andspuriousreturnsarefine,sothere//isnoneedtotracknotifications.voidParker::park(boolisAbsolute,jlongtime){ //Optionalfast-pathcheck://Returnimmediatelyifapermitisavailable.//WedependonAtomic::xchg()havingfullbarriersemantics//sincewearedoingalock-freeupdateto_counter.if(Atomic::xchg(&_counter,0)>0)return;JavaThread*jt=JavaThread::current();//Optionaloptimization--avoidstatetransitionsifthere's//aninterruptpending.if(jt->is_interrupted(false)){ return;}//Next,demultiplex/decodetimeargumentsstructtimespecabsTime;if(time<0||(isAbsolute&&time==0)){ //don'twaitatallreturn;}if(time>0){ to_abstime(&absTime,time,isAbsolute,false);}//Entersafepointregion//Bewareofdeadlockssuchas.//Theper-threadParker::mutexisaclassicleaf-lock.//InparticularathreadmustneverblockontheThreads_lockwhile//holdingtheParker::mutex.Ifsafepointsarependingboththe//theThreadBlockInVM()CTORandDTORmaygrabThreads_lock.ThreadBlockInVMtbivm(jt);//Can'taccessinterruptstatenowthatweare_thread_blocked.Ifwe've//beeninterruptedsincewecheckedabovethen_counterwillbe>0.//Don'twaitifcannotgetlocksinceinterferencearisesfrom//unparking.if(pthread_mutex_trylock(_mutex)!=0){ return;}intstatus;if(_counter>0){ //nowaitneeded_counter=0;status=pthread_mutex_unlock(_mutex);assert_status(status==0,status,"invariant");//Paranoiatoensureourlockedandlock-freepathsinteract//correctlywitheachotherandJava-levelaccesses.OrderAccess::fence();return;}OSThreadWaitStateosts(jt->osthread(),false/*notObject.wait()*/);assert(_cur_index==-1,"invariant");if(time==0){ _cur_index=REL_INDEX;//arbitrarychoicewhennottimedstatus=pthread_cond_wait(&_cond[_cur_index],_mutex);assert_status(status==0MACOS_ONLY(||status==ETIMEDOUT),status,"cond_wait");}else{ _cur_index=isAbsolute?ABS_INDEX:REL_INDEX;status=pthread_cond_timedwait(&_cond[_cur_index],_mutex,&absTime);assert_status(status==0||status==ETIMEDOUT,status,"cond_timedwait");}_cur_index=-1;_counter=0;status=pthread_mutex_unlock(_mutex);assert_status(status==0,status,"invariant");//Paranoiatoensureourlockedandlock-freepathsinteract//correctlywitheachotherandJava-levelaccesses.OrderAccess::fence();Linux操作系统是如何实现pthread_cond_timedwait进行时间等待的

       pthread_cond_timedwait函数位于glibc中pthread_cond_wait.c, 可以看到是调用__pthread_cond_wait_common实现

/*See__pthread_cond_wait_common.*/int___pthread_cond_timedwait(pthread_cond_t*cond,pthread_mutex_t*mutex,conststruct__timespec*abstime){ /*Checkparametervalidity.ThisshouldalsotellthecompilerthatitcanassumethatabstimeisnotNULL.*/if(!valid_nanoseconds(abstime->tv_nsec))returnEINVAL;/*RelaxedMOissufficebecauseclockIDbitisonlymodifiedinconditioncreation.*/unsignedintflags=atomic_load_relaxed(&cond->__data.__wrefs);clockid_tclockid=(flags&__PTHREAD_COND_CLOCK_MONOTONIC_MASK)?CLOCK_MONOTONIC:CLOCK_REALTIME;return__pthread_cond_wait_common(cond,mutex,clockid,abstime);}

       ä¸‹é¢__pthread_cond_wait_common是实现通过__futex_abstimed_wait_cancelable实现时间等待

static__always_inlineint__pthread_cond_wait_common(pthread_cond_t*cond,pthread_mutex_t*mutex,clockid_tclockid,conststruct__timespec*abstime){ ''省略''`err=__futex_abstimed_wait_cancelable(cond->__data.__g_signals+g,0,clockid,abstime,private);''省略''`}

       __futex_abstimed_wait_cancelable是调用__futex_abstimed_wait_common

int__futex_abstimed_wait_cancelable(unsignedint*futex_word,unsignedintexpected,clockid_tclockid,conststruct__timespec*abstime,intprivate){ return__futex_abstimed_wait_common(futex_word,expected,clockid,abstime,private,true);}

       __futex_abstimed_wait_common下面则是通过判断平台是位或者位,调用__futex_abstimed_wait_common或者__futex_abstimed_wait_common

staticint__futex_abstimed_wait_common(unsignedint*futex_word,unsignedintexpected,clockid_tclockid,conststruct__timespec*abstime,intprivate,boolcancel){ interr;unsignedintclockbit;/*Workaroundthefactthatthekernelrejectsnegativetimeoutvaluesdespitethembeingvalid.*/if(__glibc_unlikely((abstime!=NULL)&&(abstime->tv_sec<0)))returnETIMEDOUT;if(!lll_futex_supported_clockid(clockid))returnEINVAL;clockbit=(clockid==CLOCK_REALTIME)?FUTEX_CLOCK_REALTIME:0;intop=__lll_private_flag(FUTEX_WAIT_BITSET|clockbit,private);#ifdef__ASSUME_TIME_SYSCALLSerr=__futex_abstimed_wait_common(futex_word,expected,op,abstime,private,cancel);#elseboolneed_time=abstime!=NULL&&!in_time_t_range(abstime->tv_sec);if(need_time){ err=__futex_abstimed_wait_common(futex_word,expected,op,abstime,private,cancel);if(err==-ENOSYS)err=-EOVERFLOW;}elseerr=__futex_abstimed_wait_common(futex_word,expected,op,abstime,private,cancel);#endifswitch(err){ case0:case-EAGAIN:case-EINTR:case-ETIMEDOUT:case-EINVAL:case-EOVERFLOW:/*Passedabsolutetimeoutusesbittime_ttype,butunderlyingkerneldoesnotsupportbittime_tfutexsyscalls.*/return-err;case-EFAULT:/*Musthavebeencausedbyaglibcorapplicationbug.*/case-ENOSYS:/*Musthavebeencausedbyaglibcbug.*//*Noothererrorsaredocumentedatthistime.*/default:futex_fatal_error();}}

       __futex_abstimed_wait_common是调用INTERNAL_SYSCALL_CANCEL宏定义实现

staticint__futex_abstimed_wait_common(unsignedint*futex_word,unsignedintexpected,intop,conststruct__timespec*abstime,intprivate,boolcancel){ if(cancel)returnINTERNAL_SYSCALL_CANCEL(futex_time,futex_word,op,expected,abstime,NULL/*Unused.*/,FUTEX_BITSET_MATCH_ANY);elsereturnINTERNAL_SYSCALL_CALL(futex_time,futex_word,op,expected,abstime,NULL/*Ununsed.*/,FUTEX_BITSET_MATCH_ANY);}

       ç³»ç»Ÿè°ƒç”¨çš„的宏定义

/***Blockscurrentthread,returningwhenabalancing*{ @codeunpark}occurs,orabalancing{ @codeunpark}has*alreadyoccurred,orthethreadisinterrupted,or,ifnot*absoluteandtimeisnotzero,thegiventimenanosecondshave*elapsed,orifabsolute,thegivendeadlineinmilliseconds*sinceEpochhaspassed,orspuriously(i.e.,returningforno*"reason").Note:ThisoperationisintheUnsafeclassonly*because{ @codeunpark}is,soitwouldbestrangetoplaceit*elsewhere.*/publicnativevoidpark(booleanisAbsolute,longtime);0总结

       ä¸»è¦å¯¹LockSupport的park等待实现的底层实现的浅析,针对于Linux的系统调用还没有找到源码,后续会继续跟踪,希望有读者知道的满帆可以告知下,谢谢。

链接:/post/

多线程死锁检测的分析与实现(linux c)-有向图的应用

       多线程死锁检测与有向图的应用分析与实现

       死锁的检测与避免是多线程开发中关键的挑战。本文旨在深入理解死锁产生的源码条件、原因,源码以及如何通过有向图模型检测多线程程序中的源码死锁问题。

       ### 一、源码死锁的源码google源码版产生与检测

       死锁指的是多个进程在运行过程中因争夺资源而形成的一种僵局,当进程处于这种状态时,源码若无外力干预,源码它们将无法继续推进执行。源码通过分析死锁产生的源码条件,我们可以发现其根源主要在于竞争资源和进程间推进顺序的源码非法性。利用有向图模型,源码我们可以直观地检测多线程程序中是源码否存在死锁。

       ### 二、源码死锁条件与分析

       死锁的源码隐私雷达源码条件包括:

       - **互斥条件**:资源需要排它性控制,即一段时间内资源仅为一个进程所使用。

       - **请求与保持条件**:进程请求资源时,对已获得的资源保持不释放。

       - **不剥夺条件**:进程持有的资源在未使用完毕前不能被剥夺,只能在释放后由自己管理。

       - **环路等待条件**:系统中存在一个进程-资源的环形依赖结构。

       ### 三、死锁检测方法

       检测死锁的关键在于识别系统中是否存在环形依赖关系。当多线程程序中存在环形锁依赖关系时,即构成了死锁问题。我们可以通过维护有向图的状态,即在加锁前、加锁后、释放锁后进行状态更新,彩虹2源码以此来判断是否有死锁发生。

       ### 四、实现步骤与代码示例

       在检测死锁时,可以通过自定义函数接口,如`pthread_mutex_lock`和`pthread_mutex_unlock`,在调用这些系统API之前和之后,实现加锁前和加锁后操作,以及解锁后清除对应关系。这不仅实现了检测功能,同时也使用户无需改变原有的编程习惯。

       ### 五、图结构与遍历

       对于死锁问题,图模型提供了一种有效的可视化手段。图中的股公式源码节点代表线程和锁,边表示锁的占有关系。通过深度优先遍历(DFS)算法,我们可以检测图中是否存在环路,从而判断是否存在死锁。

       ### 六、源代码实现

       通过编译`deadlock.c`文件,使用`gcc`命令链接`-lpthread`和`-ldl`库后执行,可以生成可执行文件`deadlock`。此步骤验证了死锁检测算法的正确性和有效性。

       ### 结论

       本文通过深入分析死锁产生的原因、条件,以及如何利用有向图模型进行检测,为开发者提供了理论与实践相结合的指导。通过代码实现,T BOX源码不仅解决了多线程死锁检测的问题,也为避免死锁提供了实用的方法。未来,随着多线程应用的日益普及,这种检测方法将发挥越来越重要的作用。

如何让 Qt 的程序使用 Sleep

       Qt 为何没有提供 Sleep

       è®ºå›ä¸Šä¸æ—¶è§åˆ°æœ‰äººé—®ï¼š

       Qt 为什么没有提供跨平台的 sleep 函数?

       ä½¿ç”¨å¹³å°ç›¸å…³çš„ Sleep 或 nanosleep 以后,界面为什么没有反应?

       QThread 中提供了protected 权限的 sleep 函数,如何用到主线程中?

       ä½¿ç”¨ QTest 中的 qSleep,在windows下如何隐藏控制台?

       è¿™äº›é—®é¢˜å…¶å®žå½’结为一点:在主线程中使用这些函数是一种错误,这会直接导致界面无法刷新,用户与程序无法交互。

       Qt不提供,是因为你不需要在主线程中使用 sleep 函数。

       å¦‚何让程序等待一段时间

       QTime

       QTime t;

       t.start();

       while(t.elapsed()<);

       è¿™ç§æ­»å¾ªçŽ¯ä¹Ÿæ˜¯ä¸€ç§å¸¸è§é”™è¯¯ç”¨æ³•ã€‚但改成正确的还是比较简单的:

       QTime t;

       t.start();

       while(t.elapsed()<)

        QCoreApplication::processEvents();

       ä¸åœåœ°å¤„理事件,以使得程序保持响应。

       QElapsedTimer

       è¿™æ˜¯Qt4.7引入的新的类,和QTime相比,它提供了更快的计算 elapsed 时间的方法。

       QElapsedTimer t;

       t.start();

       while(t.elapsed()<)

        QCoreApplication::processEvents();

       QTest::qWait

       è¿™æ˜¯QTest模块提供的等待函数

       ä¸‹é¢æ˜¯å…¶æºä»£ç ï¼ˆå’Œæˆ‘们前面的代码很像吧?):

       namespace QTest

       {

        inline static void qWait(int ms)

        {

        Q_ASSERT(QCoreApplication::instance());

        QElapsedTimer timer;

        timer.start();

        do {

        QCoreApplication::processEvents(QEventLoop::AllEvents, ms);

        QTest::qSleep();

        } while (timer.elapsed() < ms);

        }

       ...

       å…¶å®žæ²¡ä»€ä¹ˆé­”力,对吧?但是因为它QTest模块,所以在程序中我们不要使用它。

       QEventLoop

       é…åˆQTimer使用局部的 eventLoop 也是一个不错的选择。例子:

        QEventLoop eventloop;

        QTimer::singleShot(, &eventloop, SLOT(quit()));

        eventloop.exec();

       QTimer 和 QBasicTimer

       è¿™ä¸¤ä¸ªå’Œæœ¬æ–‡æ²¡æœ‰ä»€ä¹ˆç›´æŽ¥å…³ç³»ï¼ŒQTimer估计大家都很熟了。而QBasicTimer估计很少有人用。

       ä¸ŽQTimer相比,QBasicTimer更快速、轻量、底层。

       ä¸ŽQTimer相比,它不是QObject的派生类。

       è·¨å¹³å°çš„sleep

       å°½ç®¡ä¸€å¼€å§‹æˆ‘们就说了,不需要这个东西。但不排除某种场合下,你确实需要这个东西。如何实现一个跨平台的 sleep 呢?

       æˆ‘们一开始也提到了,QThreadç±» 和 QTest模块都提供了sleep函数,其实我们只需要看看他们的源码就够了:

       QTest 模块中的函数很简单(windows下调用Sleep,其他平台调用 nanosleep):

       void QTest::qSleep(int ms)

       {

        QTEST_ASSERT(ms > 0);

       #ifdef Q_OS_WIN

        Sleep(uint(ms));

       #else

        struct timespec ts = { ms / , (ms % ) * * };

        nanosleep(&ts, NULL);

       #endif

       }

       çœ‹QThread的源码,windows下同样直接调用Sleep,但非windows的实现比这个就复杂多了:

       [cpp] view plain copy

       /* /internal

        helper function to do thread sleeps, since usleep()/nanosleep()

        aren't reliable enough (in terms of behavior and availability)

       */

       static void thread_sleep(struct timespec *ti)

       {

        pthread_mutex_t mtx;

        pthread_cond_t cnd;

        pthread_mutex_init(&mtx, 0);

        pthread_cond_init(&cnd, 0);

        pthread_mutex_lock(&mtx);

        (void) pthread_cond_timedwait(&cnd, &mtx, ti);

        pthread_mutex_unlock(&mtx);

        pthread_cond_destroy(&cnd);

        pthread_mutex_destroy(&mtx);

       }

       void QThread::sleep(unsigned long secs)

       {

        struct timeval tv;

        gettimeofday(&tv, 0);

        struct timespec ti;

        ti.tv_sec = tv.tv_sec + secs;

        ti.tv_nsec = (tv.tv_usec * );

        thread_sleep(&ti);

       }

如何杀掉空闲事务

        本文内容遵从CC版权协议, 可以随意转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明网址: /tech/database/how_to_kill_idle_trx.html 我们经常遇到一个情况,就是网络断开或程序Bug导致COMMIT/ROLLBACK语句没有传到数

        本文内容遵从CC版权协议, 可以随意转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明网址: /tech/database/how_to_kill_idle_trx.html

       我们经常遇到一个情况,就是网络断开或程序Bug导致COMMIT/ROLLBACK语句没有传到数据库,也没有释放线程,但是线上事务锁定等待严重,连接数暴涨,尤其在测试库这种情况很多,线上也偶有发生,于是想为MySQL增加一个杀掉空闲事务的功能。

       那么如何实现呢,通过MySQL Server层有很多不确定因素,最保险还是在存储引擎层实现,我们用的几乎都是InnoDB/XtraDB,所以就基于Percona来修改了,Oracle版的MySQL也可以照着修改。

       需求:

       1. 一个事务启动,如果事务内最后一个语句执行完超过一个时间(innodb_idle_trx_timeout),就应该关闭链接。

       2. 如果事务是纯读事务,因为不加锁,所以无害,不需要关闭,保持即可。

       虽然这个思路被Percona的指出Alexey Kopytov可能存在“Even though SELECT queries do not place row locks by default (there are exceptions), they can still block undo log records from being purged.”的问题,但是我们确实有场景SELECT是绝对不能kill的,除非之后的INSERT/UPDATE/DELETE发生了,所以我根据我们的业务特点来修改。

       跟Percona的Yasufumi Kinoshita和Alexey Kopytov提出过纯SELECT事务不应被kill,但通过一个参数控制的方案还没有被Alexey Kopytov接受,作为通用处理我提出了用两个变量分别控制纯读事务的空闲超时时间和有锁事务的空闲超时时间,还在等待Percona的回复,因为这个方案还在测试,就先不开放修改了,当然如果你很熟悉MYSQL源码,我提出这个思路你肯定知道怎么分成这两个参数控制了。

       根据这两个需求我们来设计方法,首先想到这个功能肯定是放在InnoDB Master Thread最方便,Master Thread每秒调度一次,可以顺便检查空闲事务,然后关闭,因为在事务中操作trx->mysql_thd并不安全,所以一般来说最好在InnoDB层换成Thread ID操作,并且InnoDB中除了ha_innodb.cc,其他地方不能饮用THD,所以Master Thread中需要的线程数值,都需要在ha_innodb中计算好传递整型或布尔型返回值给master thread调用。

       首先,我们要增加一个参数:idle_trx_timeout,它表示事务多久没有下一条语句发生就超时关闭。

       在storage/innodb_plugin/srv/srv0srv.c的“/* plugin options */”注释下增加如下代码注册idle_trx_timeout变量。

       static MYSQL_SYSVAR_LONG(idle_trx_timeout, srv_idle_trx_timeout,

        PLUGIN_VAR_RQCMDARG,

        "If zero then this function no effect, if no-zero then wait idle_trx_timeout seconds this transaction will be closed",

        "Seconds of Idle-Transaction timeout",

        NULL, NULL, 0, 0, LONG_MAX, 0);

       代码往下找在innobase_system_variables结构体内加上:

       MYSQL_SYSVAR(idle_trx_timeout),

       有了这个变量,我们需要在Master Thread(storage/innodb_plugin/srv/srv0srv.c )中执行检测函数查找空闲事务。在loop循环的if (sync_array_print_long_waits(&waiter, &sema)判断后加上这段判断

        if (srv_idle_trx_timeout && trx_sys) {

        trx_t* trx;

        time_t now;

       rescan_idle:

        now = time(NULL);

        mutex_enter(&kernel_mutex);

        trx = UT_LIST_GET_FIRST(trx_sys->mysql_trx_list); # 从当前事务列表里获取第一个事务

        while (trx) { # 依次循环每个事务进行检查

        if (trx->conc_state == TRX_ACTIVE

        && trx->mysql_thd

        && innobase_thd_is_idle(trx->mysql_thd)) { # 如果事务还活着并且它的状态时空闲的

        ib_int_t start_time = innobase_thd_get_start_time(trx->mysql_thd); # 获取线程最后一个语句的开始时间

        ulong thd_id = innobase_thd_get_thread_id(trx->mysql_thd); #获取线程ID,因为存储引擎内直接操作THD不安全

        if (trx->last_stmt_start != start_time) { # 如果事务最后语句起始时间不等于线程最后语句起始时间说明事务是新起的

        trx->idle_start = now; # 更新事务的空闲起始时间

        trx->last_stmt_start = start_time; # 更新事务的最后语句起始时间

        } else if (difftime(now, trx->idle_start) # 如果事务不是新起的,已经执行了一部分则判断空闲时间有多长了

        > srv_idle_trx_timeout) { # 如果空闲时间超过阈值则杀掉链接

        /* kill the session */

        mutex_exit(&kernel_mutex);

        thd_kill(thd_id); # 杀链接

        goto rescan_idle;

        }

        }

        trx = UT_LIST_GET_NEXT(mysql_trx_list, trx); # 检查下一个事务

        }

        mutex_exit(&kernel_mutex);

        }

       其中trx中的变量是新加的,在storage/innodb_plugin/include/trx0trx.h的trx_truct加上需要的变量:

       struct trx_struct{

       ...

        time_t idle_start;

        ib_int_t last_stmt_start;

       ...

       }

       这里有几个函数是自定义的:

       ibool innobase_thd_is_idle(const void* thd);

       ib_int_t innobase_thd_get_start_time(const void* thd);

       ulong innobase_thd_get_thread_id(const void* thd);

       这些函数在ha_innodb.cc中实现,需要在storage/innodb_plugin/srv/srv0srv.c头文件定义下加上这些函数的引用形势。

       然后在storage/innodb_plugin/handler/ha_innodb.cc 中定义这些函数的实现:

       extern "C"

       ibool

       innobase_thd_is_idle(

        const void* thd) /*!{

        return(((const THD*)thd)->command == COM_SLEEP);

       }

       extern "C"

       ib_int_t

       innobase_thd_get_start_time(

        const void* thd) /*!{

        return((ib_int_t)((const THD*)thd)->start_time);

       }

       extern "C"

       ulong

       innobase_thd_get_thread_id(

        const void* thd)

       {

        return(thd_get_thread_id((const THD*) thd));

       }

       还有最重要的thd_kill函数负责杀线程的,在sql/sql_class.cc中,找个地方定义这个函数:

       void thd_kill(ulong id)

       {

        THD *tmp;

        VOID(pthread_mutex_lock(&LOCK_thread_count));

        I_List_iterator it(threads);

        while ((tmp=it++))

        {

        if (tmp->command == COM_DAEMON || tmp->is_have_lock_thd == 0 ) # 如果是DAEMON线程和不含锁的线程就不要kill了

        continue;

        if (tmp->thread_id == id)

        {

        pthread_mutex_lock(&tmp->LOCK_thd_data);

        break;

        }

        }

        VOID(pthread_mutex_unlock(&LOCK_thread_count));

        if (tmp)

        {

        tmp->awake(THD::KILL_CONNECTION);

        pthread_mutex_unlock(&tmp->LOCK_thd_data);

        }

       }

       为了存储引擎能引用到这个函数,我们要把它定义到plugin中:

       include/mysql/plugin.h和include/mysql/plugin.h中加上

       void thd_kill(unsigned long id);

       如何判定线程的is_have_lock_thd值?首先在THD中加上这个变量(sql/sql_class.cc):

       class THD :public Statement,

        public Open_tables_state

       {

       ....

        uint is_have_lock_thd;

       ....

       }

       然后在SQL的必经之路mysql_execute_command拦上一刀,判断是有锁操作发生了还是事务提交或新起事务。

        switch (lex->sql_command) {

        case SQLCOM_REPLACE:

        case SQLCOM_REPLACE_SELECT:

        case SQLCOM_UPDATE:

        case SQLCOM_UPDATE_MULTI:

        case SQLCOM_DELETE:

        case SQLCOM_DELETE_MULTI:

        case SQLCOM_INSERT:

        case SQLCOM_INSERT_SELECT:

        thd->is_have_lock_thd = 1;

        break;

        case SQLCOM_COMMIT:

        case SQLCOM_ROLLBACK:

        case SQLCOM_XA_START:

        case SQLCOM_XA_END:

        case SQLCOM_XA_PREPARE:

        case SQLCOM_XA_COMMIT:

        case SQLCOM_XA_ROLLBACK:

        case SQLCOM_XA_RECOVER:

        thd->is_have_lock_thd = 0;

        break;

        }

       为了尽可能兼容Percona的补丁,能引用的都引用了Percona的操作,有些函数调用是在层次太多看不下去了就简化了。

       另外还有一个版本是我自己弄的,在THD中增加了一个last_sql_end_time,在do_command结束后更新last_sql_end_time,然后在事务中拿到THD查看last_sql_end_time就可以得出idle时间,Oracle版我还是建议这么做,不要去改trx_struct结构体了,那个感觉更危险。

更多内容请点击【焦点】专栏