diff --git a/src/Core/AdminConsole/Models/Data/EventIntegrations/IIntegrationMessage.cs b/src/Core/AdminConsole/Models/Data/EventIntegrations/IIntegrationMessage.cs index 7a0962d89a..5b6bfe2e53 100644 --- a/src/Core/AdminConsole/Models/Data/EventIntegrations/IIntegrationMessage.cs +++ b/src/Core/AdminConsole/Models/Data/EventIntegrations/IIntegrationMessage.cs @@ -6,6 +6,7 @@ public interface IIntegrationMessage { IntegrationType IntegrationType { get; } string MessageId { get; set; } + string? OrganizationId { get; set; } int RetryCount { get; } DateTime? DelayUntilDate { get; } void ApplyRetry(DateTime? handlerDelayUntilDate); diff --git a/src/Core/AdminConsole/Models/Data/EventIntegrations/IntegrationMessage.cs b/src/Core/AdminConsole/Models/Data/EventIntegrations/IntegrationMessage.cs index 11a5229f8c..b0fc2161ba 100644 --- a/src/Core/AdminConsole/Models/Data/EventIntegrations/IntegrationMessage.cs +++ b/src/Core/AdminConsole/Models/Data/EventIntegrations/IntegrationMessage.cs @@ -7,6 +7,7 @@ public class IntegrationMessage : IIntegrationMessage { public IntegrationType IntegrationType { get; set; } public required string MessageId { get; set; } + public string? OrganizationId { get; set; } public required string RenderedTemplate { get; set; } public int RetryCount { get; set; } = 0; public DateTime? DelayUntilDate { get; set; } diff --git a/src/Core/AdminConsole/Services/IEventIntegrationPublisher.cs b/src/Core/AdminConsole/Services/IEventIntegrationPublisher.cs index b80b518223..4d95707e90 100644 --- a/src/Core/AdminConsole/Services/IEventIntegrationPublisher.cs +++ b/src/Core/AdminConsole/Services/IEventIntegrationPublisher.cs @@ -5,5 +5,5 @@ namespace Bit.Core.Services; public interface IEventIntegrationPublisher : IAsyncDisposable { Task PublishAsync(IIntegrationMessage message); - Task PublishEventAsync(string body); + Task PublishEventAsync(string body, string? organizationId); } diff --git a/src/Core/AdminConsole/Services/Implementations/EventIntegrations/AzureServiceBusService.cs b/src/Core/AdminConsole/Services/Implementations/EventIntegrations/AzureServiceBusService.cs index 4887aa3a7f..953a9bb56e 100644 --- a/src/Core/AdminConsole/Services/Implementations/EventIntegrations/AzureServiceBusService.cs +++ b/src/Core/AdminConsole/Services/Implementations/EventIntegrations/AzureServiceBusService.cs @@ -30,7 +30,8 @@ public class AzureServiceBusService : IAzureServiceBusService var serviceBusMessage = new ServiceBusMessage(json) { Subject = message.IntegrationType.ToRoutingKey(), - MessageId = message.MessageId + MessageId = message.MessageId, + PartitionKey = message.OrganizationId }; await _integrationSender.SendMessageAsync(serviceBusMessage); @@ -44,18 +45,20 @@ public class AzureServiceBusService : IAzureServiceBusService { Subject = message.IntegrationType.ToRoutingKey(), ScheduledEnqueueTime = message.DelayUntilDate ?? DateTime.UtcNow, - MessageId = message.MessageId + MessageId = message.MessageId, + PartitionKey = message.OrganizationId }; await _integrationSender.SendMessageAsync(serviceBusMessage); } - public async Task PublishEventAsync(string body) + public async Task PublishEventAsync(string body, string? organizationId) { var message = new ServiceBusMessage(body) { ContentType = "application/json", - MessageId = Guid.NewGuid().ToString() + MessageId = Guid.NewGuid().ToString(), + PartitionKey = organizationId }; await _eventSender.SendMessageAsync(message); diff --git a/src/Core/AdminConsole/Services/Implementations/EventIntegrations/EventIntegrationEventWriteService.cs b/src/Core/AdminConsole/Services/Implementations/EventIntegrations/EventIntegrationEventWriteService.cs index 309b4a8409..4ac97df763 100644 --- a/src/Core/AdminConsole/Services/Implementations/EventIntegrations/EventIntegrationEventWriteService.cs +++ b/src/Core/AdminConsole/Services/Implementations/EventIntegrations/EventIntegrationEventWriteService.cs @@ -14,15 +14,21 @@ public class EventIntegrationEventWriteService : IEventWriteService, IAsyncDispo public async Task CreateAsync(IEvent e) { var body = JsonSerializer.Serialize(e); - await _eventIntegrationPublisher.PublishEventAsync(body: body); + await _eventIntegrationPublisher.PublishEventAsync(body: body, organizationId: e.OrganizationId?.ToString()); } public async Task CreateManyAsync(IEnumerable events) { - var body = JsonSerializer.Serialize(events); - await _eventIntegrationPublisher.PublishEventAsync(body: body); - } + var eventList = events as IList ?? events.ToList(); + if (eventList.Count == 0) + { + return; + } + var organizationId = eventList[0].OrganizationId?.ToString(); + var body = JsonSerializer.Serialize(eventList); + await _eventIntegrationPublisher.PublishEventAsync(body: body, organizationId: organizationId); + } public async ValueTask DisposeAsync() { await _eventIntegrationPublisher.DisposeAsync(); diff --git a/src/Core/AdminConsole/Services/Implementations/EventIntegrations/EventIntegrationHandler.cs b/src/Core/AdminConsole/Services/Implementations/EventIntegrations/EventIntegrationHandler.cs index 0a8ab67554..8423652eb8 100644 --- a/src/Core/AdminConsole/Services/Implementations/EventIntegrations/EventIntegrationHandler.cs +++ b/src/Core/AdminConsole/Services/Implementations/EventIntegrations/EventIntegrationHandler.cs @@ -57,6 +57,7 @@ public class EventIntegrationHandler( { IntegrationType = integrationType, MessageId = messageId.ToString(), + OrganizationId = organizationId.ToString(), Configuration = config, RenderedTemplate = renderedTemplate, RetryCount = 0, diff --git a/src/Core/AdminConsole/Services/Implementations/EventIntegrations/RabbitMqService.cs b/src/Core/AdminConsole/Services/Implementations/EventIntegrations/RabbitMqService.cs index 3e20e34200..8976530cf4 100644 --- a/src/Core/AdminConsole/Services/Implementations/EventIntegrations/RabbitMqService.cs +++ b/src/Core/AdminConsole/Services/Implementations/EventIntegrations/RabbitMqService.cs @@ -122,7 +122,7 @@ public class RabbitMqService : IRabbitMqService body: body); } - public async Task PublishEventAsync(string body) + public async Task PublishEventAsync(string body, string? organizationId) { await using var channel = await CreateChannelAsync(); var properties = new BasicProperties diff --git a/test/Core.Test/AdminConsole/Models/Data/EventIntegrations/IntegrationMessageTests.cs b/test/Core.Test/AdminConsole/Models/Data/EventIntegrations/IntegrationMessageTests.cs index edd5cd488f..71f9a15037 100644 --- a/test/Core.Test/AdminConsole/Models/Data/EventIntegrations/IntegrationMessageTests.cs +++ b/test/Core.Test/AdminConsole/Models/Data/EventIntegrations/IntegrationMessageTests.cs @@ -8,6 +8,7 @@ namespace Bit.Core.Test.Models.Data.EventIntegrations; public class IntegrationMessageTests { private const string _messageId = "TestMessageId"; + private const string _organizationId = "TestOrganizationId"; [Fact] public void ApplyRetry_IncrementsRetryCountAndSetsDelayUntilDate() @@ -16,6 +17,7 @@ public class IntegrationMessageTests { Configuration = new WebhookIntegrationConfigurationDetails(new Uri("https://localhost"), "Bearer", "AUTH-TOKEN"), MessageId = _messageId, + OrganizationId = _organizationId, RetryCount = 2, RenderedTemplate = string.Empty, DelayUntilDate = null @@ -36,6 +38,7 @@ public class IntegrationMessageTests { Configuration = new WebhookIntegrationConfigurationDetails(new Uri("https://localhost"), "Bearer", "AUTH-TOKEN"), MessageId = _messageId, + OrganizationId = _organizationId, RenderedTemplate = "This is the message", IntegrationType = IntegrationType.Webhook, RetryCount = 2, @@ -48,6 +51,7 @@ public class IntegrationMessageTests Assert.NotNull(result); Assert.Equal(message.Configuration, result.Configuration); Assert.Equal(message.MessageId, result.MessageId); + Assert.Equal(message.OrganizationId, result.OrganizationId); Assert.Equal(message.RenderedTemplate, result.RenderedTemplate); Assert.Equal(message.IntegrationType, result.IntegrationType); Assert.Equal(message.RetryCount, result.RetryCount); @@ -67,6 +71,7 @@ public class IntegrationMessageTests var message = new IntegrationMessage { MessageId = _messageId, + OrganizationId = _organizationId, RenderedTemplate = "This is the message", IntegrationType = IntegrationType.Webhook, RetryCount = 2, @@ -77,6 +82,7 @@ public class IntegrationMessageTests var result = JsonSerializer.Deserialize(json); Assert.Equal(message.MessageId, result.MessageId); + Assert.Equal(message.OrganizationId, result.OrganizationId); Assert.Equal(message.RenderedTemplate, result.RenderedTemplate); Assert.Equal(message.IntegrationType, result.IntegrationType); Assert.Equal(message.RetryCount, result.RetryCount); diff --git a/test/Core.Test/AdminConsole/Services/EventIntegrationEventWriteServiceTests.cs b/test/Core.Test/AdminConsole/Services/EventIntegrationEventWriteServiceTests.cs index 9369690d86..03f9c7764d 100644 --- a/test/Core.Test/AdminConsole/Services/EventIntegrationEventWriteServiceTests.cs +++ b/test/Core.Test/AdminConsole/Services/EventIntegrationEventWriteServiceTests.cs @@ -22,18 +22,20 @@ public class EventIntegrationEventWriteServiceTests [Theory, BitAutoData] public async Task CreateAsync_EventPublishedToEventQueue(EventMessage eventMessage) { - var expected = JsonSerializer.Serialize(eventMessage); await Subject.CreateAsync(eventMessage); await _eventIntegrationPublisher.Received(1).PublishEventAsync( - Arg.Is(body => AssertJsonStringsMatch(eventMessage, body))); + body: Arg.Is(body => AssertJsonStringsMatch(eventMessage, body)), + organizationId: Arg.Is(orgId => eventMessage.OrganizationId.ToString().Equals(orgId))); } [Theory, BitAutoData] public async Task CreateManyAsync_EventsPublishedToEventQueue(IEnumerable eventMessages) { + var eventMessage = eventMessages.First(); await Subject.CreateManyAsync(eventMessages); await _eventIntegrationPublisher.Received(1).PublishEventAsync( - Arg.Is(body => AssertJsonStringsMatch(eventMessages, body))); + body: Arg.Is(body => AssertJsonStringsMatch(eventMessages, body)), + organizationId: Arg.Is(orgId => eventMessage.OrganizationId.ToString().Equals(orgId))); } private static bool AssertJsonStringsMatch(EventMessage expected, string body) diff --git a/test/Core.Test/AdminConsole/Services/EventIntegrationHandlerTests.cs b/test/Core.Test/AdminConsole/Services/EventIntegrationHandlerTests.cs index f038fe28ef..89207a9d3a 100644 --- a/test/Core.Test/AdminConsole/Services/EventIntegrationHandlerTests.cs +++ b/test/Core.Test/AdminConsole/Services/EventIntegrationHandlerTests.cs @@ -23,6 +23,7 @@ public class EventIntegrationHandlerTests private const string _templateWithOrganization = "Org: #OrganizationName#"; private const string _templateWithUser = "#UserName#, #UserEmail#"; private const string _templateWithActingUser = "#ActingUserName#, #ActingUserEmail#"; + private static readonly Guid _organizationId = Guid.NewGuid(); private static readonly Uri _uri = new Uri("https://localhost"); private static readonly Uri _uri2 = new Uri("https://example.com"); private readonly IEventIntegrationPublisher _eventIntegrationPublisher = Substitute.For(); @@ -50,6 +51,7 @@ public class EventIntegrationHandlerTests { IntegrationType = IntegrationType.Webhook, MessageId = "TestMessageId", + OrganizationId = _organizationId.ToString(), Configuration = new WebhookIntegrationConfigurationDetails(_uri), RenderedTemplate = template, RetryCount = 0, @@ -122,6 +124,7 @@ public class EventIntegrationHandlerTests public async Task HandleEventAsync_BaseTemplateOneConfiguration_PublishesIntegrationMessage(EventMessage eventMessage) { var sutProvider = GetSutProvider(OneConfiguration(_templateBase)); + eventMessage.OrganizationId = _organizationId; await sutProvider.Sut.HandleEventAsync(eventMessage); @@ -140,6 +143,7 @@ public class EventIntegrationHandlerTests public async Task HandleEventAsync_BaseTemplateTwoConfigurations_PublishesIntegrationMessages(EventMessage eventMessage) { var sutProvider = GetSutProvider(TwoConfigurations(_templateBase)); + eventMessage.OrganizationId = _organizationId; await sutProvider.Sut.HandleEventAsync(eventMessage); @@ -164,6 +168,7 @@ public class EventIntegrationHandlerTests var user = Substitute.For(); user.Email = "test@example.com"; user.Name = "Test"; + eventMessage.OrganizationId = _organizationId; sutProvider.GetDependency().GetByIdAsync(Arg.Any()).Returns(user); await sutProvider.Sut.HandleEventAsync(eventMessage); @@ -183,6 +188,7 @@ public class EventIntegrationHandlerTests var sutProvider = GetSutProvider(OneConfiguration(_templateWithOrganization)); var organization = Substitute.For(); organization.Name = "Test"; + eventMessage.OrganizationId = _organizationId; sutProvider.GetDependency().GetByIdAsync(Arg.Any()).Returns(organization); await sutProvider.Sut.HandleEventAsync(eventMessage); @@ -205,6 +211,7 @@ public class EventIntegrationHandlerTests var user = Substitute.For(); user.Email = "test@example.com"; user.Name = "Test"; + eventMessage.OrganizationId = _organizationId; sutProvider.GetDependency().GetByIdAsync(Arg.Any()).Returns(user); await sutProvider.Sut.HandleEventAsync(eventMessage); @@ -235,6 +242,7 @@ public class EventIntegrationHandlerTests var sutProvider = GetSutProvider(ValidFilterConfiguration()); sutProvider.GetDependency().EvaluateFilterGroup( Arg.Any(), Arg.Any()).Returns(true); + eventMessage.OrganizationId = _organizationId; await sutProvider.Sut.HandleEventAsync(eventMessage); @@ -284,7 +292,7 @@ public class EventIntegrationHandlerTests $"Date: {eventMessage.Date}, Type: {eventMessage.Type}, UserId: {eventMessage.UserId}" ); await _eventIntegrationPublisher.Received(1).PublishAsync(Arg.Is( - AssertHelper.AssertPropertyEqual(expectedMessage, new[] { "MessageId" }))); + AssertHelper.AssertPropertyEqual(expectedMessage, new[] { "MessageId", "OrganizationId" }))); } } @@ -301,12 +309,12 @@ public class EventIntegrationHandlerTests var expectedMessage = EventIntegrationHandlerTests.expectedMessage( $"Date: {eventMessage.Date}, Type: {eventMessage.Type}, UserId: {eventMessage.UserId}" ); - await _eventIntegrationPublisher.Received(1).PublishAsync(Arg.Is( - AssertHelper.AssertPropertyEqual(expectedMessage, new[] { "MessageId" }))); + await _eventIntegrationPublisher.Received(1).PublishAsync(Arg.Is(AssertHelper.AssertPropertyEqual( + expectedMessage, new[] { "MessageId", "OrganizationId" }))); expectedMessage.Configuration = new WebhookIntegrationConfigurationDetails(_uri2); - await _eventIntegrationPublisher.Received(1).PublishAsync(Arg.Is( - AssertHelper.AssertPropertyEqual(expectedMessage, new[] { "MessageId" }))); + await _eventIntegrationPublisher.Received(1).PublishAsync(Arg.Is(AssertHelper.AssertPropertyEqual( + expectedMessage, new[] { "MessageId", "OrganizationId" }))); } } } diff --git a/test/Core.Test/AdminConsole/Services/IntegrationHandlerTests.cs b/test/Core.Test/AdminConsole/Services/IntegrationHandlerTests.cs index aa93567538..f6f587cfd7 100644 --- a/test/Core.Test/AdminConsole/Services/IntegrationHandlerTests.cs +++ b/test/Core.Test/AdminConsole/Services/IntegrationHandlerTests.cs @@ -16,6 +16,7 @@ public class IntegrationHandlerTests { Configuration = new WebhookIntegrationConfigurationDetails(new Uri("https://localhost"), "Bearer", "AUTH-TOKEN"), MessageId = "TestMessageId", + OrganizationId = "TestOrganizationId", IntegrationType = IntegrationType.Webhook, RenderedTemplate = "Template", DelayUntilDate = null, @@ -25,6 +26,8 @@ public class IntegrationHandlerTests var result = await sut.HandleAsync(expected.ToJson()); var typedResult = Assert.IsType>(result.Message); + Assert.Equal(expected.MessageId, typedResult.MessageId); + Assert.Equal(expected.OrganizationId, typedResult.OrganizationId); Assert.Equal(expected.Configuration, typedResult.Configuration); Assert.Equal(expected.RenderedTemplate, typedResult.RenderedTemplate); Assert.Equal(expected.IntegrationType, typedResult.IntegrationType);