using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text;

namespace ExternalExample
{
    // Dependency: Add RabbitMQ.Client with NuGet.

    /// <summary>
    /// Console example that connects to the message broker over TLS, subscribes to a fixed
    /// set of per-user queues and prints every received message until ENTER is pressed.
    /// </summary>
    class Program
    {
        // Settings:
        // NOTE(review): credentials are hard-coded in source; for anything beyond a local
        // example they should be loaded from configuration or a secret store.
        static readonly string _userName = "APIACCESS-98";
        static readonly string _apiKey = "YIWb1phYnSuucGcAxsoWrQSPqhdBy1xS";
        static readonly string _hostName = "oceandev.etrel.pri"; // STAGE: stage-mycharger.etrel.com, PROD: mycharger.etrel.com

        /*
         * Message type                          Queue name (without user prefix)
         * ------------------------------------  ------------------------------------
         * ChargePointConnectorStatusChanged     "*.connector.statusChanged",
         * ChargingSessionb2BBillingEnded        "session.billing.b2b.ended",
         * ChargingSessionDataManuallyChanged    "session.manuallyChanged",
         * ChargingSessionEnded                  "*.session.ended",
         * ChargingSessionMeasurementChanged     "*.session.measurementsChanged",
         * ChargingSessionStarted                "*.session.started",
         * ChargingSessionUserBillingEnded       "session.billing.user.ended",
         * ChargingSessionVehicleIsFull          "*.session.vehicleFull",
         * HelpDeskTicketOperatorResponded       "helpdesk.ticket.operatorResponded",
         * HelpDeskTicketStatusChanged           "helpdesk.ticket.statusChanged",
         * HelpDeskTicketUserRaised              "helpdesk.ticket.raisedByUser",
         * InvoiceDocumentReady                  "invoice.user.ready",
         * MaintenanceTaskCreated                "maintenance.taskCreated",
         * MaintenanceTaskStatusChanged          "maintenance.taskStatusChange",
         * ReservationCanceled                   "*.reservation.canceled",
         * ReservationCreated                    "*.reservation.created",
         * SystemEventRaised                     "*.*.*.system.event",
         * UserAccountBlocked                    "user.account.blocked",
         * UserPrepaymentAccountEmpty            "user.account.empty",
         * UserRegistrationPending               "userRegistration.pending",
         * ChargingSessionStartedUser            "*.session.started.user",
         * ChargingSessionEndedRegulary          "*.session.ended.regulary",
         * UserAccountUnblocked                  "user.account.unblocked",
         * UserPayerAccountUnblocked             "userPayer.account.unblocked",
         * UserPayerAccountBlocked               "userPayer.account.blocked",
         * UserPayerAccountPaymentBlocked        "userPayer.account.paymentBlocked",
         * UserPayerAccountPaymentUnblocked      "userPayer.account.paymentUnblocked",
         * ReservationTimeChanged                "*.reservation.timeChanged",
         * ChargingSessionPreferenceChanged      "*.session.preference.changed",
         */

        // Add only the queue names of the message types the operator is subscribed to.
        static readonly string[] _queuesWithoutPrefix =
        {
            "*.session.ended",
            "session.billing.b2b.ended",
            "*.connector.statusChanged",
        };

        // Origin used to convert Unix timestamps (seconds or milliseconds) to UTC DateTime values.
        static readonly DateTime _unixEpochUtc = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);

        static IConnection _connection;
        static IModel _channel;

