Skip to content

0.0.15 #15

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion HostingMessageDemo/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ COPY ["Hydra4Net.HostingExtensions/Hydra4Net.HostingExtensions.csproj", "Hydra4N
COPY ["Hydra4NET/Hydra4NET.csproj", "Hydra4NET/"]
RUN dotnet restore "HostingMessageDemo/HostingMessageDemo.csproj"
COPY . .
WORKDIR "/src/HostingMessageDemo"
WORKDIR "HostingMessageDemo"
RUN dotnet build "HostingMessageDemo.csproj" -c Release -o /app/build

FROM build AS publish
Expand Down
54 changes: 31 additions & 23 deletions HostingMessageDemo/SampleMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,11 @@ namespace HostingMessageDemo
{
internal class SampleMessageHandler : HydraEventsHandler
{
private ILogger<SampleMessageHandler> _logger;

private string _mode = "";
private readonly Sender _sender;

public SampleMessageHandler(ILogger<SampleMessageHandler> logger, Sender sender, IHydra hydra)
public SampleMessageHandler(ILogger<SampleMessageHandler> logger, Sender sender, IHydra hydra) : base(logger)
{
_logger = logger;
_sender = sender;
SetValidateMode(hydra);
}
Expand All @@ -32,7 +29,7 @@ void SetValidateMode(IHydra hydra)
{
case Modes.Sender:
case Modes.Queuer:
_logger.LogInformation($"Configured as {_mode}");
Logger.LogInformation($"Configured as {_mode}");
break;
default:
throw new ArgumentOutOfRangeException("ServiceType", "Hydra config doesn't specify a valid ServiceType role");
Expand All @@ -43,7 +40,7 @@ public override async Task OnMessageReceived(IInboundMessage msg, IHydra hydra)
{
try
{
_logger.LogInformation($"Received message of type {msg.Type}");
Logger.LogInformation($"Received message of type {msg.Type}");
if (_mode == Modes.Sender && msg.ReceivedUMF != null)
{
await _sender.ProcessMessage(msg.Type, msg.ReceivedUMF);
Expand All @@ -62,7 +59,7 @@ public override async Task OnMessageReceived(IInboundMessage msg, IHydra hydra)
}
catch (Exception e)
{
_logger.LogError(e, "OnMessageReceived failed");
Logger.LogError(e, "OnMessageReceived failed");
}
}

Expand All @@ -72,16 +69,15 @@ public override async Task OnQueueMessageReceived(IInboundMessage msg, IHydra hy
return;
try
{
_logger.LogInformation($"Queuer: processing queued message from sender");
Logger.LogInformation($"Queuer: processing queued message from sender");
if (msg.Type == "queuer")
{
await HandleQueuerType(msg, hydra);
}

}
catch (Exception e)
{
_logger.LogError(e, "Queue handler failed");
Logger.LogError(e, "Queue handler failed");
}
}

Expand All @@ -100,20 +96,20 @@ private async Task HandleQueuerType(IInboundMessage msg, IHydra hydra)
Msg = $"Queuer: processed message containing {Msg} with ID of {Id}"
});
string json = sharedMessage.Serialize();
_logger.LogInformation($"Queuer: mark message: {msg.MessageJson}");
Logger.LogInformation($"Queuer: mark message: {msg.MessageJson}");
await hydra.MarkQueueMessageAsync(msg.MessageJson ?? "", true);
_logger.LogInformation($"Queuer: send json: {json}");
Logger.LogInformation($"Queuer: send json: {json}");
await hydra.SendMessageAsync(sharedMessage.To, json);
_logger.LogInformation($"Queuer: sent completion message back to sender");
Logger.LogInformation($"Queuer: sent completion message back to sender");
}
else
{
_logger.LogWarning("Queue Msg null: {0}", msg.MessageJson);
Logger.LogWarning("Queue Msg null: {0}", msg.MessageJson);
}
}
else
{
_logger.LogError("SharedMessage is null, body: {0}", msg.MessageJson);
Logger.LogError("SharedMessage is null, body: {0}", msg.MessageJson);
}
}

Expand All @@ -130,9 +126,8 @@ private async Task HandleRespondType(IInboundMessage msg, IHydra hydra)
Msg = $"Queuer: sending single response to {Msg} with ID of {Id}"
});
await hydra.SendMessageAsync(sharedMessage);
_logger.LogInformation($"Queuer: sent single response message back to sender");
Logger.LogInformation($"Queuer: sent single response message back to sender");
}

}

private async Task HandleResponseStreamType(IInboundMessage msg, IHydra hydra)
Expand All @@ -151,43 +146,56 @@ private async Task HandleResponseStreamType(IInboundMessage msg, IHydra hydra)
Msg = $"Queuer: sending response stream {i} to {Msg} with ID of {Id}"
});
await hydra.SendMessageAsync(sharedMessage);
_logger.LogInformation($"Queuer: sent response stream message back to sender");
Logger.LogInformation($"Queuer: sent response stream message back to sender");
}
IUMF<SharedMessageBody> completeMsg = hydra.CreateUMFResponse(sm!, "response-stream-complete", new SharedMessageBody()
{
Id = Id,
Msg = $"Queuer: sending complete response stream to {Msg} with ID of {Id}"
});
await hydra.SendMessageAsync(completeMsg);
_logger.LogInformation($"Queuer: sent response stream complete message back to sender");
Logger.LogInformation($"Queuer: sent response stream complete message back to sender");
}
}

#region Optional
public override Task BeforeInit(IHydra hydra)
{
_logger.LogInformation($"Hydra initialized");
Logger.LogInformation($"Hydra initialized");
return base.BeforeInit(hydra);
}

