1
0
mirror of https://github.com/bitwarden/server synced 2026-01-13 22:13:43 +00:00

[PM-17562] Add support for retries on event integrations (#5795)

* [PM-17562] Add support for retires on event integrations

* Add additional test coverage

* Fixed missing await call

* Remove debug organization id

* Respond to PR feedback

* Change NotBeforeUtc to DelayUntilDate. Adjust comments.

* Respond to PR feedback
This commit is contained in:
Brant DeBow
2025-05-27 08:28:50 -04:00
committed by GitHub
parent c989abdb82
commit f3e637cf2d
40 changed files with 1277 additions and 216 deletions

View File

@@ -2,10 +2,10 @@
using Bit.Api.AdminConsole.Models.Response.Organizations;
using Bit.Core;
using Bit.Core.AdminConsole.Entities;
using Bit.Core.AdminConsole.Models.Data.Integrations;
using Bit.Core.Context;
using Bit.Core.Enums;
using Bit.Core.Exceptions;
using Bit.Core.Models.Data.Integrations;
using Bit.Core.Repositories;
using Bit.Core.Services;
using Bit.Core.Utilities;

View File

@@ -1,8 +1,8 @@
using System.ComponentModel.DataAnnotations;
using System.Text.Json;
using Bit.Core.AdminConsole.Entities;
using Bit.Core.AdminConsole.Models.Data.Integrations;
using Bit.Core.Enums;
using Bit.Core.Models.Data.Integrations;
#nullable enable

View File

@@ -28,10 +28,8 @@ using Bit.Core.Tools.Entities;
using Bit.Core.Vault.Entities;
using Bit.Api.Auth.Models.Request.WebAuthn;
using Bit.Api.Billing;
using Bit.Core.AdminConsole.Services.NoopImplementations;
using Bit.Core.Auth.Models.Data;
using Bit.Core.Auth.Identity.TokenProviders;
using Bit.Core.Services;
using Bit.Core.Tools.ImportFeatures;
using Bit.Core.Tools.ReportFeatures;
using Bit.Core.Auth.Models.Api.Request;
@@ -224,18 +222,8 @@ public class Startup
services.AddHostedService<Core.HostedServices.ApplicationCacheHostedService>();
}
// Slack
if (CoreHelpers.SettingHasValue(globalSettings.Slack.ClientId) &&
CoreHelpers.SettingHasValue(globalSettings.Slack.ClientSecret) &&
CoreHelpers.SettingHasValue(globalSettings.Slack.Scopes))
{
services.AddHttpClient(SlackService.HttpClientName);
services.AddSingleton<ISlackService, SlackService>();
}
else
{
services.AddSingleton<ISlackService, NoopSlackService>();
}
// Add SlackService for OAuth API requests - if configured
services.AddSlackService(globalSettings);
}
public void Configure(

View File

@@ -7,3 +7,19 @@ public enum IntegrationType : int
Slack = 3,
Webhook = 4,
}
public static class IntegrationTypeExtensions
{
public static string ToRoutingKey(this IntegrationType type)
{
switch (type)
{
case IntegrationType.Slack:
return "slack";
case IntegrationType.Webhook:
return "webhook";
default:
throw new ArgumentOutOfRangeException(nameof(type), $"Unsupported integration type: {type}");
}
}
}

View File

@@ -0,0 +1,12 @@
using Bit.Core.Enums;
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
public interface IIntegrationMessage
{
IntegrationType IntegrationType { get; }
int RetryCount { get; set; }
DateTime? DelayUntilDate { get; set; }
void ApplyRetry(DateTime? handlerDelayUntilDate);
string ToJson();
}

View File

@@ -0,0 +1,16 @@
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
public class IntegrationHandlerResult
{
public IntegrationHandlerResult(bool success, IIntegrationMessage message)
{
Success = success;
Message = message;
}
public bool Success { get; set; } = false;
public bool Retryable { get; set; } = false;
public IIntegrationMessage Message { get; set; }
public DateTime? DelayUntilDate { get; set; }
public string FailureReason { get; set; } = string.Empty;
}

View File

@@ -0,0 +1,34 @@
using System.Text.Json;
using Bit.Core.Enums;
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
public class IntegrationMessage<T> : IIntegrationMessage
{
public IntegrationType IntegrationType { get; set; }
public T Configuration { get; set; }
public string RenderedTemplate { get; set; }
public int RetryCount { get; set; } = 0;
public DateTime? DelayUntilDate { get; set; }
public void ApplyRetry(DateTime? handlerDelayUntilDate)
{
RetryCount++;
var baseTime = handlerDelayUntilDate ?? DateTime.UtcNow;
var backoffSeconds = Math.Pow(2, RetryCount);
var jitterSeconds = Random.Shared.Next(0, 3);
DelayUntilDate = baseTime.AddSeconds(backoffSeconds + jitterSeconds);
}
public string ToJson()
{
return JsonSerializer.Serialize(this);
}
public static IntegrationMessage<T> FromJson(string json)
{
return JsonSerializer.Deserialize<IntegrationMessage<T>>(json);
}
}

View File

