国产午夜男女在线|欧美日本一道高清国产|亚洲日韩乱码中文字幕|麻豆国产97在线精品一区|日韩一区2区三区另类图片|亚洲精品国产99在线观看|亚洲国产午夜福利精品大秀在线|一级做a爰片性色毛片免费网站

          1. <form id="n2a4a"><nav id="n2a4a"></nav></form>
          2. 您當(dāng)前的位置 :寧夏資訊網(wǎng) > 游戲 >  內(nèi)容正文
            投稿

            ASP.NET Core 3.x 并發(fā)限制

            寧夏資訊網(wǎng) 2020-03-30 17:07:28 來源: 閱讀:-

            前言

            Microsoft.AspNetCore.ConcurrencyLimiter AspNetCore3.0后增加的,用于傳入的請求進行排隊處理,避免線程池的不足.
            我們?nèi)粘i_發(fā)中可能常做的給某web服務(wù)器配置連接數(shù)以及,請求隊列大小,那么今天我們看看如何在通過中間件形式實現(xiàn)一個并發(fā)量以及隊列長度限制.

            Queue策略

            添加Nuget

            Install-Package Microsoft.AspNetCore.ConcurrencyLimiter

            Copy public void ConfigureServices(IServiceCollection services)
            {
            services.AddQueuePolicy(options =>
            {
            //最大并發(fā)請求數(shù)
            options.MaxConcurrentRequests = 2;
            //請求隊列長度限制
            options.RequestQueueLimit = 1;
            });
            services.AddControllers();
            }
            public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
            {
            //添加并發(fā)限制中間件
            app.UseConcurrencyLimiter();
            app.Run(async context =>
            {
            Task.Delay(100).Wait(); // 100ms sync-over-async

            await context.Response.WriteAsync("Hello World!");
            });
            if (env.IsDevelopment())
            {
            app.UseDeveloperExceptionPage();
            }

            app.UseHttpsRedirection();

            app.UseRouting();

            app.UseAuthorization();

            app.UseEndpoints(endpoints =>
            {
            endpoints.MapControllers();
            });
            }

            通過上面簡單的配置,我們就可以將他引入到我們的代碼中,從而做并發(fā)量限制,以及隊列的長度;那么問題來了,他是怎么實現(xiàn)的呢?

            Copy public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Action configure){ services.Configure(configure); services.AddSingleton();
            return services;
            }

            QueuePolicy采用的是SemaphoreSlim信號量設(shè)計,SemaphoreSlim、Semaphore(信號量)支持并發(fā)多線程進入被保護代碼,對象在初始化時會指定 最大任務(wù)數(shù)量,當(dāng)線程請求訪問資源,信號量遞減,而當(dāng)他們釋放時,信號量計數(shù)又遞增。

            Copy ///  /// 構(gòu)造方法(初始化Queue策略) ///  ///  public QueuePolicy(IOptions options) { _maxConcurrentRequests = options.Value.MaxConcurrentRequests; if (_maxConcurrentRequests <= 0) { throw new ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must be a positive integer."); } _requestQueueLimit = options.Value.RequestQueueLimit; if (_requestQueueLimit < 0)
            {
            throw new ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be a negative number.");
            }
            //使用SemaphoreSlim來限制任務(wù)最大個數(shù)
            _serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests);
            }

            ConcurrencyLimiterMiddleware中間件

            Copy ///  /// Invokes the logic of the middleware. ///  /// The . /// A  that completes when the request leaves.
            public async Task Invoke(HttpContext context)
            {
            var waitInQueueTask = _queuePolicy.TryEnterAsync();

            // Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.
            bool result;

            if (waitInQueueTask.IsCompleted)
            {
            ConcurrencyLimiterEventSource.Log.QueueSkipped();
            result = waitInQueueTask.Result;
            }
            else
            {
            using (ConcurrencyLimiterEventSource.Log.QueueTimer())
            {
            result = await waitInQueueTask;
            }
            }

            if (result)
            {
            try
            {
            await _next(context);
            }
            finally
            {
            _queuePolicy.OnExit();
            }
            }
            else
            {
            ConcurrencyLimiterEventSource.Log.RequestRejected();
            ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
            context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
            await _onRejected(context);
            }
            }

            每次當(dāng)我們請求的時候首先會調(diào)用_queuePolicy.TryEnterAsync(),進入該方法后先開啟一個私有l(wèi)ock鎖,再接著判斷總請求量是否≥(請求隊列限制的大小+最大并發(fā)請求數(shù)),如果當(dāng)前數(shù)量超出了,那么我直接拋出,送你個503狀態(tài);

            Copy if (result)
            {
            try
            {
            await _next(context);
            }
            finally
            {
            _queuePolicy.OnExit();
            }
            }
            else
            {
            ConcurrencyLimiterEventSource.Log.RequestRejected();
            ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
            context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
            await _onRejected(context);
            }

            問題來了,我這邊如果說還沒到你設(shè)置的大小呢,我這個請求沒有給你服務(wù)器造不成壓力,那么你給我處理一下吧.
            await _serverSemaphore.WaitAsync();異步等待進入信號量,如果沒有線程被授予對信號量的訪問權(quán)限,則進入執(zhí)行保護代碼;否則此線程將在此處等待,直到信號量被釋放為止

            Copy lock (_totalRequestsLock)
            {
            if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests)
            {
            return false;
            }
            TotalRequests++;
            }
            //異步等待進入信號量,如果沒有線程被授予對信號量的訪問權(quán)限,則進入執(zhí)行保護代碼;否則此線程將在此處等待,直到信號量被釋放為止
            await _serverSemaphore.WaitAsync();
            return true;
            }

            返回成功后那么中間件這邊再進行處理,_queuePolicy.OnExit();通過該調(diào)用進行調(diào)用_serverSemaphore.Release();釋放信號燈,再對總請求數(shù)遞減

            Stack策略

            再來看看另一種方法,棧策略,他是怎么做的呢?一起來看看.再附加上如何使用的代碼.

            Copy public void ConfigureServices(IServiceCollection services)
            {
            services.AddStackPolicy(options =>
            {
            //最大并發(fā)請求數(shù)
            options.MaxConcurrentRequests = 2;
            //請求隊列長度限制
            options.RequestQueueLimit = 1;
            });
            services.AddControllers();
            }

            通過上面的配置,我們便可以對我們的應(yīng)用程序執(zhí)行出相應(yīng)的策略.下面再來看看他是怎么實現(xiàn)的呢

            Copy public static IServiceCollection AddStackPolicy(this IServiceCollection services, Action configure) { services.Configure(configure); services.AddSingleton();
            return services;
            }

            可以看到這次是通過StackPolicy類做的策略.來一起來看看主要的方法

            Copy ///  /// 構(gòu)造方法(初始化參數(shù)) ///  ///  public StackPolicy(IOptions options) { //棧分配 _buffer = new List();
            //隊列大小
            _maxQueueCapacity = options.Value.RequestQueueLimit;
            //最大并發(fā)請求數(shù)
            _maxConcurrentRequests = options.Value.MaxConcurrentRequests;
            //剩余可用空間
            _freeServerSpots = options.Value.MaxConcurrentRequests;
            }

            當(dāng)我們通過中間件請求調(diào)用,_queuePolicy.TryEnterAsync()時,首先會判斷我們是否還有訪問請求次數(shù),如果_freeServerSpots>0,那么則直接給我們返回true,讓中間件直接去執(zhí)行下一步,如果當(dāng)前隊列=我們設(shè)置的隊列大小的話,那我們需要取消先前請求;每次取消都是先取消之前的保留后面的請求;

            Copy public ValueTask TryEnterAsync() { lock (_bufferLock) { if (_freeServerSpots > 0) { _freeServerSpots--; return _trueTask; } // 如果隊列滿了,取消先前的請求 if (_queueLength == _maxQueueCapacity) { _hasReachedCapacity = true; _buffer[_head].Complete(false); _queueLength--; } var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this); _cachedResettableTCS = null; if (_hasReachedCapacity || _queueLength < _buffer.Count)
            {
            _buffer[_head] = tcs;
            }
            else
            {
            _buffer.Add(tcs);
            }
            _queueLength++;
            // increment _head for next time
            _head++;
            if (_head == _maxQueueCapacity)
            {
            _head = 0;
            }
            return tcs.GetValueTask();
            }
            }

            當(dāng)我們請求后調(diào)用_queuePolicy.OnExit();出棧,再將請求長度遞減

            Copy public void OnExit()
            {
            lock (_bufferLock)
            {
            if (_queueLength == 0)
            {
            _freeServerSpots++;

            if (_freeServerSpots > _maxConcurrentRequests)
            {
            _freeServerSpots--;
            throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync");
            }

            return;
            }

            // step backwards and launch a new task
            if (_head == 0)
            {
            _head = _maxQueueCapacity - 1;
            }
            else
            {
            _head--;
            }
            //退出,出棧
            _buffer[_head].Complete(true);
            _queueLength--;
            }
            }

            總結(jié)

            基于棧結(jié)構(gòu)的特點,在實際應(yīng)用中,通常只會對棧執(zhí)行以下兩種操作:

            • 向棧中添加元素,此過程被稱為"進棧"(入?;驂簵#?;
            • 從棧中提取出指定元素,此過程被稱為"出棧"(或彈棧);

            隊列存儲結(jié)構(gòu)的實現(xiàn)有以下兩種方式:

            • 順序隊列:在順序表的基礎(chǔ)上實現(xiàn)的隊列結(jié)構(gòu);
            • 鏈隊列:在鏈表的基礎(chǔ)上實現(xiàn)的隊列結(jié)構(gòu);

            (正文已結(jié)束)

            推薦閱讀:網(wǎng)紅藍(lán)牙耳機

            免責(zé)聲明及提醒:此文內(nèi)容為本網(wǎng)所轉(zhuǎn)載企業(yè)宣傳資訊,該相關(guān)信息僅為宣傳及傳遞更多信息之目的,不代表本網(wǎng)站觀點,文章真實性請瀏覽者慎重核實!任何投資加盟均有風(fēng)險,提醒廣大民眾投資需謹(jǐn)慎!

            網(wǎng)站簡介 - 聯(lián)系我們 - 營銷服務(wù) - 老版地圖 - 版權(quán)聲明 - 網(wǎng)站地圖
            Copyright.2002-2019 寧夏資訊網(wǎng) 版權(quán)所有 本網(wǎng)拒絕一切非法行為 歡迎監(jiān)督舉報 如有錯誤信息 歡迎糾正
            布尔津县| 阿拉善右旗| 怀来县| 岳池县| 雅江县| 桐城市| 康马县| 元阳县| 华容县| 肥城市| 益阳市| 临猗县| 疏附县| 西充县| 双辽市| 洪泽县| 上犹县| 卢氏县| 游戏| 本溪| 平谷区| 高陵县| 和林格尔县| 石家庄市| 大兴区| 北流市| 鹤庆县| 佛学| 文登市| 巴南区| 泊头市| 乳山市| 邵阳县| 西充县| 中方县| 文山县| 长岛县| 龙山县| 广昌县| 佛山市| 元氏县|