日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長(zhǎng)提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

作者:京東零售 張賓

1.背景

在后臺(tái)開(kāi)發(fā)中,會(huì)經(jīng)常用到線程池技術(shù),對(duì)于線程池核心參數(shù)的配置很大程度上依靠經(jīng)驗(yàn)。然而,由于系統(tǒng)運(yùn)行過(guò)程中存在的不確定性,我們很難一勞永逸地規(guī)劃一個(gè)合理的線程池參數(shù)。在對(duì)線程池配置參數(shù)進(jìn)行調(diào)整時(shí),一般需要對(duì)服務(wù)進(jìn)行重啟,這樣修改的成本就會(huì)偏高。一種解決辦法就是,將線程池的配置放到配置平臺(tái)側(cè),系統(tǒng)運(yùn)行期間開(kāi)發(fā)人員根據(jù)系統(tǒng)運(yùn)行情況對(duì)核心參數(shù)進(jìn)行動(dòng)態(tài)配置。

本文以公司DUCC配置平臺(tái)作為服務(wù)配置中心,以修改線程池核心線程數(shù)、最大線程數(shù)為例,實(shí)現(xiàn)一個(gè)簡(jiǎn)單的動(dòng)態(tài)化線程池。

2.代碼實(shí)現(xiàn)

當(dāng)前項(xiàng)目中使用的是Spring 框架提供的線程池類ThreadPoolTaskExecutor,而ThreadPoolTaskExecutor底層又使用里了JDK中線程池類ThreadPoolExecutor,線程池類ThreadPoolExecutor有兩個(gè)成員方法setCorePoolSize、setMaximumPoolSize可以在運(yùn)行時(shí)設(shè)置核心線程數(shù)和最大線程數(shù)。

setCorePoolSize方法執(zhí)行流程是:首先會(huì)覆蓋之前構(gòu)造函數(shù)設(shè)置的corePoolSize,然后,如果新的值比原始值要小,當(dāng)多余的工作線程下次變成空閑狀態(tài)的時(shí)候會(huì)被中斷并銷毀,如果新的值比原來(lái)的值要大且工作隊(duì)列不為空,則會(huì)創(chuàng)建新的工作線程。流程圖如下:

 

setMaximumPoolSize方法:首先會(huì)覆蓋之前構(gòu)造函數(shù)設(shè)置的maximumPoolSize,然后,如果新的值比原來(lái)的值要小,當(dāng)多余的工作線程下次變成空閑狀態(tài)的時(shí)候會(huì)被中斷并銷毀。

Spring 框架提供的線程池類ThreadPoolTaskExecutor,此類封裝了對(duì)ThreadPoolExecutor有兩個(gè)成員方法setCorePoolSize、setMaximumPoolSize的調(diào)用。

 

基于以上源代碼分析,要實(shí)現(xiàn)一個(gè)簡(jiǎn)單的動(dòng)態(tài)線程池需要以下幾步:

(1)定義一個(gè)動(dòng)態(tài)線程池類,繼承ThreadPoolTaskExecutor,目的跟非動(dòng)態(tài)配置的線程池類ThreadPoolTaskExecutor區(qū)分開(kāi);

(2)定義和實(shí)現(xiàn)一個(gè)動(dòng)態(tài)線程池配置定時(shí)刷的類,目的定時(shí)對(duì)比ducc配置的線程池?cái)?shù)和本地應(yīng)用中線程數(shù)是否一致,若不一致,則更新本地動(dòng)態(tài)線程池線程池?cái)?shù);

(3)引入公司ducc配置平臺(tái)相關(guān)jar包并創(chuàng)建一個(gè)動(dòng)態(tài)線程池配置key;

(4)定義和實(shí)現(xiàn)一個(gè)應(yīng)用啟動(dòng)后根據(jù)動(dòng)態(tài)線程池Bean和從ducc配置平臺(tái)拉取配置刷新應(yīng)用中的線程數(shù)配置;

接下來(lái)代碼一一實(shí)現(xiàn):

(1)動(dòng)態(tài)線程池類

/**
 * 動(dòng)態(tài)線程池
 *
 */
public class DynamicThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
}

(2)動(dòng)態(tài)線程池配置定時(shí)刷新類

@Slf4j
public class DynamicThreadPoolRefresh implements InitializingBean {
    /**
     * Maintain all automatically registered and manually registered DynamicThreadPoolTaskExecutor.
     */
    private static final ConcurrentMap<String, DynamicThreadPoolTaskExecutor> DTP_REGISTRY = new ConcurrentHashMap<>();