@@ -1,10 +1,11 @@
using Bit.Core.AdminConsole.Entities;
#nullable enable
using Bit.Core.AdminConsole.Entities;
using Bit.Core.Entities;
using Bit.Core.Enums;
using Bit.Core.Models.Data;
#nullable enable
namespace Bit.Core.Models.Data.Integrations;
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
public class IntegrationTemplateContext(EventMessage eventMessage)
{

View File

@@ -1,3 +1,3 @@
namespace Bit.Core.Models.Data.Integrations;
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
public record SlackIntegration(string token);

View File

@@ -1,3 +1,3 @@
namespace Bit.Core.Models.Data.Integrations;
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
public record SlackIntegrationConfiguration(string channelId);

View File

@@ -1,3 +1,3 @@
namespace Bit.Core.Models.Data.Integrations;
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
public record SlackIntegrationConfigurationDetails(string channelId, string token);

View File

@@ -1,3 +1,3 @@
namespace Bit.Core.Models.Data.Integrations;
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
public record WebhookIntegrationConfiguration(string url);

View File

@@ -0,0 +1,3 @@
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
public record WebhookIntegrationConfigurationDetails(string url);

View File

@@ -1,3 +0,0 @@
namespace Bit.Core.Models.Data.Integrations;
public record WebhookIntegrationConfigurationDetils(string url);

View File

@@ -0,0 +1,24 @@
using Bit.Core.AdminConsole.Models.Data.Integrations;
namespace Bit.Core.Services;
public interface IIntegrationHandler
{
Task<IntegrationHandlerResult> HandleAsync(string json);
}
public interface IIntegrationHandler<T> : IIntegrationHandler
{
Task<IntegrationHandlerResult> HandleAsync(IntegrationMessage<T> message);
}
public abstract class IntegrationHandlerBase<T> : IIntegrationHandler<T>
{
public async Task<IntegrationHandlerResult> HandleAsync(string json)
{
var message = IntegrationMessage<T>.FromJson(json);
return await HandleAsync(message);
}
public abstract Task<IntegrationHandlerResult> HandleAsync(IntegrationMessage<T> message);
}

View File

@@ -0,0 +1,8 @@
using Bit.Core.AdminConsole.Models.Data.Integrations;
namespace Bit.Core.Services;
public interface IIntegrationPublisher
{
Task PublishAsync(IIntegrationMessage message);
}

View File

@@ -0,0 +1,83 @@
using System.Text.Json;
using Bit.Core.AdminConsole.Models.Data.Integrations;
using Bit.Core.AdminConsole.Utilities;
using Bit.Core.Enums;
using Bit.Core.Models.Data;
using Bit.Core.Repositories;
namespace Bit.Core.Services;
#nullable enable
public class EventIntegrationHandler<T>(
IntegrationType integrationType,
IIntegrationPublisher integrationPublisher,
IOrganizationIntegrationConfigurationRepository configurationRepository,
IUserRepository userRepository,
IOrganizationRepository organizationRepository)
: IEventMessageHandler
{
public async Task HandleEventAsync(EventMessage eventMessage)
{
if (eventMessage.OrganizationId is not Guid organizationId)
{
return;
}
var configurations = await configurationRepository.GetConfigurationDetailsAsync(
organizationId,
integrationType,
eventMessage.Type);
foreach (var configuration in configurations)
{
var template = configuration.Template ?? string.Empty;
var context = await BuildContextAsync(eventMessage, template);
var renderedTemplate = IntegrationTemplateProcessor.ReplaceTokens(template, context);
var config = configuration.MergedConfiguration.Deserialize<T>()
?? throw new InvalidOperationException($"Failed to deserialize to {typeof(T).Name}");
var message = new IntegrationMessage<T>
{
IntegrationType = integrationType,
Configuration = config,
RenderedTemplate = renderedTemplate,
RetryCount = 0,
DelayUntilDate = null
};
await integrationPublisher.PublishAsync(message);
}
}
public async Task HandleManyEventsAsync(IEnumerable<EventMessage> eventMessages)
{
foreach (var eventMessage in eventMessages)
{
await HandleEventAsync(eventMessage);
}
}
private async Task<IntegrationTemplateContext> BuildContextAsync(EventMessage eventMessage, string template)
{
var context = new IntegrationTemplateContext(eventMessage);
if (IntegrationTemplateProcessor.TemplateRequiresUser(template) && eventMessage.UserId.HasValue)
{
context.User = await userRepository.GetByIdAsync(eventMessage.UserId.Value);
}
if (IntegrationTemplateProcessor.TemplateRequiresActingUser(template) && eventMessage.ActingUserId.HasValue)
{
context.ActingUser = await userRepository.GetByIdAsync(eventMessage.ActingUserId.Value);
}
if (IntegrationTemplateProcessor.TemplateRequiresOrganization(template) && eventMessage.OrganizationId.HasValue)
{
context.Organization = await organizationRepository.GetByIdAsync(eventMessage.OrganizationId.Value);
}
return context;
}
}

View File

@@ -1,8 +1,8 @@
using System.Text.Json.Nodes;
using Bit.Core.AdminConsole.Models.Data.Integrations;
using Bit.Core.AdminConsole.Utilities;
using Bit.Core.Enums;
using Bit.Core.Models.Data;
using Bit.Core.Models.Data.Integrations;
using Bit.Core.Repositories;
namespace Bit.Core.Services;

View File

@@ -29,7 +29,7 @@ public class RabbitMqEventListenerService : EventLoggingListenerService
UserName = globalSettings.EventLogging.RabbitMq.Username,
Password = globalSettings.EventLogging.RabbitMq.Password
};
_exchangeName = globalSettings.EventLogging.RabbitMq.ExchangeName;
_exchangeName = globalSettings.EventLogging.RabbitMq.EventExchangeName;
_logger = logger;
_queueName = queueName;
}