        /// <summary>
        /// Opens a TLS connection (port 5671) to the configured host, starts consuming every
        /// queue named "{userName}.{queueWithoutPrefix}" with <see cref="Received"/> as the
        /// handler, and blocks until ENTER is pressed. Any exception is written to the console;
        /// the channel and connection are disposed on the way out.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            try
            {
                var factory = new ConnectionFactory()
                {
                    HostName = _hostName,
                    Port = 5671,
                    UserName = _userName,
                    Password = _apiKey,
                    Ssl = new SslOption() { Enabled = true, ServerName = _hostName }
                };

                _connection = factory.CreateConnection(_userName);
                _channel = _connection.CreateModel();

                var consumer = new EventingBasicConsumer(_channel);
                consumer.Received += Received;

                foreach (var item in _queuesWithoutPrefix)
                {
                    // Queue names are prefixed with the API user name.
                    string queue = string.Format("{0}.{1}", _userName, item);
                    _channel.BasicConsume(queue, true, consumer);
                }

                Console.WriteLine("Press ENTER to close ...");
                Console.ReadLine();
            }
            catch (Exception ex)
            {
                // Wrong UserName/Password can throw 'System.IO.IOException' in System.dll,
                // Additional information: Unable to read data from the transport connection:
                // An existing connection was forcibly closed by the remote host.
                Console.WriteLine(ex.ToString());
            }
            finally
            {
                // Must not be disposed while still consuming (i.e. while Received is in use)!
                if (_channel != null)
                {
                    _channel.Dispose();
                }

                if (_connection != null)
                {
                    _connection.Dispose();
                }
            }
        }

        /// <summary>
        /// Handles one delivered message: reads the version and timestamp headers, ignores
        /// messages whose major version is not 1, and prints the message type, send time,
        /// minor version and UTF-8 decoded body to the console.
        /// </summary>
        /// <param name="sender">The consumer that raised the event (not used).</param>
        /// <param name="e">Delivery data: properties, headers and body of the message.</param>
        /// <remarks>
        /// Exceptions are caught and only their message is written to the console, so a
        /// malformed message never propagates out of the handler.
        /// </remarks>
        public static void Received(object sender, BasicDeliverEventArgs e)
        {
            try
            {
                var type = e.BasicProperties.Type;
                IDictionary<string, object> headers = e.BasicProperties.Headers;

                // A missing "VersionMajor" header is treated as major version 1.
                // Convert.* is used instead of an unboxing cast because the header value may
                // arrive boxed as an integer type other than Int32 (e.g. Int64), in which
                // case a direct (int) cast would throw InvalidCastException.
                int versionMajor = 1;
                object rawMajor;
                if (TryGetHeader(headers, "VersionMajor", out rawMajor))
                {
                    versionMajor = Convert.ToInt32(rawMajor, CultureInfo.InvariantCulture);
                }

                // More than one major version of same message can be sent. Consume only one version! Older will be deprecated in the future.
                // Major is changed only at breaking change: if existing property is changed.
                // When new field appears in the message, Minor version is changed.
                // Consumer (your code) must not break in that case!
                // When a new message type is sent, major version will start with 1.
                // In this example we consume only major version 1 for all message types.
                if (versionMajor != 1)
                {
                    return;
                }

                int? versionMinor = null;
                object rawMinor;
                if (TryGetHeader(headers, "VersionMinor", out rawMinor))
                {
                    versionMinor = Convert.ToInt32(rawMinor, CultureInfo.InvariantCulture);
                }

                // The "Timestamp_AMQP_1.0" header value is interpreted as milliseconds since the Unix epoch.
                DateTime? sentUtcTimeAmqp1 = null;
                object rawTimestamp;
                if (TryGetHeader(headers, "Timestamp_AMQP_1.0", out rawTimestamp))
                {
                    long milliseconds = Convert.ToInt64(rawTimestamp, CultureInfo.InvariantCulture);
                    sentUtcTimeAmqp1 = _unixEpochUtc.AddMilliseconds(milliseconds);
                }

                DateTime? sentUtcTime = null;
                if (e.BasicProperties.Timestamp.UnixTime != 0)
                {
                    // AMQP 0-9-1 spec mandates that the timestamp property is a value in seconds.
                    sentUtcTime = _unixEpochUtc.AddSeconds(e.BasicProperties.Timestamp.UnixTime);
                }

                var body = e.Body;
                var jsonStr = Encoding.UTF8.GetString(body);

                // The millisecond-precision header timestamp wins over the seconds-precision property.
                Console.WriteLine(
                    "Received: type={0}, sentUtcTime={1}, versionMinor={2}, jsonMessage={3}",
                    type,
                    sentUtcTimeAmqp1 ?? sentUtcTime,
                    versionMinor,
                    jsonStr);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }

        /// <summary>
        /// Looks up a header with a single dictionary access.
        /// </summary>
        /// <param name="headers">The message headers; may be null.</param>
        /// <param name="key">The header name.</param>
        /// <param name="value">The header value when the method returns true; otherwise null.</param>
        /// <returns>True only when the header exists and its value is not null.</returns>
        private static bool TryGetHeader(IDictionary<string, object> headers, string key, out object value)
        {
            value = null;
            if (headers == null)
            {
                return false;
            }

            return headers.TryGetValue(key, out value) && value != null;
        }
    }
}