    /**
     * @param threadPoolBeanName
     * @param threadPoolTaskExecutor
     */
    public static void registerDynamicThreadPool(String threadPoolBeanName, DynamicThreadPoolTaskExecutor threadPoolTaskExecutor) {
        log.info("DynamicThreadPool register ThreadPoolTaskExecutor, threadPoolBeanName: {}, executor: {}", threadPoolBeanName, ExecutorConverter.convert(threadPoolBeanName, threadPoolTaskExecutor.getThreadPoolExecutor()));
        DTP_REGISTRY.putIfAbsent(threadPoolBeanName, threadPoolTaskExecutor);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        this.refresh();
        //創(chuàng)建定時(shí)任務(wù)線程池
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, (new BasicThreadFactory.Builder()).namingPattern("DynamicThreadPoolRefresh-%d").daemon(true).build());
        //延遲1秒執(zhí)行,每個(gè)1分鐘check一次
        executorService.scheduleAtFixedRate(new RefreshThreadPoolConfig(), 1000L, 60000L, TimeUnit.MILLISECONDS);
    }

    private void refresh() {
        String dynamicThreadPool = "";
        try {
            if (DTP_REGISTRY.isEmpty()) {
                log.debug("DynamicThreadPool refresh DTP_REGISTRY is empty");
                return;
            }
            dynamicThreadPool = DuccConfigUtil.getValue(DuccConfigConstants.DYNAMIC_THREAD_POOL);
            if (StringUtils.isBlank(dynamicThreadPool)) {
                log.debug("DynamicThreadPool refresh dynamicThreadPool not config");
                return;
            }
            log.debug("DynamicThreadPool refresh dynamicThreadPool:{}", dynamicThreadPool);
            List<ThreadPoolProperties> threadPoolPropertiesList = JsonUtil.json2Object(dynamicThreadPool, new TypeReference<List<ThreadPoolProperties>>() {
            });
            if (CollectionUtils.isEmpty(threadPoolPropertiesList)) {
                log.error("DynamicThreadPool refresh dynamicThreadPool json2Object error!{}", dynamicThreadPool);
                return;
            }
            for (ThreadPoolProperties properties : threadPoolPropertiesList) {
                doRefresh(properties);
            }
        } catch (Exception e) {
            log.error("DynamicThreadPool refresh exception!dynamicThreadPool:{}", dynamicThreadPool, e);
        }
    }

    /**
     * @param properties
     */
    private void doRefresh(ThreadPoolProperties properties) {
        if (StringUtils.isBlank(properties.getThreadPoolBeanName())
                || properties.getCorePoolSize() < 1
                || properties.getMaxPoolSize() < 1
                || properties.getMaxPoolSize() < properties.getCorePoolSize()) {
            log.error("DynamicThreadPool refresh, invalid parameters exist, properties: {}", properties);
            return;
        }
        DynamicThreadPoolTaskExecutor threadPoolTaskExecutor = DTP_REGISTRY.get(properties.getThreadPoolBeanName());
        if (Objects.isNull(threadPoolTaskExecutor)) {
            log.warn("DynamicThreadPool refresh, DTP_REGISTRY not found {}", properties.getThreadPoolBeanName());
            return;
        }
        ThreadPoolProperties oldProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
        if (Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())
                && Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
            log.warn("DynamicThreadPool refresh, properties of [{}] have not changed.", properties.getThreadPoolBeanName());
            return;
        }
        if (!Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())) {
            threadPoolTaskExecutor.setCorePoolSize(properties.getCorePoolSize());
            log.info("DynamicThreadPool refresh, corePoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getCorePoolSize());
        }
        if (!Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
            threadPoolTaskExecutor.setMaxPoolSize(properties.getMaxPoolSize());
            log.info("DynamicThreadPool refresh, maxPoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getMaxPoolSize());
        }
       
        ThreadPoolProperties newProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
        log.info("DynamicThreadPool refresh result!{} oldProp:{},newProp:{}", properties.getThreadPoolBeanName(), oldProp, newProp);
    }

    private class RefreshThreadPoolConfig extends TimerTask {
        private RefreshThreadPoolConfig() {
        }

        @Override
        public void run() {
            DynamicThreadPoolRefresh.this.refresh();
        }
    }

}

線程池配置類

@Data
public class ThreadPoolProperties {
    /**
     * 線程池名稱
     */
    private String threadPoolBeanName;
    /**
     * 線程池核心線程數(shù)量
     */
    private int corePoolSize;
    /**
     * 線程池最大線程池?cái)?shù)量
     */
    private int maxPoolSize;
}

(3)引入公司ducc配置平臺(tái)相關(guān)jar包并創(chuàng)建一個(gè)動(dòng)態(tài)線程池配置key

配置value:

[
  {
    "threadPoolBeanName": "submitOrderThreadPoolTaskExecutor",
    "corePoolSize": 32,
    "maxPoolSize": 128
  }
]

(4) 應(yīng)用啟動(dòng)刷新應(yīng)用本地動(dòng)態(tài)線程池配置

@Slf4j
public class DynamicThreadPoolPostProcessor implements BeanPostProcessor {

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof DynamicThreadPoolTaskExecutor) {
            DynamicThreadPoolRefresh.registerDynamicThreadPool(beanName, (DynamicThreadPoolTaskExecutor) bean);
        }
        return bean;
    }
}