View File

@@ -18,7 +18,7 @@ public class RabbitMqEventWriteService : IEventWriteService, IAsyncDisposable
UserName = globalSettings.EventLogging.RabbitMq.Username,
Password = globalSettings.EventLogging.RabbitMq.Password
};
_exchangeName = globalSettings.EventLogging.RabbitMq.ExchangeName;
_exchangeName = globalSettings.EventLogging.RabbitMq.EventExchangeName;
_lazyConnection = new Lazy<Task<IConnection>>(CreateConnectionAsync);
}

View File

@@ -0,0 +1,191 @@
using System.Text;
using Bit.Core.Settings;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Bit.Core.Services;
public class RabbitMqIntegrationListenerService : BackgroundService
{
private const string _deadLetterRoutingKey = "dead-letter";
private IChannel _channel;
private IConnection _connection;
private readonly string _exchangeName;
private readonly string _queueName;
private readonly string _retryQueueName;
private readonly string _deadLetterQueueName;
private readonly string _routingKey;
private readonly string _retryRoutingKey;
private readonly int _maxRetries;
private readonly IIntegrationHandler _handler;
private readonly ConnectionFactory _factory;
private readonly ILogger<RabbitMqIntegrationListenerService> _logger;
private readonly int _retryTiming;
public RabbitMqIntegrationListenerService(IIntegrationHandler handler,
string routingKey,
string queueName,
string retryQueueName,
string deadLetterQueueName,
GlobalSettings globalSettings,
ILogger<RabbitMqIntegrationListenerService> logger)
{
_handler = handler;
_routingKey = routingKey;
_retryRoutingKey = $"{_routingKey}-retry";
_queueName = queueName;
_retryQueueName = retryQueueName;
_deadLetterQueueName = deadLetterQueueName;
_logger = logger;
_exchangeName = globalSettings.EventLogging.RabbitMq.IntegrationExchangeName;
_maxRetries = globalSettings.EventLogging.RabbitMq.MaxRetries;
_retryTiming = globalSettings.EventLogging.RabbitMq.RetryTiming;
_factory = new ConnectionFactory
{
HostName = globalSettings.EventLogging.RabbitMq.HostName,
UserName = globalSettings.EventLogging.RabbitMq.Username,
Password = globalSettings.EventLogging.RabbitMq.Password
};
}
public override async Task StartAsync(CancellationToken cancellationToken)
{
_connection = await _factory.CreateConnectionAsync(cancellationToken);
_channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken);
await _channel.ExchangeDeclareAsync(exchange: _exchangeName,
type: ExchangeType.Direct,
durable: true,
cancellationToken: cancellationToken);
// Declare main queue
await _channel.QueueDeclareAsync(queue: _queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null,
cancellationToken: cancellationToken);
await _channel.QueueBindAsync(queue: _queueName,
exchange: _exchangeName,
routingKey: _routingKey,
cancellationToken: cancellationToken);
// Declare retry queue (Configurable TTL, dead-letters back to main queue)
await _channel.QueueDeclareAsync(queue: _retryQueueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: new Dictionary<string, object>
{
{ "x-dead-letter-exchange", _exchangeName },
{ "x-dead-letter-routing-key", _routingKey },
{ "x-message-ttl", _retryTiming }
},
cancellationToken: cancellationToken);
await _channel.QueueBindAsync(queue: _retryQueueName,
exchange: _exchangeName,
routingKey: _retryRoutingKey,
cancellationToken: cancellationToken);
// Declare dead letter queue
await _channel.QueueDeclareAsync(queue: _deadLetterQueueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null,
cancellationToken: cancellationToken);
await _channel.QueueBindAsync(queue: _deadLetterQueueName,
exchange: _exchangeName,
routingKey: _deadLetterRoutingKey,
cancellationToken: cancellationToken);
await base.StartAsync(cancellationToken);
}
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += async (_, ea) =>
{
var json = Encoding.UTF8.GetString(ea.Body.Span);
try
{
var result = await _handler.HandleAsync(json);
var message = result.Message;
if (result.Success)
{
// Successful integration send. Acknowledge message delivery and return
await _channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
return;
}
if (result.Retryable)
{
// Integration failed, but is retryable - apply delay and check max retries
message.ApplyRetry(result.DelayUntilDate);
if (message.RetryCount < _maxRetries)
{
// Publish message to the retry queue. It will be re-published for retry after a delay
await _channel.BasicPublishAsync(
exchange: _exchangeName,
routingKey: _retryRoutingKey,
body: Encoding.UTF8.GetBytes(message.ToJson()),
cancellationToken: cancellationToken);
}
else
{
// Exceeded the max number of retries; fail and send to dead letter queue
await PublishToDeadLetterAsync(message.ToJson());
_logger.LogWarning("Max retry attempts reached. Sent to DLQ.");
}
}
else
{
// Fatal error (i.e. not retryable) occurred. Send message to dead letter queue without any retries
await PublishToDeadLetterAsync(message.ToJson());
_logger.LogWarning("Non-retryable failure. Sent to DLQ.");
}
// Message has been sent to retry or dead letter queues.
// Acknowledge receipt so Rabbit knows it's been processed
await _channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
}
catch (Exception ex)
{
// Unknown error occurred. Acknowledge so Rabbit doesn't keep attempting. Log the error
_logger.LogError(ex, "Unhandled error processing integration message.");
await _channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
}
};
await _channel.BasicConsumeAsync(queue: _queueName, autoAck: false, consumer: consumer, cancellationToken: cancellationToken);
}
private async Task PublishToDeadLetterAsync(string json)
{
await _channel.BasicPublishAsync(
exchange: _exchangeName,
routingKey: _deadLetterRoutingKey,
body: Encoding.UTF8.GetBytes(json));
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
await _channel.CloseAsync(cancellationToken);
await _connection.CloseAsync(cancellationToken);
await base.StopAsync(cancellationToken);
}
public override void Dispose()
{
_channel.Dispose();
_connection.Dispose();
base.Dispose();
}
}