public override Task OnShutdown(IHydra hydra)
{
_logger.LogInformation($"Hydra shut down");
Logger.LogInformation($"Hydra shut down");
return base.OnShutdown(hydra);
}

public override Task OnInitError(IHydra hydra, Exception e)
{
_logger.LogCritical(e, "A fatal error occurred initializing Hydra");
Logger.LogCritical(e, "A fatal error occurred initializing Hydra");
return base.OnInitError(hydra, e);
}

public override Task OnDequeueError(IHydra hydra, Exception e)
{
_logger.LogWarning(e, "An error occurred while dequeueing Hydra");
//base class logs this (Error level) by default
return base.OnDequeueError(hydra, e);
}

public override Task OnInternalError(IHydra hydra, Exception e)
{
//base class logs this (Error level) by default
return base.OnInternalError(hydra, e);
}

//base class logs this (Debug level) by default
public override void OnDebugEvent(IHydra hydra, DebugEvent e)
{
//base class logs this (Error level) by default
base.OnDebugEvent(hydra, e);
}

#endregion Optional
}
}
6 changes: 6 additions & 0 deletions HostingMessageDemo/appsettings.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
{
"Logging": {
"LogLevel": {
"Default": "Debug",
"Microsoft.AspNetCore": "Warning"
}
},
"Hydra": {
"ServiceName": "sender-svcs",
"ServiceIP": "10.0.9.*",
Expand Down
2 changes: 1 addition & 1 deletion HostingMessageDemo/scripts/build.bat
Original file line number Diff line number Diff line change
@@ -1 +1 @@
docker build --force-rm -t hostingdemo -f Dockerfile ../..
docker build --force-rm -t hostingdemo -f ../Dockerfile ../..
20 changes: 12 additions & 8 deletions Hydra4NET/Config/HydraConfigObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@ public class HydraConfigObject
public Plugins? Plugins { get; set; }
public RedisConfig? Redis { get; set; }

/// <summary>
/// Configure whether to emit debug events from
/// </summary>
public bool EmitDebugEvents { get; set; } = true;

/// <summary>
/// The maximum length of the UMF message to emit in debug events. Larger UMFs will be truncated if value > 0.
/// </summary>
public int? EmitDebugMaxUmfLength { get; set; } = 2000;

public string GetRedisConnectionString()
{
if (Redis == null)
throw new NullReferenceException("Redis configuration is null");
//no default database in case the ConnectionMultiplexer is accessed outside hydra
string connectionString = Redis.GetRedisHost();
if (!string.IsNullOrWhiteSpace(Redis.Options))
{
connectionString = $"{connectionString},{Redis.Options}";
}
return connectionString;
return Redis.GetConnectionString();
}

/// <summary>
Expand Down Expand Up @@ -63,4 +67,4 @@ public class HydraLogger
public bool LogToConsole { get; set; }
public bool OnlyLogLocally { get; set; }
}
}
}
10 changes: 10 additions & 0 deletions Hydra4NET/Config/RedisConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,15 @@ public string GetRedisHost()
throw new ArgumentNullException(nameof(Host), "Host cannot be null or empty");
return $"{Host}:{Port ?? 6379}";
}
public string GetConnectionString()
{
//no default database in case the ConnectionMultiplexer is accessed outside hydra
string connectionString = GetRedisHost();
if (!string.IsNullOrWhiteSpace(Options))
{
connectionString = $"{connectionString},{Options}";
}
return connectionString;
}
}
}
20 changes: 20 additions & 0 deletions Hydra4NET/DebugEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
namespace Hydra4NET
{
public enum DebugEventType
{
MessageReceived,
SendMessage,
QueueReceived,
SendQueue,
SendBroadcastMessage,
MarkQueueMessage,
Register
}

public class DebugEvent
{
public DebugEventType EventType { get; set; }
public string? UMF { get; set; }
public string Message { get; set; } = "";
}
}
32 changes: 24 additions & 8 deletions Hydra4NET/Helpers/StandardSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,24 @@ namespace Hydra4NET.Helpers
{
public class StandardSerializer
{
//caching these options improves performance
private static readonly JsonSerializerOptions DeserializeOptions = new JsonSerializerOptions()
{
PropertyNameCaseInsensitive = true
};

private static readonly JsonSerializerOptions SerializeOptions = new JsonSerializerOptions()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};

/// <summary>
/// A standardized JSON deserializer helper. This is essential as Hydra-based services written in non-Dotnet environments expect a universal format.
/// </summary>
/// <returns></returns>
static public U? Deserialize<U>(string message) where U : class
{
return JsonSerializer.Deserialize<U>(message, new JsonSerializerOptions()
{
PropertyNameCaseInsensitive = true
});
return JsonSerializer.Deserialize<U>(message, DeserializeOptions);
}

/// <summary>
Expand All @@ -22,10 +30,18 @@ public class StandardSerializer
/// <returns></returns>
static public string Serialize<T>(T item)
{
return JsonSerializer.Serialize(item, new JsonSerializerOptions()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
});
return JsonSerializer.Serialize(item, SerializeOptions);
}

/// <summary>
/// A standardized JSON serializer helper which ensures that the generated JSON is compatible with JavaScript camel case. Serializes directly to UTF8 bytes for better performance
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="item"></param>
/// <returns></returns>
static public byte[] SerializeBytes<T>(T item)
{
return JsonSerializer.SerializeToUtf8Bytes(item, SerializeOptions);
}
}
}
Loading