3.動(dòng)態(tài)線程池應(yīng)用

動(dòng)態(tài)線程池Bean聲明

    <!-- 普通線程池 -->
    <bean id="threadPoolTaskExecutor" class="com.jd.concurrent.ThreadPoolTaskExecutorWrApper">
        <!-- 核心線程數(shù),默認(rèn)為 -->
        <property name="corePoolSize" value="128"/>
        <!-- 最大線程數(shù),默認(rèn)為Integer.MAX_VALUE -->
        <property name="maxPoolSize" value="512"/>
        <!-- 隊(duì)列最大長(zhǎng)度,一般需要設(shè)置值>=notifyScheduledMainExecutor.maxNum;默認(rèn)為Integer.MAX_VALUE -->
        <property name="queueCapacity" value="500"/>
        <!-- 線程池維護(hù)線程所允許的空閑時(shí)間,默認(rèn)為60s -->
        <property name="keepAliveSeconds" value="60"/>
        <!-- 線程池對(duì)拒絕任務(wù)(無(wú)線程可用)的處理策略,目前只支持AbortPolicy、CallerRunsPolicy;默認(rèn)為后者 -->
        <property name="rejectedExecutionHandler">
            <!-- AbortPolicy:直接拋出JAVA.util.concurrent.RejectedExecutionException異常 -->
            <!-- CallerRunsPolicy:主線程直接執(zhí)行該任務(wù),執(zhí)行完之后嘗試添加下一個(gè)任務(wù)到線程池中,可以有效降低向線程池內(nèi)添加任務(wù)的速度 -->
            <!-- DiscardOldestPolicy:拋棄舊的任務(wù)、暫不支持;會(huì)導(dǎo)致被丟棄的任務(wù)無(wú)法再次被執(zhí)行 -->
            <!-- DiscardPolicy:拋棄當(dāng)前任務(wù)、暫不支持;會(huì)導(dǎo)致被丟棄的任務(wù)無(wú)法再次被執(zhí)行 -->
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
        </property>
    </bean>
    <!-- 動(dòng)態(tài)線程池 -->
    <bean id="submitOrderThreadPoolTaskExecutor" class="com.jd.concurrent.DynamicThreadPoolTaskExecutor">
        <!-- 核心線程數(shù),默認(rèn)為 -->
        <property name="corePoolSize" value="32"/>
        <!-- 最大線程數(shù),默認(rèn)為Integer.MAX_VALUE -->
        <property name="maxPoolSize" value="128"/>
        <!-- 隊(duì)列最大長(zhǎng)度,一般需要設(shè)置值>=notifyScheduledMainExecutor.maxNum;默認(rèn)為Integer.MAX_VALUE -->
        <property name="queueCapacity" value="500"/>
        <!-- 線程池維護(hù)線程所允許的空閑時(shí)間,默認(rèn)為60s -->
        <property name="keepAliveSeconds" value="60"/>
        <!-- 線程池對(duì)拒絕任務(wù)(無(wú)線程可用)的處理策略,目前只支持AbortPolicy、CallerRunsPolicy;默認(rèn)為后者 -->
        <property name="rejectedExecutionHandler">
            <!-- AbortPolicy:直接拋出java.util.concurrent.RejectedExecutionException異常 -->
            <!-- CallerRunsPolicy:主線程直接執(zhí)行該任務(wù),執(zhí)行完之后嘗試添加下一個(gè)任務(wù)到線程池中,可以有效降低向線程池內(nèi)添加任務(wù)的速度 -->
            <!-- DiscardOldestPolicy:拋棄舊的任務(wù)、暫不支持;會(huì)導(dǎo)致被丟棄的任務(wù)無(wú)法再次被執(zhí)行 -->
            <!-- DiscardPolicy:拋棄當(dāng)前任務(wù)、暫不支持;會(huì)導(dǎo)致被丟棄的任務(wù)無(wú)法再次被執(zhí)行 -->
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
        </property>
    </bean>
    <!-- 動(dòng)態(tài)線程池刷新配置 -->
    <bean class="com.jd.concurrent.DynamicThreadPoolPostProcessor"/>
    <bean class="com.jd.concurrent.DynamicThreadPoolRefresh"/>

業(yè)務(wù)類注入Spring Bean后,直接使用即可

 @Resource
 private ThreadPoolTaskExecutor submitOrderThreadPoolTaskExecutor;

 
 Runnable asyncTask = ()->{...};
 CompletableFuture.runAsync(asyncTask, this.submitOrderThreadPoolTaskExecutor);

4.小結(jié)

本文從實(shí)際項(xiàng)目的業(yè)務(wù)痛點(diǎn)場(chǎng)景出發(fā),并基于公司已有的ducc配置平臺(tái)簡(jiǎn)單實(shí)現(xiàn)了線程池線程數(shù)量可配置。

分享到:
標(biāo)簽:線程
用戶無(wú)頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過(guò)答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫(kù),初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定