View File

@@ -0,0 +1,54 @@
using System.Text;
using Bit.Core.AdminConsole.Models.Data.Integrations;
using Bit.Core.Enums;
using Bit.Core.Settings;
using RabbitMQ.Client;
namespace Bit.Core.Services;
public class RabbitMqIntegrationPublisher : IIntegrationPublisher, IAsyncDisposable
{
private readonly ConnectionFactory _factory;
private readonly Lazy<Task<IConnection>> _lazyConnection;
private readonly string _exchangeName;
public RabbitMqIntegrationPublisher(GlobalSettings globalSettings)
{
_factory = new ConnectionFactory
{
HostName = globalSettings.EventLogging.RabbitMq.HostName,
UserName = globalSettings.EventLogging.RabbitMq.Username,
Password = globalSettings.EventLogging.RabbitMq.Password
};
_exchangeName = globalSettings.EventLogging.RabbitMq.IntegrationExchangeName;
_lazyConnection = new Lazy<Task<IConnection>>(CreateConnectionAsync);
}
public async Task PublishAsync(IIntegrationMessage message)
{
var routingKey = message.IntegrationType.ToRoutingKey();
var connection = await _lazyConnection.Value;
await using var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Direct, durable: true);
var body = Encoding.UTF8.GetBytes(message.ToJson());
await channel.BasicPublishAsync(exchange: _exchangeName, routingKey: routingKey, body: body);
}
public async ValueTask DisposeAsync()
{
if (_lazyConnection.IsValueCreated)
{
var connection = await _lazyConnection.Value;
await connection.DisposeAsync();
}
}
private async Task<IConnection> CreateConnectionAsync()
{
return await _factory.CreateConnectionAsync();
}
}

View File

@@ -1,7 +1,7 @@
using System.Text.Json;
using System.Text.Json.Nodes;
using Bit.Core.AdminConsole.Models.Data.Integrations;
using Bit.Core.Enums;
using Bit.Core.Models.Data.Integrations;
using Bit.Core.Repositories;
#nullable enable

View File

@@ -0,0 +1,19 @@
using Bit.Core.AdminConsole.Models.Data.Integrations;
namespace Bit.Core.Services;
public class SlackIntegrationHandler(
ISlackService slackService)
: IntegrationHandlerBase<SlackIntegrationConfigurationDetails>
{
public override async Task<IntegrationHandlerResult> HandleAsync(IntegrationMessage<SlackIntegrationConfigurationDetails> message)
{
await slackService.SendSlackMessageByChannelIdAsync(
message.Configuration.token,
message.RenderedTemplate,
message.Configuration.channelId
);
return new IntegrationHandlerResult(success: true, message: message);
}
}

View File

