Publishing Protobuf Events to a Channel

The Universal Messaging Java client API supports publishing protobuf events to a store. For more information about using Google Protocol Buffers with Universal Messaging, see Google Protocol Buffers.

You can use the following example to publish protobuf events to a Universal Messaging channel. The example contains the following code samples:

  • A sample that publishes protobuf events to a channel, in which the event is filled with certain data that is then consumed and parsed on the subscriber side. The sample uses a predefined file named umTeam.proto.
  • The contents of the predefined file umTeam.proto that you must compile using the following command:
    protoc --descriptor_set_out=<path_to_file>/umTeam.descriptor <path_to_file>/umTeam.proto

    After you generate the umTeam.descriptor file, you must set it as a protobuf descriptor on the channel used in the sample below. You can do this while creating or editing the channel.

Sample that Publishes Protobuf Events

public class ProtobufSample {

  public static final String TEAM1="Team1";
  public static final String TEAMNAME1="INDIA";
  public static final String TEAM2="Team2";
  public static final String TEAMNAME2="USA";
  public static final String TEAM3="Team3";
  public static final String TEAMNAME3="BULGARIA";
  public static final String TEAMS="Teams";
  public static final String TEAMLEAD1="John";
  public static final String TEAMLEAD2="Richard";
  public static final String TEAMLEAD3="Frank";
  public static final String TEAMLEADFIELD="TeamLead";
  public static final String _EMAIL="_email";
  public static final String URL="url";
  public static final String PRODUCTNAME="productName";
  public static final String UM="Universal Messaging";
  public static final String UMMESSAGE="UniversalMessaging";

  public static DynamicMessage.Builder buildTeamLead(String name, FileDescriptor desc,
      DynamicMessage.Builder employees){

    Descriptor teamLeadDescriptor = desc.findMessageTypeByName(TEAMLEADFIELD);

    return DynamicMessage.newBuilder(teamLeadDescriptor)
        .setField(teamLeadDescriptor.findFieldByName("teamLeadName"), name)
        .setField(teamLeadDescriptor.findFieldByName("employees"), employees.build());
  }

  public static DynamicMessage.Builder buildTeam(String teamName, String teamFieldName, FileDescriptor desc,
      DynamicMessage.Builder teamLeadBuilder, DynamicMessage.Builder emailBuilder){

    Descriptor teamDescriptor = desc.findMessageTypeByName(teamFieldName);

    return DynamicMessage.newBuilder(teamDescriptor)
        .setField(teamDescriptor.findFieldByName("teamName"), teamName)
        .setField(teamDescriptor.findFieldByName("TeamLead"), teamLeadBuilder.build())
        .setField(teamDescriptor.findFieldByName("_email"), emailBuilder.build());
  }

  public static DynamicMessage.Builder buildUrl(String url, FileDescriptor desc){
    Descriptor urlDescriptor = desc.findMessageTypeByName(URL);

    return DynamicMessage.newBuilder(urlDescriptor)
        .setField(urlDescriptor.findFieldByName(URL), url);
  }

  public static DynamicMessage.Builder buildEmail(DynamicMessage.Builder urlBuilder, FileDescriptor desc){
    Descriptor emailDescriptor = desc.findMessageTypeByName(_EMAIL);

    return DynamicMessage.newBuilder(emailDescriptor)
        .setField(emailDescriptor.findFieldByName(URL), urlBuilder.build());
  }


