Java手寫線程池之向JDK線程池進發(fā)_第1頁
Java手寫線程池之向JDK線程池進發(fā)_第2頁
Java手寫線程池之向JDK線程池進發(fā)_第3頁
Java手寫線程池之向JDK線程池進發(fā)_第4頁
Java手寫線程池之向JDK線程池進發(fā)_第5頁
已閱讀5頁,還剩4頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認(rèn)領(lǐng)

文檔簡介

第Java手寫線程池之向JDK線程池進發(fā)private

AtomicInteger

ct

=

new

AtomicInteger(0);

//

當(dāng)前在執(zhí)行任務(wù)的線程個數(shù)

private

int

corePoolSize;

private

int

maximumPoolSize;

private

long

keepAliveTime;

private

TimeUnit

unit;

private

BlockingQueueRunnable

taskQueue;

private

RejectPolicy

policy;

private

ArrayListWorker

workers

=

new

ArrayList();

private

volatile

boolean

isStopped;

private

boolean

useTimed;

public

int

getCt()

{

return

ct.get();

public

ThreadPool(int

corePoolSize,

int

maximumPoolSize,

TimeUnit

unit,

long

keepAliveTime,

RejectPolicy

policy

,

int

maxTasks)

{

//

please

add

-ea

to

vm

options

to

make

assert

keyword

enable

assert

corePoolSize

assert

maximumPoolSize

assert

keepAliveTime

assert

maxTasks

this.corePoolSize

=

corePoolSize;

this.maximumPoolSize

=

maximumPoolSize;

this.unit

=

unit;

this.policy

=

policy;

this.keepAliveTime

=

keepAliveTime;

taskQueue

=

new

ArrayBlockingQueueRunnable(maxTasks);

useTimed

=

keepAliveTime

!=

0;

/**

*

@param

runnable

需要被執(zhí)行的任務(wù)

*

@param

max

是否使用

maximumPoolSize

*

@return

boolean

*/

public

synchronized

boolean

addWorker(Runnable

runnable,

boolean

max)

{

if

(ct.get()

=

corePoolSize

!max)

return

false;

if

(ct.get()

=

maximumPoolSize

max)

return

false;

Worker

worker

=

new

Worker(runnable);

workers.add(worker);

Thread

thread

=

new

Thread(worker,

"ThreadPool-"

+

"Thread-"

+

ct.addAndGet(1));

thread.start();

return

true;

//

下面這個方法是向線程池提交任務(wù)

public

void

execute(Runnable

runnable)

throws

InterruptedException

{

checkPoolState();

if

(addWorker(runnable,

false)

//

如果能夠加入新的線程執(zhí)行任務(wù)

加入成功就直接返回

||

!taskQueue.offer(runnable)

//

如果

taskQueue.offer(runnable)

返回

false

說明提交任務(wù)失敗

任務(wù)隊列已經(jīng)滿了

||

addWorker(runnable,

true))

//

使用能夠使用的最大的線程數(shù)

(maximumPoolSize)

看是否能夠產(chǎn)生新的線程

return;

//

如果任務(wù)隊列滿了而且不能夠加入新的線程

則拒絕這個任務(wù)

if

(!taskQueue.offer(runnable))

reject(runnable);

private

void

reject(Runnable

runnable)

throws

InterruptedException

{

switch

(policy)

{

case

ABORT:

throw

new

RuntimeException("task

queue

is

full");

case

CALLER_RUN:

runnable.run();

case

DISCARD:

return;

case

DISCARD_OLDEST:

//

放棄等待時間最長的任務(wù)

taskQueue.poll();

execute(runnable);

}

private

void

checkPoolState()

{

if

(isStopped)

{

//

如果線程池已經(jīng)停下來了,就不在向任務(wù)隊列當(dāng)中提交任務(wù)了

throw

new

RuntimeException("thread

pool

has

been

stopped,

so

quit

submitting

task");

}

public

V

RunnableFutureV

submit(CallableV

task)

throws

InterruptedException

{

checkPoolState();

FutureTaskV

futureTask

=

new

FutureTask(task);

execute(futureTask);

return

futureTask;

//

強制關(guān)閉線程池

public

synchronized

void

stop()

{

isStopped

=

true;

for

(Worker

worker

:

workers)

{

worker.stopWorker();

}

public

synchronized

void

shutDown()

{

//

先表示關(guān)閉線程池

線程就不能再向線程池提交任務(wù)

isStopped

=

true;

//

先等待所有的任務(wù)執(zhí)行完成再關(guān)閉線程池

waitForAllTasks();

stop();

private

void

waitForAllTasks()

{

//

當(dāng)線程池當(dāng)中還有任務(wù)的時候

就不退出循環(huán)

while

(taskQueue.size()

0)

{

Thread.yield();

try

{

Thread.sleep(1000);

}

catch

(InterruptedException

e)

{

e.printStackTrace();

}

}

class

Worker

implements

Runnable

{

private

Thread

thisThread;

private

final

Runnable

firstTask;

private

volatile

boolean

isStopped;

public

Worker(Runnable

firstTask)

{

this.firstTask

=

firstTask;

}

@Override

public

void

run()

{

//

先執(zhí)行傳遞過來的第一個任務(wù)

這里是一個小的優(yōu)化

讓線程直接執(zhí)行第一個任務(wù)

不需要

//

放入任務(wù)隊列再取出來執(zhí)行了

firstTask.run();

thisThread

=

Thread.currentThread();

while

(!isStopped)

{

try

{

Runnable

task

=

useTimed

taskQueue.poll(keepAliveTime,

unit)

:

taskQueue.take();

if

(task

==

null)

{

int

i;

boolean

exit

=

true;

if

(ct.get()

corePoolSize)

{

do{

i

=

ct.get();

if

(i

=

corePoolSize)

{

exit

=

false;

break;

}

}while

(!pareAndSet(i,

i

-

1));

if

(exit)

{

return;

}

}

}else

{

task.run();

}

}

catch

(InterruptedException

e)

{

//

do

nothing

}

}

}

public

synchronized

void

stopWorker()

{

if

(isStopped)

{

throw

new

RuntimeException("thread

has

been

interrupted");

}

isStopped

=

true;

thisTerrupt();

}

線程池測試

package

cscore.concurrent.java.threadpoolv2;

import

java.util.concurrent.ExecutionException;

import

java.util.concurrent.RunnableFuture;

import

java.util.concurrent.TimeUnit;

public

class

Test

{

public

static

void

main(String[]

args)

throws

InterruptedException,

ExecutionException

{

var

pool

=

new

ThreadPool(2,

5,

TimeUnit.SECONDS,

10,

RejectPolicy.ABORT,

100000);

for

(int

i

=

0;

i

i++)

{

RunnableFutureInteger

submit

=

pool.submit(()

-

{

System.out.println(Thread.currentThread().getName()

+

"

output

a");

try

{

Thread.sleep(10);

}

catch

(InterruptedException

e)

{

e.printStackTrace();

}

return

0;

});

System.out.println(submit.get());

}

int

n

=

15;

while

(n--

0)

{

System.out.println("Number

Threads

=

"

+

pool.getCt());

Thread.sleep(1000);

}

pool.shutDown();

上面測試代碼的輸出結(jié)果如下所示:

ThreadPool-Thread-2outputa

ThreadPool-Thread-1outputa

ThreadPool-Thread-3outputa

ThreadPool-Thread-4outputa

NumberThreads=5

ThreadPool-Thread-5outputa

ThreadPool-Thread-2outputa

ThreadPool-Thread-1outputa

ThreadPool-Thread-3outputa

ThreadPool-Thread-4outputa

ThreadPool-Thread-5outputa

ThreadPool-Thread-2outputa

ThreadPool-Thread-1outputa

ThreadPool-Thread-4outputa

Thr

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論