@@ -1,8 +1,8 @@
using System.Text;
using System.Text.Json;
using System.Text.Json.Nodes;
using Bit.Core.AdminConsole.Models.Data.Integrations;
using Bit.Core.Enums;
using Bit.Core.Models.Data.Integrations;
using Bit.Core.Repositories;
#nullable enable
@@ -25,7 +25,7 @@ public class WebhookEventHandler(
protected override async Task ProcessEventIntegrationAsync(JsonObject mergedConfiguration,
string renderedTemplate)
{
var config = mergedConfiguration.Deserialize<WebhookIntegrationConfigurationDetils>();
var config = mergedConfiguration.Deserialize<WebhookIntegrationConfigurationDetails>();
if (config is null || string.IsNullOrEmpty(config.url))
{
return;

View File

@@ -0,0 +1,61 @@
using System.Globalization;
using System.Net;
using System.Text;
using Bit.Core.AdminConsole.Models.Data.Integrations;
#nullable enable
namespace Bit.Core.Services;
public class WebhookIntegrationHandler(IHttpClientFactory httpClientFactory)
: IntegrationHandlerBase<WebhookIntegrationConfigurationDetails>
{
private readonly HttpClient _httpClient = httpClientFactory.CreateClient(HttpClientName);
public const string HttpClientName = "WebhookIntegrationHandlerHttpClient";
public override async Task<IntegrationHandlerResult> HandleAsync(IntegrationMessage<WebhookIntegrationConfigurationDetails> message)
{
var content = new StringContent(message.RenderedTemplate, Encoding.UTF8, "application/json");
var response = await _httpClient.PostAsync(message.Configuration.url, content);
var result = new IntegrationHandlerResult(success: response.IsSuccessStatusCode, message);
switch (response.StatusCode)
{
case HttpStatusCode.TooManyRequests:
case HttpStatusCode.RequestTimeout:
case HttpStatusCode.InternalServerError:
case HttpStatusCode.BadGateway:
case HttpStatusCode.ServiceUnavailable:
case HttpStatusCode.GatewayTimeout:
result.Retryable = true;
result.FailureReason = response.ReasonPhrase;
if (response.Headers.TryGetValues("Retry-After", out var values))
{
var value = values.FirstOrDefault();
if (int.TryParse(value, out var seconds))
{
// Retry-after was specified in seconds. Adjust DelayUntilDate by the requested number of seconds.
result.DelayUntilDate = DateTime.UtcNow.AddSeconds(seconds);
}
else if (DateTimeOffset.TryParseExact(value,
"r", // "r" is the round-trip format: RFC1123
CultureInfo.InvariantCulture,
DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal,
out var retryDate))
{
// Retry-after was specified as a date. Adjust DelayUntilDate to the specified date.
result.DelayUntilDate = retryDate.UtcDateTime;
}
}
break;
default:
result.Retryable = false;
result.FailureReason = response.ReasonPhrase;
break;
}
return result;
}
}

View File

@@ -1,4 +1,6 @@
using System.Text.RegularExpressions;
#nullable enable
using System.Text.RegularExpressions;
namespace Bit.Core.AdminConsole.Utilities;
@@ -9,7 +11,7 @@ public static partial class IntegrationTemplateProcessor
public static string ReplaceTokens(string template, object values)
{
if (string.IsNullOrEmpty(template) || values == null)
if (string.IsNullOrEmpty(template))
{
return template;
}

View File

@@ -312,11 +312,19 @@ public class GlobalSettings : IGlobalSettings
private string _hostName;
private string _username;
private string _password;
private string _exchangeName;
private string _eventExchangeName;
private string _integrationExchangeName;
public int MaxRetries { get; set; } = 3;
public int RetryTiming { get; set; } = 30000; // 30s
public virtual string EventRepositoryQueueName { get; set; } = "events-write-queue";
public virtual string WebhookQueueName { get; set; } = "events-webhook-queue";
public virtual string SlackQueueName { get; set; } = "events-slack-queue";
public virtual string IntegrationDeadLetterQueueName { get; set; } = "integration-dead-letter-queue";
public virtual string SlackEventsQueueName { get; set; } = "events-slack-queue";
public virtual string SlackIntegrationQueueName { get; set; } = "integration-slack-queue";
public virtual string SlackIntegrationRetryQueueName { get; set; } = "integration-slack-retry-queue";
public virtual string WebhookEventsQueueName { get; set; } = "events-webhook-queue";
public virtual string WebhookIntegrationQueueName { get; set; } = "integration-webhook-queue";
public virtual string WebhookIntegrationRetryQueueName { get; set; } = "integration-webhook-retry-queue";
public string HostName
{
@@ -333,10 +341,15 @@ public class GlobalSettings : IGlobalSettings
get => _password;
set => _password = value.Trim('"');
}
public string ExchangeName
public string EventExchangeName
{
get => _exchangeName;
set => _exchangeName = value.Trim('"');
get => _eventExchangeName;
set => _eventExchangeName = value.Trim('"');
}
public string IntegrationExchangeName
{
get => _integrationExchangeName;
set => _integrationExchangeName = value.Trim('"');
}
}
}

View File

@@ -1,6 +1,4 @@
using System.Globalization;
using Bit.Core.AdminConsole.Services.Implementations;
using Bit.Core.AdminConsole.Services.NoopImplementations;
using Bit.Core.Context;
using Bit.Core.IdentityServer;
using Bit.Core.Services;
@@ -63,37 +61,7 @@ public class Startup
services.AddSingleton<IApplicationCacheService, InMemoryApplicationCacheService>();
}
if (!globalSettings.SelfHosted && CoreHelpers.SettingHasValue(globalSettings.Events.ConnectionString))
{
services.AddKeyedSingleton<IEventWriteService, AzureQueueEventWriteService>("storage");
if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.ConnectionString) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.TopicName))
{
services.AddKeyedSingleton<IEventWriteService, AzureServiceBusEventWriteService>("broadcast");
}
else
{
services.AddKeyedSingleton<IEventWriteService, NoopEventWriteService>("broadcast");
}
}
else
{
services.AddKeyedSingleton<IEventWriteService, RepositoryEventWriteService>("storage");
if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.HostName) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.Username) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.Password) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.ExchangeName))
{
services.AddKeyedSingleton<IEventWriteService, RabbitMqEventWriteService>("broadcast");
}
else
{
services.AddKeyedSingleton<IEventWriteService, NoopEventWriteService>("broadcast");
}
}
services.AddScoped<IEventWriteService, EventRouteService>();
services.AddEventWriteServices(globalSettings);
services.AddScoped<IEventService, EventService>();
services.AddOptionality();
@@ -109,49 +77,7 @@ public class Startup
services.AddHostedService<Core.HostedServices.ApplicationCacheHostedService>();
}
// Optional RabbitMQ Listeners
if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.HostName) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.Username) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.Password) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.ExchangeName))
{
services.AddSingleton<EventRepositoryHandler>();
services.AddKeyedSingleton<IEventWriteService, RepositoryEventWriteService>("persistent");
services.AddSingleton<IHostedService>(provider =>
new RabbitMqEventListenerService(
provider.GetRequiredService<EventRepositoryHandler>(),
provider.GetRequiredService<ILogger<RabbitMqEventListenerService>>(),
globalSettings,
globalSettings.EventLogging.RabbitMq.EventRepositoryQueueName));
if (CoreHelpers.SettingHasValue(globalSettings.Slack.ClientId) &&
CoreHelpers.SettingHasValue(globalSettings.Slack.ClientSecret) &&
CoreHelpers.SettingHasValue(globalSettings.Slack.Scopes))
{
services.AddHttpClient(SlackService.HttpClientName);
services.AddSingleton<ISlackService, SlackService>();
}
else
{
services.AddSingleton<ISlackService, NoopSlackService>();
}
services.AddSingleton<SlackEventHandler>();
services.AddSingleton<IHostedService>(provider =>
new RabbitMqEventListenerService(
provider.GetRequiredService<SlackEventHandler>(),
provider.GetRequiredService<ILogger<RabbitMqEventListenerService>>(),
globalSettings,
globalSettings.EventLogging.RabbitMq.SlackQueueName));
services.AddHttpClient(WebhookEventHandler.HttpClientName);
services.AddSingleton<WebhookEventHandler>();
services.AddSingleton<IHostedService>(provider =>
new RabbitMqEventListenerService(
provider.GetRequiredService<WebhookEventHandler>(),
provider.GetRequiredService<ILogger<RabbitMqEventListenerService>>(),
globalSettings,
globalSettings.EventLogging.RabbitMq.WebhookQueueName));
}
services.AddRabbitMqListeners(globalSettings);
}
public void Configure(

View File

@@ -1,12 +1,8 @@
using System.Globalization;
using Bit.Core.AdminConsole.Services.NoopImplementations;
using Bit.Core.Repositories;
using Bit.Core.Services;
using Bit.Core.Settings;
using Bit.Core.Utilities;
using Bit.SharedWeb.Utilities;
using Microsoft.IdentityModel.Logging;
using TableStorageRepos = Bit.Core.Repositories.TableStorage;
namespace Bit.EventsProcessor;
@@ -37,50 +33,7 @@ public class Startup
services.AddDatabaseRepositories(globalSettings);
// Hosted Services
// Optional Azure Service Bus Listeners
if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.ConnectionString) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.TopicName))
{
services.AddSingleton<IEventRepository, TableStorageRepos.EventRepository>();
services.AddSingleton<AzureTableStorageEventHandler>();
services.AddKeyedSingleton<IEventWriteService, RepositoryEventWriteService>("persistent");
services.AddSingleton<IHostedService>(provider =>
new AzureServiceBusEventListenerService(
provider.GetRequiredService<AzureTableStorageEventHandler>(),
provider.GetRequiredService<ILogger<AzureServiceBusEventListenerService>>(),
globalSettings,
globalSettings.EventLogging.AzureServiceBus.EventRepositorySubscriptionName));
if (CoreHelpers.SettingHasValue(globalSettings.Slack.ClientId) &&
CoreHelpers.SettingHasValue(globalSettings.Slack.ClientSecret) &&
CoreHelpers.SettingHasValue(globalSettings.Slack.Scopes))
{
services.AddHttpClient(SlackService.HttpClientName);
services.AddSingleton<ISlackService, SlackService>();
}
else
{
services.AddSingleton<ISlackService, NoopSlackService>();
}
services.AddSingleton<SlackEventHandler>();
services.AddSingleton<IHostedService>(provider =>
new AzureServiceBusEventListenerService(
provider.GetRequiredService<SlackEventHandler>(),
provider.GetRequiredService<ILogger<AzureServiceBusEventListenerService>>(),
globalSettings,
globalSettings.EventLogging.AzureServiceBus.SlackSubscriptionName));
services.AddSingleton<WebhookEventHandler>();
services.AddHttpClient(WebhookEventHandler.HttpClientName);
services.AddSingleton<IHostedService>(provider =>
new AzureServiceBusEventListenerService(
provider.GetRequiredService<WebhookEventHandler>(),
provider.GetRequiredService<ILogger<AzureServiceBusEventListenerService>>(),
globalSettings,
globalSettings.EventLogging.AzureServiceBus.WebhookSubscriptionName));
}
services.AddAzureServiceBusListeners(globalSettings);
services.AddHostedService<AzureQueueHostedService>();
}

