mirror of
https://github.com/bitwarden/server
synced 2025-12-06 00:03:34 +00:00
Refactor Azure Service Bus to use the organization id as a partition key (#6477)
* Refactored Azure Service Bus to use the organization id as a partition key * Use null for partition key instead of empty string when organization id is null
This commit is contained in:
@@ -6,6 +6,7 @@ public interface IIntegrationMessage
|
||||
{
|
||||
IntegrationType IntegrationType { get; }
|
||||
string MessageId { get; set; }
|
||||
string? OrganizationId { get; set; }
|
||||
int RetryCount { get; }
|
||||
DateTime? DelayUntilDate { get; }
|
||||
void ApplyRetry(DateTime? handlerDelayUntilDate);
|
||||
|
||||
@@ -7,6 +7,7 @@ public class IntegrationMessage : IIntegrationMessage
|
||||
{
|
||||
public IntegrationType IntegrationType { get; set; }
|
||||
public required string MessageId { get; set; }
|
||||
public string? OrganizationId { get; set; }
|
||||
public required string RenderedTemplate { get; set; }
|
||||
public int RetryCount { get; set; } = 0;
|
||||
public DateTime? DelayUntilDate { get; set; }
|
||||
|
||||
@@ -5,5 +5,5 @@ namespace Bit.Core.Services;
|
||||
public interface IEventIntegrationPublisher : IAsyncDisposable
|
||||
{
|
||||
Task PublishAsync(IIntegrationMessage message);
|
||||
Task PublishEventAsync(string body);
|
||||
Task PublishEventAsync(string body, string? organizationId);
|
||||
}
|
||||
|
||||
@@ -30,7 +30,8 @@ public class AzureServiceBusService : IAzureServiceBusService
|
||||
var serviceBusMessage = new ServiceBusMessage(json)
|
||||
{
|
||||
Subject = message.IntegrationType.ToRoutingKey(),
|
||||
MessageId = message.MessageId
|
||||
MessageId = message.MessageId,
|
||||
PartitionKey = message.OrganizationId
|
||||
};
|
||||
|
||||
await _integrationSender.SendMessageAsync(serviceBusMessage);
|
||||
@@ -44,18 +45,20 @@ public class AzureServiceBusService : IAzureServiceBusService
|
||||
{
|
||||
Subject = message.IntegrationType.ToRoutingKey(),
|
||||
ScheduledEnqueueTime = message.DelayUntilDate ?? DateTime.UtcNow,
|
||||
MessageId = message.MessageId
|
||||
MessageId = message.MessageId,
|
||||
PartitionKey = message.OrganizationId
|
||||
};
|
||||
|
||||
await _integrationSender.SendMessageAsync(serviceBusMessage);
|
||||
}
|
||||
|
||||
public async Task PublishEventAsync(string body)
|
||||
public async Task PublishEventAsync(string body, string? organizationId)
|
||||
{
|
||||
var message = new ServiceBusMessage(body)
|
||||
{
|
||||
ContentType = "application/json",
|
||||
MessageId = Guid.NewGuid().ToString()
|
||||
MessageId = Guid.NewGuid().ToString(),
|
||||
PartitionKey = organizationId
|
||||
};
|
||||
|
||||
await _eventSender.SendMessageAsync(message);
|
||||
|
||||
@@ -14,15 +14,21 @@ public class EventIntegrationEventWriteService : IEventWriteService, IAsyncDispo
|
||||
public async Task CreateAsync(IEvent e)
|
||||
{
|
||||
var body = JsonSerializer.Serialize(e);
|
||||
await _eventIntegrationPublisher.PublishEventAsync(body: body);
|
||||
await _eventIntegrationPublisher.PublishEventAsync(body: body, organizationId: e.OrganizationId?.ToString());
|
||||
}
|
||||
|
||||
public async Task CreateManyAsync(IEnumerable<IEvent> events)
|
||||
{
|
||||
var body = JsonSerializer.Serialize(events);
|
||||
await _eventIntegrationPublisher.PublishEventAsync(body: body);
|
||||
}
|
||||
var eventList = events as IList<IEvent> ?? events.ToList();
|
||||
if (eventList.Count == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var organizationId = eventList[0].OrganizationId?.ToString();
|
||||
var body = JsonSerializer.Serialize(eventList);
|
||||
await _eventIntegrationPublisher.PublishEventAsync(body: body, organizationId: organizationId);
|
||||
}
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
await _eventIntegrationPublisher.DisposeAsync();
|
||||
|
||||
@@ -57,6 +57,7 @@ public class EventIntegrationHandler<T>(
|
||||
{
|
||||
IntegrationType = integrationType,
|
||||
MessageId = messageId.ToString(),
|
||||
OrganizationId = organizationId.ToString(),
|
||||
Configuration = config,
|
||||
RenderedTemplate = renderedTemplate,
|
||||
RetryCount = 0,
|
||||
|
||||
@@ -122,7 +122,7 @@ public class RabbitMqService : IRabbitMqService
|
||||
body: body);
|
||||
}
|
||||
|
||||
public async Task PublishEventAsync(string body)
|
||||
public async Task PublishEventAsync(string body, string? organizationId)
|
||||
{
|
||||
await using var channel = await CreateChannelAsync();
|
||||
var properties = new BasicProperties
|
||||
|
||||
@@ -8,6 +8,7 @@ namespace Bit.Core.Test.Models.Data.EventIntegrations;
|
||||
public class IntegrationMessageTests
|
||||
{
|
||||
private const string _messageId = "TestMessageId";
|
||||
private const string _organizationId = "TestOrganizationId";
|
||||
|
||||
[Fact]
|
||||
public void ApplyRetry_IncrementsRetryCountAndSetsDelayUntilDate()
|
||||
@@ -16,6 +17,7 @@ public class IntegrationMessageTests
|
||||
{
|
||||
Configuration = new WebhookIntegrationConfigurationDetails(new Uri("https://localhost"), "Bearer", "AUTH-TOKEN"),
|
||||
MessageId = _messageId,
|
||||
OrganizationId = _organizationId,
|
||||
RetryCount = 2,
|
||||
RenderedTemplate = string.Empty,
|
||||
DelayUntilDate = null
|
||||
@@ -36,6 +38,7 @@ public class IntegrationMessageTests
|
||||
{
|
||||
Configuration = new WebhookIntegrationConfigurationDetails(new Uri("https://localhost"), "Bearer", "AUTH-TOKEN"),
|
||||
MessageId = _messageId,
|
||||
OrganizationId = _organizationId,
|
||||
RenderedTemplate = "This is the message",
|
||||
IntegrationType = IntegrationType.Webhook,
|
||||
RetryCount = 2,
|
||||
@@ -48,6 +51,7 @@ public class IntegrationMessageTests
|
||||
Assert.NotNull(result);
|
||||
Assert.Equal(message.Configuration, result.Configuration);
|
||||
Assert.Equal(message.MessageId, result.MessageId);
|
||||
Assert.Equal(message.OrganizationId, result.OrganizationId);
|
||||
Assert.Equal(message.RenderedTemplate, result.RenderedTemplate);
|
||||
Assert.Equal(message.IntegrationType, result.IntegrationType);
|
||||
Assert.Equal(message.RetryCount, result.RetryCount);
|
||||
@@ -67,6 +71,7 @@ public class IntegrationMessageTests
|
||||
var message = new IntegrationMessage
|
||||
{
|
||||
MessageId = _messageId,
|
||||
OrganizationId = _organizationId,
|
||||
RenderedTemplate = "This is the message",
|
||||
IntegrationType = IntegrationType.Webhook,
|
||||
RetryCount = 2,
|
||||
@@ -77,6 +82,7 @@ public class IntegrationMessageTests
|
||||
var result = JsonSerializer.Deserialize<IntegrationMessage>(json);
|
||||
|
||||
Assert.Equal(message.MessageId, result.MessageId);
|
||||
Assert.Equal(message.OrganizationId, result.OrganizationId);
|
||||
Assert.Equal(message.RenderedTemplate, result.RenderedTemplate);
|
||||
Assert.Equal(message.IntegrationType, result.IntegrationType);
|
||||
Assert.Equal(message.RetryCount, result.RetryCount);
|
||||
|
||||
@@ -22,18 +22,20 @@ public class EventIntegrationEventWriteServiceTests
|
||||
[Theory, BitAutoData]
|
||||
public async Task CreateAsync_EventPublishedToEventQueue(EventMessage eventMessage)
|
||||
{
|
||||
var expected = JsonSerializer.Serialize(eventMessage);
|
||||
await Subject.CreateAsync(eventMessage);
|
||||
await _eventIntegrationPublisher.Received(1).PublishEventAsync(
|
||||
Arg.Is<string>(body => AssertJsonStringsMatch(eventMessage, body)));
|
||||
body: Arg.Is<string>(body => AssertJsonStringsMatch(eventMessage, body)),
|
||||
organizationId: Arg.Is<string>(orgId => eventMessage.OrganizationId.ToString().Equals(orgId)));
|
||||
}
|
||||
|
||||
[Theory, BitAutoData]
|
||||
public async Task CreateManyAsync_EventsPublishedToEventQueue(IEnumerable<EventMessage> eventMessages)
|
||||
{
|
||||
var eventMessage = eventMessages.First();
|
||||
await Subject.CreateManyAsync(eventMessages);
|
||||
await _eventIntegrationPublisher.Received(1).PublishEventAsync(
|
||||
Arg.Is<string>(body => AssertJsonStringsMatch(eventMessages, body)));
|
||||
body: Arg.Is<string>(body => AssertJsonStringsMatch(eventMessages, body)),
|
||||
organizationId: Arg.Is<string>(orgId => eventMessage.OrganizationId.ToString().Equals(orgId)));
|
||||
}
|
||||
|
||||
private static bool AssertJsonStringsMatch(EventMessage expected, string body)
|
||||
|
||||
@@ -23,6 +23,7 @@ public class EventIntegrationHandlerTests
|
||||
private const string _templateWithOrganization = "Org: #OrganizationName#";
|
||||
private const string _templateWithUser = "#UserName#, #UserEmail#";
|
||||
private const string _templateWithActingUser = "#ActingUserName#, #ActingUserEmail#";
|
||||
private static readonly Guid _organizationId = Guid.NewGuid();
|
||||
private static readonly Uri _uri = new Uri("https://localhost");
|
||||
private static readonly Uri _uri2 = new Uri("https://example.com");
|
||||
private readonly IEventIntegrationPublisher _eventIntegrationPublisher = Substitute.For<IEventIntegrationPublisher>();
|
||||
@@ -50,6 +51,7 @@ public class EventIntegrationHandlerTests
|
||||
{
|
||||
IntegrationType = IntegrationType.Webhook,
|
||||
MessageId = "TestMessageId",
|
||||
OrganizationId = _organizationId.ToString(),
|
||||
Configuration = new WebhookIntegrationConfigurationDetails(_uri),
|
||||
RenderedTemplate = template,
|
||||
RetryCount = 0,
|
||||
@@ -122,6 +124,7 @@ public class EventIntegrationHandlerTests
|
||||
public async Task HandleEventAsync_BaseTemplateOneConfiguration_PublishesIntegrationMessage(EventMessage eventMessage)
|
||||
{
|
||||
var sutProvider = GetSutProvider(OneConfiguration(_templateBase));
|
||||
eventMessage.OrganizationId = _organizationId;
|
||||
|
||||
await sutProvider.Sut.HandleEventAsync(eventMessage);
|
||||
|
||||
@@ -140,6 +143,7 @@ public class EventIntegrationHandlerTests
|
||||
public async Task HandleEventAsync_BaseTemplateTwoConfigurations_PublishesIntegrationMessages(EventMessage eventMessage)
|
||||
{
|
||||
var sutProvider = GetSutProvider(TwoConfigurations(_templateBase));
|
||||
eventMessage.OrganizationId = _organizationId;
|
||||
|
||||
await sutProvider.Sut.HandleEventAsync(eventMessage);
|
||||
|
||||
@@ -164,6 +168,7 @@ public class EventIntegrationHandlerTests
|
||||
var user = Substitute.For<User>();
|
||||
user.Email = "test@example.com";
|
||||
user.Name = "Test";
|
||||
eventMessage.OrganizationId = _organizationId;
|
||||
|
||||
sutProvider.GetDependency<IUserRepository>().GetByIdAsync(Arg.Any<Guid>()).Returns(user);
|
||||
await sutProvider.Sut.HandleEventAsync(eventMessage);
|
||||
@@ -183,6 +188,7 @@ public class EventIntegrationHandlerTests
|
||||
var sutProvider = GetSutProvider(OneConfiguration(_templateWithOrganization));
|
||||
var organization = Substitute.For<Organization>();
|
||||
organization.Name = "Test";
|
||||
eventMessage.OrganizationId = _organizationId;
|
||||
|
||||
sutProvider.GetDependency<IOrganizationRepository>().GetByIdAsync(Arg.Any<Guid>()).Returns(organization);
|
||||
await sutProvider.Sut.HandleEventAsync(eventMessage);
|
||||
@@ -205,6 +211,7 @@ public class EventIntegrationHandlerTests
|
||||
var user = Substitute.For<User>();
|
||||
user.Email = "test@example.com";
|
||||
user.Name = "Test";
|
||||
eventMessage.OrganizationId = _organizationId;
|
||||
|
||||
sutProvider.GetDependency<IUserRepository>().GetByIdAsync(Arg.Any<Guid>()).Returns(user);
|
||||
await sutProvider.Sut.HandleEventAsync(eventMessage);
|
||||
@@ -235,6 +242,7 @@ public class EventIntegrationHandlerTests
|
||||
var sutProvider = GetSutProvider(ValidFilterConfiguration());
|
||||
sutProvider.GetDependency<IIntegrationFilterService>().EvaluateFilterGroup(
|
||||
Arg.Any<IntegrationFilterGroup>(), Arg.Any<EventMessage>()).Returns(true);
|
||||
eventMessage.OrganizationId = _organizationId;
|
||||
|
||||
await sutProvider.Sut.HandleEventAsync(eventMessage);
|
||||
|
||||
@@ -284,7 +292,7 @@ public class EventIntegrationHandlerTests
|
||||
$"Date: {eventMessage.Date}, Type: {eventMessage.Type}, UserId: {eventMessage.UserId}"
|
||||
);
|
||||
await _eventIntegrationPublisher.Received(1).PublishAsync(Arg.Is(
|
||||
AssertHelper.AssertPropertyEqual(expectedMessage, new[] { "MessageId" })));
|
||||
AssertHelper.AssertPropertyEqual(expectedMessage, new[] { "MessageId", "OrganizationId" })));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -301,12 +309,12 @@ public class EventIntegrationHandlerTests
|
||||
var expectedMessage = EventIntegrationHandlerTests.expectedMessage(
|
||||
$"Date: {eventMessage.Date}, Type: {eventMessage.Type}, UserId: {eventMessage.UserId}"
|
||||
);
|
||||
await _eventIntegrationPublisher.Received(1).PublishAsync(Arg.Is(
|
||||
AssertHelper.AssertPropertyEqual(expectedMessage, new[] { "MessageId" })));
|
||||
await _eventIntegrationPublisher.Received(1).PublishAsync(Arg.Is(AssertHelper.AssertPropertyEqual(
|
||||
expectedMessage, new[] { "MessageId", "OrganizationId" })));
|
||||
|
||||
expectedMessage.Configuration = new WebhookIntegrationConfigurationDetails(_uri2);
|
||||
await _eventIntegrationPublisher.Received(1).PublishAsync(Arg.Is(
|
||||
AssertHelper.AssertPropertyEqual(expectedMessage, new[] { "MessageId" })));
|
||||
await _eventIntegrationPublisher.Received(1).PublishAsync(Arg.Is(AssertHelper.AssertPropertyEqual(
|
||||
expectedMessage, new[] { "MessageId", "OrganizationId" })));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ public class IntegrationHandlerTests
|
||||
{
|
||||
Configuration = new WebhookIntegrationConfigurationDetails(new Uri("https://localhost"), "Bearer", "AUTH-TOKEN"),
|
||||
MessageId = "TestMessageId",
|
||||
OrganizationId = "TestOrganizationId",
|
||||
IntegrationType = IntegrationType.Webhook,
|
||||
RenderedTemplate = "Template",
|
||||
DelayUntilDate = null,
|
||||
@@ -25,6 +26,8 @@ public class IntegrationHandlerTests
|
||||
var result = await sut.HandleAsync(expected.ToJson());
|
||||
var typedResult = Assert.IsType<IntegrationMessage<WebhookIntegrationConfigurationDetails>>(result.Message);
|
||||
|
||||
Assert.Equal(expected.MessageId, typedResult.MessageId);
|
||||
Assert.Equal(expected.OrganizationId, typedResult.OrganizationId);
|
||||
Assert.Equal(expected.Configuration, typedResult.Configuration);
|
||||
Assert.Equal(expected.RenderedTemplate, typedResult.RenderedTemplate);
|
||||
Assert.Equal(expected.IntegrationType, typedResult.IntegrationType);
|
||||
|
||||
Reference in New Issue
Block a user