  public static void main(String[] args) throws Exception {

    String[] RNAME = {"nsp://localhost:11000"};

    nSessionAttributes nsa = new nSessionAttributes(RNAME);
    nSession session = nSessionFactory.create(nsa);
    session.init();

    String[] fileNames = {"/build/change-management/test/protobuf/umTeam.descriptor"};

    FileDescriptorSet.Builder builder = DescriptorProtos.FileDescriptorSet.newBuilder();
    List<ProtobufDescriptor> descriptors = new ArrayList<>();
    for (String fileName : fileNames) {
      byte[] bytes = fFile.readAllBytes(new File(System.getProperty("PROJECT_ROOT_FOLDER", "..") + fileName));

      builder.mergeFrom(bytes);
      String[] elements = fileName.split("/");
      String descName = elements[elements.length - 1];
      ProtobufDescriptor desc = new ProtobufDescriptor(descName, bytes);
      descriptors.add(desc);
    }
    nChannelAttributes nca = new nChannelAttributes("channel");
    nca.setProtobufDescriptors(descriptors);

    nChannel chan = session.createChannel(nca);

    FileDescriptor fileDescriptor = FileDescriptor.buildFrom(builder.getFile(0), new FileDescriptor[0]);

    Descriptor messageEmployees = fileDescriptor.findMessageTypeByName("employees");
    DynamicMessage.Builder employeesBuilder = DynamicMessage.newBuilder(messageEmployees).
        setField(messageEmployees.findFieldByName("numberOfEmployees"), 5);

    DynamicMessage.Builder builderTeamLead1 = ProtobufSample.buildTeamLead(TEAMLEAD1, fileDescriptor, employeesBuilder);
    DynamicMessage.Builder builderURLTeam1 = ProtobufSample.buildUrl("teamIndia@softwareag.com", fileDescriptor);
    DynamicMessage.Builder builderEmailTeam1 = ProtobufSample.buildEmail(builderURLTeam1, fileDescriptor);

    DynamicMessage.Builder builderTeamLead2 = ProtobufSample.buildTeamLead(TEAMLEAD2, fileDescriptor, employeesBuilder);
    DynamicMessage.Builder builderURLTeam2 = ProtobufSample.buildUrl("teamUSA@softwareag.com", fileDescriptor);
    DynamicMessage.Builder builderEmailTeam2 = ProtobufSample.buildEmail(builderURLTeam2, fileDescriptor);

    DynamicMessage.Builder builderTeamLead3 = ProtobufSample.buildTeamLead(TEAMLEAD3, fileDescriptor, employeesBuilder);
    DynamicMessage.Builder builderURLTeam3 = ProtobufSample.buildUrl("teamBulgaria@softwareag.com", fileDescriptor);
    DynamicMessage.Builder builderEmailTeam3 = ProtobufSample.buildEmail(builderURLTeam3, fileDescriptor);

    DynamicMessage.Builder builderTeam1 = ProtobufSample.buildTeam(TEAMNAME1,TEAM1,fileDescriptor,builderTeamLead1,
        builderEmailTeam1);
    DynamicMessage.Builder builderTeam2 = ProtobufSample.buildTeam(TEAMNAME2,TEAM2,fileDescriptor,builderTeamLead2,
        builderEmailTeam2);
    DynamicMessage.Builder builderTeam3 = ProtobufSample.buildTeam(TEAMNAME3,TEAM3,fileDescriptor,builderTeamLead3,
        builderEmailTeam3);

    Descriptor teamsDescriptor = fileDescriptor.findMessageTypeByName(TEAMS);
    DynamicMessage.Builder teamsBuilder = DynamicMessage.newBuilder(teamsDescriptor)
        .setField(teamsDescriptor.findFieldByName(TEAM1), builderTeam1.build())
        .setField(teamsDescriptor.findFieldByName(TEAM2), builderTeam2.build())
        .setField(teamsDescriptor.findFieldByName(TEAM3), builderTeam3.build());

    DynamicMessage.Builder builderURLUM = ProtobufSample.buildUrl("UniversalMessaging@softwareag.com", fileDescriptor);
    DynamicMessage.Builder builderEmailUM = ProtobufSample.buildEmail(builderURLUM, fileDescriptor);
    Descriptor UMDescriptor = fileDescriptor.findMessageTypeByName(UMMESSAGE);

    DynamicMessage.Builder UMMessageBuilder = DynamicMessage.newBuilder(UMDescriptor)
        .setField(UMDescriptor.findFieldByName(TEAMS), teamsBuilder.build())
        .setField(UMDescriptor.findFieldByName(PRODUCTNAME), UM)
        .setField(UMDescriptor.findFieldByName(_EMAIL), builderEmailUM.build());

    DynamicMessage dynamicMessage = UMMessageBuilder.build();

    nEventListener nel = new nEventListener() {
      @Override
      public void go(nConsumeEvent evt) {
          if (evt.getEventID() != -2) {
            try {

              DynamicMessage eventReceived = DynamicMessage.parseFrom(UMDescriptor, evt.getEventData());
              System.out.println("<<<<<<<<<<<<<<<"+ UMDescriptor.getName() + ">>>>>>>>>>>>>>>");

              for (Entry<FieldDescriptor,Object> entry : eventReceived.getAllFields().entrySet()){
                System.out.println("Name of the attribute: " + entry.getKey().getName() + "\n\n");
                System.out.println(entry.getValue().toString());
                System.out.println("============================");
              }
              evt.ack();
            } catch (Exception e) {
              e.printStackTrace();
            }
        }
      }
    };

    chan.addSubscriber(nel);

    nProtobufEvent pbe = new nProtobufEvent(dynamicMessage.toByteArray(),
        UMMessageBuilder.getDescriptorForType().getName());
    chan.publish(pbe);
  }



}

umTeam.proto File

message UniversalMessaging {
optional  Teams Teams = 1;
optional  string productName = 2;
optional  _email _email = 3;
}
message _email {
optional  url url = 1;
}
message url {
optional  string url = 1;
}
message Teams {
optional  Team1 Team1 = 1;
optional  Team2 Team2 = 2;
optional  Team3 Team3 = 3;
}
message Team1 {
optional  string teamName = 1;
optional  TeamLead TeamLead = 2;
optional  _email _email = 3;
}
message Team2 {
optional  string teamName = 1;
optional  TeamLead TeamLead = 2;
optional  _email _email = 3;
}
message Team3 {
optional  string teamName = 1;
optional  TeamLead TeamLead = 2;
optional  _email _email = 3;
}
message TeamLead {
optional  string teamLeadName = 1;
optional  employees employees = 2;
}
message employees {
optional  int32 numberOfEmployees = 1;
}