View File

@@ -5,6 +5,7 @@ using System.Security.Cryptography.X509Certificates;
using AspNetCoreRateLimit;
using Azure.Storage.Queues;
using Bit.Core.AdminConsole.Models.Business.Tokenables;
using Bit.Core.AdminConsole.Models.Data.Integrations;
using Bit.Core.AdminConsole.OrganizationFeatures.Policies;
using Bit.Core.AdminConsole.Services;
using Bit.Core.AdminConsole.Services.Implementations;
@@ -324,42 +325,7 @@ public static class ServiceCollectionExtensions
services.AddSingleton<IMailEnqueuingService, BlockingMailEnqueuingService>();
}
if (!globalSettings.SelfHosted && CoreHelpers.SettingHasValue(globalSettings.Events.ConnectionString))
{
services.AddKeyedSingleton<IEventWriteService, AzureQueueEventWriteService>("storage");
if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.ConnectionString) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.TopicName))
{
services.AddKeyedSingleton<IEventWriteService, AzureServiceBusEventWriteService>("broadcast");
}
else
{
services.AddKeyedSingleton<IEventWriteService, NoopEventWriteService>("broadcast");
}
}
else if (globalSettings.SelfHosted)
{
services.AddKeyedSingleton<IEventWriteService, RepositoryEventWriteService>("storage");
if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.HostName) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.Username) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.Password) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.ExchangeName))
{
services.AddKeyedSingleton<IEventWriteService, RabbitMqEventWriteService>("broadcast");
}
else
{
services.AddKeyedSingleton<IEventWriteService, NoopEventWriteService>("broadcast");
}
}
else
{
services.AddKeyedSingleton<IEventWriteService, NoopEventWriteService>("storage");
services.AddKeyedSingleton<IEventWriteService, NoopEventWriteService>("broadcast");
}
services.AddScoped<IEventWriteService, EventRouteService>();
services.AddEventWriteServices(globalSettings);
if (CoreHelpers.SettingHasValue(globalSettings.Attachment.ConnectionString))
{
@@ -584,6 +550,193 @@ public static class ServiceCollectionExtensions
return globalSettings;
}
public static IServiceCollection AddEventWriteServices(this IServiceCollection services, GlobalSettings globalSettings)
{
if (!globalSettings.SelfHosted && CoreHelpers.SettingHasValue(globalSettings.Events.ConnectionString))
{
services.AddKeyedSingleton<IEventWriteService, AzureQueueEventWriteService>("storage");
if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.ConnectionString) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.TopicName))
{
services.AddKeyedSingleton<IEventWriteService, AzureServiceBusEventWriteService>("broadcast");
}
else
{
services.AddKeyedSingleton<IEventWriteService, NoopEventWriteService>("broadcast");
}
}
else if (globalSettings.SelfHosted)
{
services.AddKeyedSingleton<IEventWriteService, RepositoryEventWriteService>("storage");
if (IsRabbitMqEnabled(globalSettings))
{
services.AddKeyedSingleton<IEventWriteService, RabbitMqEventWriteService>("broadcast");
}
else
{
services.AddKeyedSingleton<IEventWriteService, NoopEventWriteService>("broadcast");
}
}
else
{
services.AddKeyedSingleton<IEventWriteService, NoopEventWriteService>("storage");
services.AddKeyedSingleton<IEventWriteService, NoopEventWriteService>("broadcast");
}
services.AddScoped<IEventWriteService, EventRouteService>();
return services;
}
public static IServiceCollection AddAzureServiceBusListeners(this IServiceCollection services, GlobalSettings globalSettings)
{
if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.ConnectionString) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.TopicName))
{
services.AddSingleton<IEventRepository, TableStorageRepos.EventRepository>();
services.AddSingleton<AzureTableStorageEventHandler>();
services.AddKeyedSingleton<IEventWriteService, RepositoryEventWriteService>("persistent");
services.AddSingleton<IHostedService>(provider =>
new AzureServiceBusEventListenerService(
provider.GetRequiredService<AzureTableStorageEventHandler>(),
provider.GetRequiredService<ILogger<AzureServiceBusEventListenerService>>(),
globalSettings,
globalSettings.EventLogging.AzureServiceBus.EventRepositorySubscriptionName));
services.AddSlackService(globalSettings);
services.AddSingleton<SlackEventHandler>();
services.AddSingleton<IHostedService>(provider =>
new AzureServiceBusEventListenerService(
provider.GetRequiredService<SlackEventHandler>(),
provider.GetRequiredService<ILogger<AzureServiceBusEventListenerService>>(),
globalSettings,
globalSettings.EventLogging.AzureServiceBus.SlackSubscriptionName));
services.AddSingleton<WebhookEventHandler>();
services.AddHttpClient(WebhookEventHandler.HttpClientName);
services.AddSingleton<IHostedService>(provider =>
new AzureServiceBusEventListenerService(
provider.GetRequiredService<WebhookEventHandler>(),
provider.GetRequiredService<ILogger<AzureServiceBusEventListenerService>>(),
globalSettings,
globalSettings.EventLogging.AzureServiceBus.WebhookSubscriptionName));
}
return services;
}
public static IServiceCollection AddRabbitMqListeners(this IServiceCollection services, GlobalSettings globalSettings)
{
if (IsRabbitMqEnabled(globalSettings))
{
services.AddRabbitMqEventRepositoryListener(globalSettings);
services.AddSlackService(globalSettings);
services.AddRabbitMqIntegration<SlackIntegrationConfigurationDetails, SlackIntegrationHandler>(
globalSettings.EventLogging.RabbitMq.SlackEventsQueueName,
globalSettings.EventLogging.RabbitMq.SlackIntegrationQueueName,
globalSettings.EventLogging.RabbitMq.SlackIntegrationRetryQueueName,
globalSettings.EventLogging.RabbitMq.IntegrationDeadLetterQueueName,
IntegrationType.Slack,
globalSettings);
services.AddHttpClient(WebhookIntegrationHandler.HttpClientName);
services.AddRabbitMqIntegration<WebhookIntegrationConfigurationDetails, WebhookIntegrationHandler>(
globalSettings.EventLogging.RabbitMq.WebhookEventsQueueName,
globalSettings.EventLogging.RabbitMq.WebhookIntegrationQueueName,
globalSettings.EventLogging.RabbitMq.WebhookIntegrationRetryQueueName,
globalSettings.EventLogging.RabbitMq.IntegrationDeadLetterQueueName,
IntegrationType.Webhook,
globalSettings);
}
return services;
}
public static IServiceCollection AddSlackService(this IServiceCollection services, GlobalSettings globalSettings)
{
if (CoreHelpers.SettingHasValue(globalSettings.Slack.ClientId) &&
CoreHelpers.SettingHasValue(globalSettings.Slack.ClientSecret) &&
CoreHelpers.SettingHasValue(globalSettings.Slack.Scopes))
{
services.AddHttpClient(SlackService.HttpClientName);
services.AddSingleton<ISlackService, SlackService>();
}
else
{
services.AddSingleton<ISlackService, NoopSlackService>();
}
return services;
}
private static IServiceCollection AddRabbitMqEventRepositoryListener(this IServiceCollection services, GlobalSettings globalSettings)
{
services.AddSingleton<EventRepositoryHandler>();
services.AddKeyedSingleton<IEventWriteService, RepositoryEventWriteService>("persistent");
services.AddSingleton<IHostedService>(provider =>
new RabbitMqEventListenerService(
provider.GetRequiredService<EventRepositoryHandler>(),
provider.GetRequiredService<ILogger<RabbitMqEventListenerService>>(),
globalSettings,
globalSettings.EventLogging.RabbitMq.EventRepositoryQueueName));
return services;
}
private static IServiceCollection AddRabbitMqIntegration<TConfig, THandler>(this IServiceCollection services,
string eventQueueName,
string integrationQueueName,
string integrationRetryQueueName,
string integrationDeadLetterQueueName,
IntegrationType integrationType,
GlobalSettings globalSettings)
where TConfig : class
where THandler : class, IIntegrationHandler<TConfig>
{
var routingKey = integrationType.ToRoutingKey();
services.AddSingleton<IIntegrationPublisher, RabbitMqIntegrationPublisher>();
services.AddKeyedSingleton<IEventMessageHandler>(routingKey, (provider, _) =>
new EventIntegrationHandler<TConfig>(
integrationType,
provider.GetRequiredService<IIntegrationPublisher>(),
provider.GetRequiredService<IOrganizationIntegrationConfigurationRepository>(),
provider.GetRequiredService<IUserRepository>(),
provider.GetRequiredService<IOrganizationRepository>()));
services.AddSingleton<IHostedService>(provider =>
new RabbitMqEventListenerService(
provider.GetRequiredKeyedService<IEventMessageHandler>(routingKey),
provider.GetRequiredService<ILogger<RabbitMqEventListenerService>>(),
globalSettings,
eventQueueName));
services.AddSingleton<IIntegrationHandler<TConfig>, THandler>();
services.AddSingleton<IHostedService>(provider =>
new RabbitMqIntegrationListenerService(
handler: provider.GetRequiredService<IIntegrationHandler<TConfig>>(),
routingKey: routingKey,
queueName: integrationQueueName,
retryQueueName: integrationRetryQueueName,
deadLetterQueueName: integrationDeadLetterQueueName,
globalSettings: globalSettings,
logger: provider.GetRequiredService<ILogger<RabbitMqIntegrationListenerService>>()));
return services;
}
private static bool IsRabbitMqEnabled(GlobalSettings settings)
{
return CoreHelpers.SettingHasValue(settings.EventLogging.RabbitMq.HostName) &&
CoreHelpers.SettingHasValue(settings.EventLogging.RabbitMq.Username) &&
CoreHelpers.SettingHasValue(settings.EventLogging.RabbitMq.Password) &&
CoreHelpers.SettingHasValue(settings.EventLogging.RabbitMq.EventExchangeName);
}
public static void UseDefaultMiddleware(this IApplicationBuilder app,
IWebHostEnvironment env, GlobalSettings globalSettings)
{