mirror of
https://github.com/bitwarden/server
synced 2025-12-10 13:23:27 +00:00
Add default values for MaxConcurrentCalls and PrefetchCount across event and integration ASB listeners (#6403)
* Add default values for MaxConcurrentCalls and PrefetchCount across all event and integration ASB listeners * Fix test failure
This commit is contained in:
@@ -5,4 +5,6 @@ public interface IEventListenerConfiguration
|
|||||||
public string EventQueueName { get; }
|
public string EventQueueName { get; }
|
||||||
public string EventSubscriptionName { get; }
|
public string EventSubscriptionName { get; }
|
||||||
public string EventTopicName { get; }
|
public string EventTopicName { get; }
|
||||||
|
public int EventPrefetchCount { get; }
|
||||||
|
public int EventMaxConcurrentCalls { get; }
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,6 +10,8 @@ public interface IIntegrationListenerConfiguration : IEventListenerConfiguration
|
|||||||
public string IntegrationSubscriptionName { get; }
|
public string IntegrationSubscriptionName { get; }
|
||||||
public string IntegrationTopicName { get; }
|
public string IntegrationTopicName { get; }
|
||||||
public int MaxRetries { get; }
|
public int MaxRetries { get; }
|
||||||
|
public int IntegrationPrefetchCount { get; }
|
||||||
|
public int IntegrationMaxConcurrentCalls { get; }
|
||||||
|
|
||||||
public string RoutingKey
|
public string RoutingKey
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -25,4 +25,24 @@ public abstract class ListenerConfiguration
|
|||||||
{
|
{
|
||||||
get => _globalSettings.EventLogging.AzureServiceBus.IntegrationTopicName;
|
get => _globalSettings.EventLogging.AzureServiceBus.IntegrationTopicName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int EventPrefetchCount
|
||||||
|
{
|
||||||
|
get => _globalSettings.EventLogging.AzureServiceBus.DefaultPrefetchCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int EventMaxConcurrentCalls
|
||||||
|
{
|
||||||
|
get => _globalSettings.EventLogging.AzureServiceBus.DefaultMaxConcurrentCalls;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int IntegrationPrefetchCount
|
||||||
|
{
|
||||||
|
get => _globalSettings.EventLogging.AzureServiceBus.DefaultPrefetchCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int IntegrationMaxConcurrentCalls
|
||||||
|
{
|
||||||
|
get => _globalSettings.EventLogging.AzureServiceBus.DefaultMaxConcurrentCalls;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,13 +14,14 @@ public class AzureServiceBusEventListenerService<TConfiguration> : EventLoggingL
|
|||||||
TConfiguration configuration,
|
TConfiguration configuration,
|
||||||
IEventMessageHandler handler,
|
IEventMessageHandler handler,
|
||||||
IAzureServiceBusService serviceBusService,
|
IAzureServiceBusService serviceBusService,
|
||||||
|
ServiceBusProcessorOptions serviceBusOptions,
|
||||||
ILoggerFactory loggerFactory)
|
ILoggerFactory loggerFactory)
|
||||||
: base(handler, CreateLogger(loggerFactory, configuration))
|
: base(handler, CreateLogger(loggerFactory, configuration))
|
||||||
{
|
{
|
||||||
_processor = serviceBusService.CreateProcessor(
|
_processor = serviceBusService.CreateProcessor(
|
||||||
topicName: configuration.EventTopicName,
|
topicName: configuration.EventTopicName,
|
||||||
subscriptionName: configuration.EventSubscriptionName,
|
subscriptionName: configuration.EventSubscriptionName,
|
||||||
new ServiceBusProcessorOptions());
|
options: serviceBusOptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
|
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ public class AzureServiceBusIntegrationListenerService<TConfiguration> : Backgro
|
|||||||
TConfiguration configuration,
|
TConfiguration configuration,
|
||||||
IIntegrationHandler handler,
|
IIntegrationHandler handler,
|
||||||
IAzureServiceBusService serviceBusService,
|
IAzureServiceBusService serviceBusService,
|
||||||
|
ServiceBusProcessorOptions serviceBusOptions,
|
||||||
ILoggerFactory loggerFactory)
|
ILoggerFactory loggerFactory)
|
||||||
{
|
{
|
||||||
_handler = handler;
|
_handler = handler;
|
||||||
@@ -29,7 +30,7 @@ public class AzureServiceBusIntegrationListenerService<TConfiguration> : Backgro
|
|||||||
_processor = _serviceBusService.CreateProcessor(
|
_processor = _serviceBusService.CreateProcessor(
|
||||||
topicName: configuration.IntegrationTopicName,
|
topicName: configuration.IntegrationTopicName,
|
||||||
subscriptionName: configuration.IntegrationSubscriptionName,
|
subscriptionName: configuration.IntegrationSubscriptionName,
|
||||||
options: new ServiceBusProcessorOptions());
|
options: serviceBusOptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
|
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
|
||||||
|
|||||||
@@ -301,6 +301,9 @@ public class GlobalSettings : IGlobalSettings
|
|||||||
private string _eventTopicName;
|
private string _eventTopicName;
|
||||||
private string _integrationTopicName;
|
private string _integrationTopicName;
|
||||||
|
|
||||||
|
public virtual int DefaultMaxConcurrentCalls { get; set; } = 1;
|
||||||
|
public virtual int DefaultPrefetchCount { get; set; } = 0;
|
||||||
|
|
||||||
public virtual string EventRepositorySubscriptionName { get; set; } = "events-write-subscription";
|
public virtual string EventRepositorySubscriptionName { get; set; } = "events-write-subscription";
|
||||||
public virtual string SlackEventSubscriptionName { get; set; } = "events-slack-subscription";
|
public virtual string SlackEventSubscriptionName { get; set; } = "events-slack-subscription";
|
||||||
public virtual string SlackIntegrationSubscriptionName { get; set; } = "integration-slack-subscription";
|
public virtual string SlackIntegrationSubscriptionName { get; set; } = "integration-slack-subscription";
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ using System.Reflection;
|
|||||||
using System.Security.Claims;
|
using System.Security.Claims;
|
||||||
using System.Security.Cryptography.X509Certificates;
|
using System.Security.Cryptography.X509Certificates;
|
||||||
using AspNetCoreRateLimit;
|
using AspNetCoreRateLimit;
|
||||||
|
using Azure.Messaging.ServiceBus;
|
||||||
using Bit.Core.AdminConsole.AbilitiesCache;
|
using Bit.Core.AdminConsole.AbilitiesCache;
|
||||||
using Bit.Core.AdminConsole.Models.Business.Tokenables;
|
using Bit.Core.AdminConsole.Models.Business.Tokenables;
|
||||||
using Bit.Core.AdminConsole.Models.Data.EventIntegrations;
|
using Bit.Core.AdminConsole.Models.Data.EventIntegrations;
|
||||||
@@ -855,6 +856,11 @@ public static class ServiceCollectionExtensions
|
|||||||
configuration: listenerConfiguration,
|
configuration: listenerConfiguration,
|
||||||
handler: provider.GetRequiredKeyedService<IEventMessageHandler>(serviceKey: listenerConfiguration.RoutingKey),
|
handler: provider.GetRequiredKeyedService<IEventMessageHandler>(serviceKey: listenerConfiguration.RoutingKey),
|
||||||
serviceBusService: provider.GetRequiredService<IAzureServiceBusService>(),
|
serviceBusService: provider.GetRequiredService<IAzureServiceBusService>(),
|
||||||
|
serviceBusOptions: new ServiceBusProcessorOptions()
|
||||||
|
{
|
||||||
|
PrefetchCount = listenerConfiguration.EventPrefetchCount,
|
||||||
|
MaxConcurrentCalls = listenerConfiguration.EventMaxConcurrentCalls
|
||||||
|
},
|
||||||
loggerFactory: provider.GetRequiredService<ILoggerFactory>()
|
loggerFactory: provider.GetRequiredService<ILoggerFactory>()
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
@@ -865,6 +871,11 @@ public static class ServiceCollectionExtensions
|
|||||||
configuration: listenerConfiguration,
|
configuration: listenerConfiguration,
|
||||||
handler: provider.GetRequiredService<IIntegrationHandler<TConfig>>(),
|
handler: provider.GetRequiredService<IIntegrationHandler<TConfig>>(),
|
||||||
serviceBusService: provider.GetRequiredService<IAzureServiceBusService>(),
|
serviceBusService: provider.GetRequiredService<IAzureServiceBusService>(),
|
||||||
|
serviceBusOptions: new ServiceBusProcessorOptions()
|
||||||
|
{
|
||||||
|
PrefetchCount = listenerConfiguration.IntegrationPrefetchCount,
|
||||||
|
MaxConcurrentCalls = listenerConfiguration.IntegrationMaxConcurrentCalls
|
||||||
|
},
|
||||||
loggerFactory: provider.GetRequiredService<ILoggerFactory>()
|
loggerFactory: provider.GetRequiredService<ILoggerFactory>()
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
@@ -927,6 +938,11 @@ public static class ServiceCollectionExtensions
|
|||||||
configuration: repositoryConfiguration,
|
configuration: repositoryConfiguration,
|
||||||
handler: provider.GetRequiredService<AzureTableStorageEventHandler>(),
|
handler: provider.GetRequiredService<AzureTableStorageEventHandler>(),
|
||||||
serviceBusService: provider.GetRequiredService<IAzureServiceBusService>(),
|
serviceBusService: provider.GetRequiredService<IAzureServiceBusService>(),
|
||||||
|
serviceBusOptions: new ServiceBusProcessorOptions()
|
||||||
|
{
|
||||||
|
PrefetchCount = repositoryConfiguration.EventPrefetchCount,
|
||||||
|
MaxConcurrentCalls = repositoryConfiguration.EventMaxConcurrentCalls
|
||||||
|
},
|
||||||
loggerFactory: provider.GetRequiredService<ILoggerFactory>()
|
loggerFactory: provider.GetRequiredService<ILoggerFactory>()
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -13,4 +13,8 @@ public class TestListenerConfiguration : IIntegrationListenerConfiguration
|
|||||||
public string IntegrationSubscriptionName => "integration_subscription";
|
public string IntegrationSubscriptionName => "integration_subscription";
|
||||||
public string IntegrationTopicName => "integration_topic";
|
public string IntegrationTopicName => "integration_topic";
|
||||||
public int MaxRetries => 3;
|
public int MaxRetries => 3;
|
||||||
|
public int EventMaxConcurrentCalls => 1;
|
||||||
|
public int EventPrefetchCount => 0;
|
||||||
|
public int IntegrationMaxConcurrentCalls => 1;
|
||||||
|
public int IntegrationPrefetchCount => 0;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user