Publishing Protobuf Events to a Channel

The Universal Messaging C# client API supports publishing protobuf events to a store. For more information about using Google Protocol Buffers with Universal Messaging, see Google Protocol Buffers.

You can use the following example to publish protobuf events to a Universal Messaging channel. The example contains the following code samples:

  • A sample that publishes protobuf events to a channel. The event is populated with sample data, which a subscriber then consumes and parses. The sample uses a predefined file named umTeam.proto.
  • The contents of the predefined file umTeam.proto that you must compile using the following command:
    protoc --descriptor_set_out=<path_to_file>/umTeam.descriptor <path_to_file>/umTeam.proto

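    After you generate the umTeam.descriptor file, you must set it as a protobuf descriptor on the channel used in the sample below. You can do this while creating or editing the channel; the sample does it in code by calling setProtobufDescriptors on the channel attributes before it creates the channel.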

Sample that Publishes Protobuf Events

using com.pcbsys.nirvana.client;
using Google.ProtocolBuffers.Descriptors;
using Google.ProtocolBuffers;
using Google.ProtocolBuffers.DescriptorProtos;
using System;
using System.IO;

namespace ConsoleApplication1
{
    class ProtobufSample
    {
        // Message type / field names for the three teams (passed to desc.FindTypeByName by the
        // buildTeamN helpers and to FindFieldByName on the Teams message), each followed by the
        // value stored in that team's teamName field.
        public const string TEAM1 = "Team1";
        public const string TEAMNAME1 = "INDIA";
        public const string TEAM2 = "Team2";
        public const string TEAMNAME2 = "USA";
        public const string TEAM3 = "Team3";
        public const string TEAMNAME3 = "BULGARIA";

        // Name of the Teams message type and of the field holding it in UniversalMessaging.
        public const string TEAMS = "Teams";

        // Values stored in the teamLeadName field of each team's TeamLead message.
        public const string TEAMLEAD1 = "John";
        public const string TEAMLEAD2 = "Richard";
        public const string TEAMLEAD3 = "Frank";

        // Name of the TeamLead message type looked up by buildTeamLead.
        public const string TEAMLEADFIELD = "TeamLead";

        // "_email" and "url" each name both a message type and a field in umTeam.proto.
        public const string _EMAIL = "_email";
        public const string URL = "url";

        // Field name and value of the product name on the UniversalMessaging message.
        public const string PRODUCTNAME = "productName";
        public const string UM = "Universal Messaging";

        // Name of the top-level message type that is published.
        public const string UMMESSAGE = "UniversalMessaging";

        // Descriptor of the top-level message type; assigned in Main and read by Listener.go
        // when it parses a received event.
        public static MessageDescriptor UMDescriptor;
        /// <summary>
        /// Creates a builder for a TeamLead message with its teamLeadName and employees fields set.
        /// </summary>
        /// <param name="name">Value for the teamLeadName field.</param>
        /// <param name="desc">File descriptor in which the TeamLead message type is looked up.</param>
        /// <param name="employees">Message stored in the employees field.</param>
        /// <returns>The populated builder; the caller decides when to call Build().</returns>
        public static DynamicMessage.Builder buildTeamLead(String name, FileDescriptor desc, DynamicMessage employees)
        {
            MessageDescriptor leadType = desc.FindTypeByName<MessageDescriptor>(TEAMLEADFIELD);

            // Each step is named separately; the order of lookups and SetField calls is unchanged.
            var emptyLead = DynamicMessage.CreateBuilder(leadType);
            var namedLead = emptyLead.SetField(leadType.FindFieldByName("teamLeadName"), name);
            var staffedLead = namedLead.SetField(leadType.FindFieldByName("employees"), employees);

            return (DynamicMessage.Builder)staffedLead;
        }
        /// <summary>
        /// Creates a builder for the team message type named by <paramref name="teamFieldName"/>
        /// with its teamName, TeamLead and _email fields set.
        /// </summary>
        /// <param name="teamName">Value for the teamName field.</param>
        /// <param name="teamFieldName">Name of the team message type to look up in <paramref name="desc"/>.</param>
        /// <param name="desc">File descriptor that contains the team message type.</param>
        /// <param name="teamLeadBuilder">Builder whose built message is stored in the TeamLead field.</param>
        /// <param name="emailBuilder">Builder whose built message is stored in the _email field.</param>
        /// <returns>The populated builder.</returns>
        public static DynamicMessage.Builder buildTeam1(String teamName, String teamFieldName, FileDescriptor desc,
            DynamicMessage.Builder teamLeadBuilder, DynamicMessage.Builder emailBuilder)
        {
            MessageDescriptor teamType = desc.FindTypeByName<MessageDescriptor>(teamFieldName);

            // Nested builders are only built at the point their message is stored, as before.
            var emptyTeam = DynamicMessage.CreateBuilder(teamType);
            var namedTeam = emptyTeam.SetField(teamType.FindFieldByName("teamName"), teamName);
            var ledTeam = namedTeam.SetField(teamType.FindFieldByName("TeamLead"), teamLeadBuilder.Build());
            var completeTeam = ledTeam.SetField(teamType.FindFieldByName("_email"), emailBuilder.Build());

            return (DynamicMessage.Builder)completeTeam;
        }
        /// <summary>
        /// Creates a builder for the team message type named by <paramref name="teamFieldName"/>
        /// with its teamName, TeamLead and _email fields set.
        /// </summary>
        /// <param name="teamName">Value for the teamName field.</param>
        /// <param name="teamFieldName">Name of the team message type to look up in <paramref name="desc"/>.</param>
        /// <param name="desc">File descriptor that contains the team message type.</param>
        /// <param name="teamLeadBuilder">Builder whose built message is stored in the TeamLead field.</param>
        /// <param name="emailBuilder">Builder whose built message is stored in the _email field.</param>
        /// <returns>The populated builder.</returns>
        public static DynamicMessage.Builder buildTeam2(String teamName, String teamFieldName, FileDescriptor desc,
            DynamicMessage.Builder teamLeadBuilder, DynamicMessage.Builder emailBuilder)
        {
            // Team1, Team2 and Team3 declare the same three fields in umTeam.proto and the message
            // type is already selected by teamFieldName, so delegate to buildTeam1 rather than
            // keeping a second copy of the same lookup and SetField sequence.
            return buildTeam1(teamName, teamFieldName, desc, teamLeadBuilder, emailBuilder);
        }
        /// <summary>
        /// Creates a builder for the team message type named by <paramref name="teamFieldName"/>
        /// with its teamName, TeamLead and _email fields set.
        /// </summary>
        /// <param name="teamName">Value for the teamName field.</param>
        /// <param name="teamFieldName">Name of the team message type to look up in <paramref name="desc"/>.</param>
        /// <param name="desc">File descriptor that contains the team message type.</param>
        /// <param name="teamLeadBuilder">Builder whose built message is stored in the TeamLead field.</param>
        /// <param name="emailBuilder">Builder whose built message is stored in the _email field.</param>
        /// <returns>The populated builder.</returns>
        public static DynamicMessage.Builder buildTeam3(String teamName, String teamFieldName, FileDescriptor desc,
            DynamicMessage.Builder teamLeadBuilder, DynamicMessage.Builder emailBuilder)
        {
            // Team1, Team2 and Team3 declare the same three fields in umTeam.proto and the message
            // type is already selected by teamFieldName, so delegate to buildTeam1 rather than
            // keeping a third copy of the same lookup and SetField sequence.
            return buildTeam1(teamName, teamFieldName, desc, teamLeadBuilder, emailBuilder);
        }
        /// <summary>
        /// Creates a builder for a url message whose url field holds the given string.
        /// </summary>
        /// <param name="url">Value for the url field.</param>
        /// <param name="desc">File descriptor in which the url message type is looked up.</param>
        /// <returns>The populated builder.</returns>
        public static DynamicMessage.Builder buildUrl(String url, FileDescriptor desc)
        {
            // The URL constant names both the message type and its single field.
            MessageDescriptor urlType = desc.FindTypeByName<MessageDescriptor>(URL);

            var emptyUrl = DynamicMessage.CreateBuilder(urlType);
            var filledUrl = emptyUrl.SetField(urlType.FindFieldByName(URL), url);

            return (DynamicMessage.Builder)filledUrl;
        }
        /// <summary>
        /// Creates a builder for an _email message whose url field holds the given url message.
        /// </summary>
        /// <param name="url">Already built url message to store in the url field.</param>
        /// <param name="desc">File descriptor in which the _email message type is looked up.</param>
        /// <returns>The populated builder.</returns>
        public static DynamicMessage.Builder buildEmail(DynamicMessage url, FileDescriptor desc)
        {
            MessageDescriptor emailType = desc.FindTypeByName<MessageDescriptor>(_EMAIL);

            var emptyEmail = DynamicMessage.CreateBuilder(emailType);
            var filledEmail = emptyEmail.SetField(emailType.FindFieldByName(URL), url);

            return (DynamicMessage.Builder)filledEmail;
        }
        /// <summary>
        /// Opens a session, creates a channel that carries the umTeam descriptor, assembles one
        /// UniversalMessaging protobuf message, subscribes a Listener to the channel and publishes
        /// the message, then waits for a line of console input.
        /// </summary>
        /// <param name="args">Command-line arguments; not used.</param>
        static void Main(string[] args)
        {
            // Open and initialise the session.
            String[] realmUrls = { "nsp://localhost:11000" };
            nSessionAttributes sessionAttributes = new nSessionAttributes(realmUrls);
            nSession session = nSessionFactory.create(sessionAttributes);
            session.init();

            // Read the compiled descriptor set. Adjust this path to point at your umTeam.descriptor file.
            String descriptorPath = "umTeam.descriptor";
            byte[] descriptorBytes = File.ReadAllBytes(descriptorPath);
            byte[][] channelDescriptors = { descriptorBytes };

            // Create the channel with the descriptor set attached.
            nChannelAttributes channelAttributes = new nChannelAttributes("channel");
            channelAttributes.setProtobufDescriptors(channelDescriptors);
            nChannel channel = session.createChannel(channelAttributes);

            // Parse the same bytes locally and build a FileDescriptor from the first file in the set,
            // so that message types and fields can be looked up by name.
            FileDescriptorSet.Builder descriptorSetBuilder = FileDescriptorSet.CreateBuilder();
            descriptorSetBuilder.MergeFrom(descriptorBytes);
            FileDescriptor protoFile = FileDescriptor.BuildFrom(descriptorSetBuilder.GetFile(0), new FileDescriptor[0]);

            // One employees message is shared by all three team leads.
            MessageDescriptor employeesType = protoFile.FindTypeByName<MessageDescriptor>("employees");
            var employeesFilled = DynamicMessage.CreateBuilder(employeesType)
                .SetField(employeesType.FindFieldByName("numberOfEmployees"), 5);
            DynamicMessage employees = ((DynamicMessage.Builder)employeesFilled).Build();

            // Team lead and e-mail parts for each team.
            DynamicMessage.Builder lead1 = buildTeamLead(TEAMLEAD1, protoFile, employees);
            DynamicMessage url1 = buildUrl("teamIndia@softwareag.com", protoFile).Build();
            DynamicMessage.Builder email1 = buildEmail(url1, protoFile);

            DynamicMessage.Builder lead2 = buildTeamLead(TEAMLEAD2, protoFile, employees);
            DynamicMessage url2 = buildUrl("teamUSA@softwareag.com", protoFile).Build();
            DynamicMessage.Builder email2 = buildEmail(url2, protoFile);

            DynamicMessage.Builder lead3 = buildTeamLead(TEAMLEAD3, protoFile, employees);
            DynamicMessage url3 = buildUrl("teamBulgaria@softwareag.com", protoFile).Build();
            DynamicMessage.Builder email3 = buildEmail(url3, protoFile);

            // The three team messages.
            DynamicMessage.Builder team1 = buildTeam1(TEAMNAME1, TEAM1, protoFile, lead1, email1);
            DynamicMessage.Builder team2 = buildTeam2(TEAMNAME2, TEAM2, protoFile, lead2, email2);
            DynamicMessage.Builder team3 = buildTeam3(TEAMNAME3, TEAM3, protoFile, lead3, email3);

            // Teams message that holds the three teams.
            MessageDescriptor teamsType = protoFile.FindTypeByName<MessageDescriptor>(TEAMS);
            var teamsFilled = DynamicMessage.CreateBuilder(teamsType)
                .SetField(teamsType.FindFieldByName(TEAM1), team1.Build())
                .SetField(teamsType.FindFieldByName(TEAM2), team2.Build())
                .SetField(teamsType.FindFieldByName(TEAM3), team3.Build());
            DynamicMessage.Builder teamsBuilder = (DynamicMessage.Builder)teamsFilled;

            // Top-level UniversalMessaging message: teams, product name and product e-mail.
            DynamicMessage productUrl = buildUrl("UniversalMessaging@softwareag.com", protoFile).Build();
            DynamicMessage.Builder productEmail = buildEmail(productUrl, protoFile);
            UMDescriptor = protoFile.FindTypeByName<MessageDescriptor>(UMMESSAGE);

            var productFilled = DynamicMessage.CreateBuilder(UMDescriptor)
                .SetField(UMDescriptor.FindFieldByName(TEAMS), teamsBuilder.Build())
                .SetField(UMDescriptor.FindFieldByName(PRODUCTNAME), UM)
                .SetField(UMDescriptor.FindFieldByName(_EMAIL), productEmail.Build());
            DynamicMessage.Builder productBuilder = (DynamicMessage.Builder)productFilled;

            DynamicMessage productMessage = productBuilder.Build();

            // Subscribe before publishing; the Listener constructor registers itself on the channel.
            Listener subscriber = new Listener(channel);

            // Publish the serialised message, tagged with the name of its message type.
            nProtobufEvent protobufEvent = new nProtobufEvent(productMessage.ToByteArray(),
                productBuilder.DescriptorForType.Name);
            channel.publish(protobufEvent);

            // Block on console input so the process stays alive after publishing.
            Console.ReadLine();
        }
        /// <summary>
        /// Channel subscriber that parses each received event as a UniversalMessaging protobuf
        /// message and prints its fields to the console.
        /// </summary>
        public class Listener : nEventListener
        {
            /// <summary>Registers this listener as a subscriber on the given channel.</summary>
            /// <param name="channel">Channel to subscribe to.</param>
            public Listener(nChannel channel)
            {
                channel.addSubscriber(this);
            }

            /// <summary>
            /// Parses the event data with <c>UMDescriptor</c>, prints every field name and value,
            /// and acknowledges the event. Any exception is reported on the console and not rethrown.
            /// </summary>
            /// <param name="evt">The received event.</param>
            public void go(nConsumeEvent evt)
            {
                // Events with ID -2 are skipped.
                // NOTE(review): the meaning of ID -2 is not visible in this file; confirm against the API documentation.
                if (evt.getEventID() == -2)
                {
                    return;
                }

                try
                {
                    DynamicMessage eventReceived = DynamicMessage.ParseFrom(UMDescriptor, evt.getEventData());
                    Console.WriteLine("<<<<<<<<<<<<<<<" + UMDescriptor.FullName + ">>>>>>>>>>>>>>>");

                    foreach (var entry in eventReceived.AllFields)
                    {
                        Console.WriteLine("Name of the attribute: " + entry.Key.Name + "\n\n");
                        Console.WriteLine(entry.Value.ToString());
                        Console.WriteLine("============================");
                    }
                    evt.ack();
                }
                catch (Exception e)
                {
                    // Concatenate instead of passing e as a second argument: Console.WriteLine(string, object)
                    // treats the first argument as a format string, and with no {0} placeholder the
                    // exception would never be printed.
                    Console.WriteLine("Exception caught: " + e);
                }
            }
        }
    }
}

umTeam.proto File

// The syntax is declared explicitly: the messages below use "optional" field labels, and newer
// protoc releases print a warning when a file does not specify its syntax.
syntax = "proto2";

// NOTE: the C# sample looks up every message type and field by its name as a string,
// so the names below must stay exactly as written.

// Top-level message published by the sample.
message UniversalMessaging {
  optional Teams Teams = 1;
  optional string productName = 2;
  optional _email _email = 3;
}

// Wrapper around a url message.
message _email {
  optional url url = 1;
}

// Holds a single string value.
message url {
  optional string url = 1;
}

// Container for the three team messages.
message Teams {
  optional Team1 Team1 = 1;
  optional Team2 Team2 = 2;
  optional Team3 Team3 = 3;
}

// Team1, Team2 and Team3 declare the same three fields.
message Team1 {
  optional string teamName = 1;
  optional TeamLead TeamLead = 2;
  optional _email _email = 3;
}

message Team2 {
  optional string teamName = 1;
  optional TeamLead TeamLead = 2;
  optional _email _email = 3;
}

message Team3 {
  optional string teamName = 1;
  optional TeamLead TeamLead = 2;
  optional _email _email = 3;
}

// A team lead's name plus an employees message.
message TeamLead {
  optional string teamLeadName = 1;
  optional employees employees = 2;
}

// Holds an employee count.
message employees {
  optional int32 numberOfEmployees = 1;
}