VOL-1598 Add new group API in OpenOLT Driver

A new API method called PerformGroupOperation is added. This method
performs the corresponding group operation according to the command
field of the Group message. The method currently assumes fixed queue
QoS model. FlowAdd_ and FlowRemove_ are also updated accordingly.

Change-Id: I355f20c3b5b5df484e90082936b10386b5fdf67f
diff --git a/agent/Makefile.in b/agent/Makefile.in
index 7d70bc5..061927c 100644
--- a/agent/Makefile.in
+++ b/agent/Makefile.in
@@ -42,7 +42,7 @@
 # This specifies the GIT tag in https://github.com/opencord/voltha-protos
 # repo that we need to refer to, to pick the right version of
 # openolt.proto and tech_profile.proto
-OPENOLT_PROTO_VER ?= v1.0.3
+OPENOLT_PROTO_VER ?= v3.1.0
 
 #
 # Build directory
diff --git a/agent/common/core.h b/agent/common/core.h
index 4e79d2c..1b2b3dd 100644
--- a/agent/common/core.h
+++ b/agent/common/core.h
@@ -71,7 +71,8 @@
     ACTION_O_PBITS = 23,
     ACTION_I_VID = 24,
     ACTION_I_PBITS = 25,
-    STATE = 26
+    STATE = 26,
+    GROUP_ID = 27
 };
 
 enum AllocCfgAction {
@@ -122,7 +123,8 @@
                 uint32_t flow_id, const std::string flow_type,
                 int32_t alloc_id, int32_t network_intf_id,
                 int32_t gemport_id, const ::openolt::Classifier& classifier,
-                const ::openolt::Action& action, int32_t priority_value, uint64_t cookie);
+                const ::openolt::Action& action, int32_t priority_value,
+                uint64_t cookie, int32_t group_id);
 Status FlowRemove_(uint32_t flow_id, const std::string flow_type);
 Status Disable_();
 Status Reenable_();
@@ -131,6 +133,7 @@
 Status RemoveTrafficSchedulers_(const tech_profile::TrafficSchedulers *traffic_scheds);
 Status CreateTrafficQueues_(const tech_profile::TrafficQueues *traffic_queues);
 Status RemoveTrafficQueues_(const tech_profile::TrafficQueues *traffic_queues);
+Status PerformGroupOperation_(const openolt::Group *group_cfg);
 uint32_t GetPortNum_(uint32_t flow_id);
 int get_status_bcm_cli_quit(void);
 uint16_t get_dev_id(void);
diff --git a/agent/common/server.cc b/agent/common/server.cc
index 1c67137..e33938b 100644
--- a/agent/common/server.cc
+++ b/agent/common/server.cc
@@ -140,7 +140,8 @@
             request->classifier(),
             request->action(),
             request->priority(),
-            request->cookie());
+            request->cookie(),
+            request->group_id());
     }
 
     Status FlowRemove(
@@ -300,6 +301,12 @@
         return Status::OK;
     };
 
+    Status PerformGroupOperation(
+            ServerContext* context,
+            const openolt::Group* request,
+            openolt::Empty* response) override {
+        return PerformGroupOperation_(request);
+    };
 };
 
 void RunServer() {
diff --git a/agent/src/core.cc b/agent/src/core.cc
index 247ecd5..7168fcd 100644
--- a/agent/src/core.cc
+++ b/agent/src/core.cc
@@ -54,12 +54,10 @@
 // #include <bcm_dev_log_task.h>
 }
 
-
 dev_log_id openolt_log_id = bcm_dev_log_id_register("OPENOLT", DEV_LOG_LEVEL_INFO, DEV_LOG_ID_TYPE_BOTH);
 dev_log_id omci_log_id = bcm_dev_log_id_register("OMCI", DEV_LOG_LEVEL_INFO, DEV_LOG_ID_TYPE_BOTH);
 
 #define BAL_RSC_MANAGER_BASE_TM_SCHED_ID 16384
-#define MAX_TM_QUEUE_ID 8192
 #define MAX_TM_QMP_ID 16
 #define TMQ_MAP_PROFILE_SIZE 8
 #define MAX_TM_SCHED_ID 1023
@@ -67,6 +65,8 @@
 #define EAP_ETHER_TYPE 34958
 #define XGS_BANDWIDTH_GRANULARITY 16000
 #define GPON_BANDWIDTH_GRANULARITY 32000
+#define NUM_OF_PRIORITIES 8
+#define NUMBER_OF_DEFAULT_INTERFACE_QUEUES 4 // <= NUM_OF_PRIORITIES
 #define FILL_ARRAY(ARRAY,START,END,VALUE) for(int i=START;i<END;ARRAY[i++]=VALUE);
 #define COUNT_OF(array) (sizeof(array) / sizeof(array[0]))
 
@@ -101,6 +101,7 @@
 
 const std::string upstream = "upstream";
 const std::string downstream = "downstream";
+const std::string multicast = "multicast";
 bcmolt_oltid dev_id = 0;
 
 /* Constants used for retrying some BAL APIs */
@@ -837,8 +838,7 @@
     bcmos_task_parm bal_cli_task_p_dummy;
 
     /* Switch to interactive mode if not stopped in the init script */
-    if (!bcmcli_is_stopped(sess))
-    {       
+    if (!bcmcli_is_stopped(sess)) {
         /* Force a CLI command prompt
          * The string passed into the parse function
          * must be modifiable, so a string constant like
@@ -849,7 +849,7 @@
 
         /* Process user input until EOF or quit command */
         bcmcli_driver(sess);
-    };      
+    }
     OPENOLT_LOG(INFO, openolt_log_id, "BAL API End CLI terminated\n");
 
     /* Cleanup */
@@ -894,16 +894,14 @@
     bcmos_task_parm bal_cli_task_p_dummy;
 
     /** before creating the task, check if it is already created by the other half of BAL i.e. Core side */
-    if (BCM_ERR_OK != bcmos_task_query(&bal_cli_thread, &bal_cli_task_p_dummy))
-    {
+    if (BCM_ERR_OK != bcmos_task_query(&bal_cli_thread, &bal_cli_task_p_dummy)) {
         /* Create BAL CLI thread */
         bal_cli_task_p.name = bal_cli_thread_name;
         bal_cli_task_p.handler = _bal_apiend_cli_thread_handler;
         bal_cli_task_p.priority = TASK_PRIORITY_CLI;
 
         ret = bcmos_task_create(&bal_cli_thread, &bal_cli_task_p);
-        if (BCM_ERR_OK != ret)
-        {
+        if (BCM_ERR_OK != ret) {
             bcmos_printf("Couldn't create BAL API end CLI thread\n");
             return ret;
         }
@@ -952,8 +950,9 @@
 
         //check BCM daemon is connected or not
         Status status = check_connection();
-        if (!status.ok())
+        if (!status.ok()) {
             return status;
+        }
         else {
             Status status = SubscribeIndication();
             if (!status.ok()) {
@@ -965,8 +964,9 @@
 
             //check BAL state in initial stage
             status = check_bal_ready();
-            if (!status.ok())
+            if (!status.ok()) {
                 return status;
+            }
         }
 
         {
@@ -1045,7 +1045,7 @@
         oltIndQ.push(ind);
         return Status::OK;
     }
-    if (failedCount ==NumPonIf_()){
+    if (failedCount ==NumPonIf_()) {
         return grpc::Status(grpc::StatusCode::INTERNAL, "failed to disable olt ,all the PON ports are still in enabled state");
     }
 
@@ -1062,7 +1062,7 @@
             BCM_LOG(ERROR, openolt_log_id, "Failed to enable PON interface: %d\n", i);
         }
     }
-    if (failedCount == 0){
+    if (failedCount == 0) {
         state.activate();
         openolt::Indication ind;
         openolt::OltIndication* olt_ind = new openolt::OltIndication;
@@ -1072,7 +1072,7 @@
         oltIndQ.push(ind);
         return Status::OK;
     }
-    if (failedCount ==NumPonIf_()){
+    if (failedCount ==NumPonIf_()) {
         return grpc::Status(grpc::StatusCode::INTERNAL, "failed to re-enable olt ,all the PON ports are still in disabled state");
     }
     return grpc::Status(grpc::StatusCode::UNKNOWN, "failed to re-enable olt ,few PON ports are still in disabled state");
@@ -1452,6 +1452,14 @@
                 return err;
             }
             return flow_cfg.data.state;
+        case GROUP_ID:
+            BCMOLT_FIELD_SET_PRESENT(&flow_cfg.data, flow_cfg_data, group_id);
+            err = bcmolt_cfg_get(dev_id, &flow_cfg.hdr);
+            if (err) {
+                OPENOLT_LOG(ERROR, openolt_log_id, "Failed to get group_id, err = %s\n",bcmos_strerror(err));
+                return err;
+            }
+            return flow_cfg.data.group_id;
         default:
             return BCM_ERR_INTERNAL;
     }
@@ -1625,7 +1633,7 @@
         bcmos_usleep(500000);
     }
 
-    /* If all the devices returned errors then we tell the caller that this is an error else we work with 
+    /* If all the devices returned errors then we tell the caller that this is an error else we work with
        only the devices that retured success*/
     if (num_failed_cfg_gets == BCM_MAX_DEVS_PER_LINE_CARD) {
         OPENOLT_LOG(ERROR, openolt_log_id, "device: Query of all the devices failed\n");
@@ -1659,22 +1667,22 @@
             if (board_technology == "XGS-PON") {
                 switch(interface_obj.data.xgpon_trx.transceiver_type) {
                     case BCMOLT_XGPON_TRX_TYPE_LTH_7222_PC:
-                    case BCMOLT_XGPON_TRX_TYPE_WTD_RTXM266_702: 
-                    case BCMOLT_XGPON_TRX_TYPE_LTH_7222_BC_PLUS: 
+                    case BCMOLT_XGPON_TRX_TYPE_WTD_RTXM266_702:
+                    case BCMOLT_XGPON_TRX_TYPE_LTH_7222_BC_PLUS:
                     case BCMOLT_XGPON_TRX_TYPE_LTH_7226_PC:
-                    case BCMOLT_XGPON_TRX_TYPE_LTH_5302_PC: 
-                    case BCMOLT_XGPON_TRX_TYPE_LTH_7226_A_PC_PLUS: 
-                    case BCMOLT_XGPON_TRX_TYPE_D272RR_SSCB_DM: 
+                    case BCMOLT_XGPON_TRX_TYPE_LTH_5302_PC:
+                    case BCMOLT_XGPON_TRX_TYPE_LTH_7226_A_PC_PLUS:
+                    case BCMOLT_XGPON_TRX_TYPE_D272RR_SSCB_DM:
                         intf_technologies[intf_id] = "XGS-PON";
                         break;
                 }
             } else if (board_technology == "GPON") {
                 switch(interface_obj.data.gpon_trx.transceiver_type) {
-                    case BCMOLT_TRX_TYPE_SPS_43_48_H_HP_CDE_SD_2013: 
+                    case BCMOLT_TRX_TYPE_SPS_43_48_H_HP_CDE_SD_2013:
                     case BCMOLT_TRX_TYPE_LTE_3680_M:
                     case BCMOLT_TRX_TYPE_SOURCE_PHOTONICS:
                     case BCMOLT_TRX_TYPE_LTE_3680_P_TYPE_C_PLUS:
-                    case BCMOLT_TRX_TYPE_LTE_3680_P_BC: 
+                    case BCMOLT_TRX_TYPE_LTE_3680_P_BC:
                         intf_technologies[intf_id] = "GPON";
                         break;
                 }
@@ -1717,7 +1725,7 @@
 }
 
 Status SetStateUplinkIf_(uint32_t intf_id, bool set_state) {
-    bcmos_errno err = BCM_ERR_OK; 
+    bcmos_errno err = BCM_ERR_OK;
     bcmolt_nni_interface_key intf_key = {.id = (bcmolt_interface)intf_id};
     bcmolt_nni_interface_set_nni_state nni_interface_set_state;
     bcmolt_interface_state state;
@@ -2191,8 +2199,10 @@
         a_val.i_vid , get_flow_status(flow_index, flow_id_data[flowid][1], ACTION_I_VID)); \
     OPENOLT_LOG(INFO, openolt_log_id, "action o_pbits (%d %lu)\n", \
         a_val.o_pbits , get_flow_status(flow_index, flow_id_data[flowid][1], ACTION_O_PBITS)); \
-    OPENOLT_LOG(INFO, openolt_log_id, "action i_pbits (%d %lu)\n\n", \
+   OPENOLT_LOG(INFO, openolt_log_id, "action i_pbits (%d %lu)\n", \
         a_val.i_pbits, get_flow_status(flow_index, flow_id_data[flowid][1], ACTION_I_PBITS)); \
+    OPENOLT_LOG(INFO, openolt_log_id, "group_id (%d %lu)\n\n", \
+        cfg.data.group_id, get_flow_status(flow_index, flow_id_data[flowid][1], GROUP_ID)); \
     } while(0)
 
 #define FLOW_CHECKER
@@ -2202,7 +2212,8 @@
                 uint32_t flow_id, const std::string flow_type,
                 int32_t alloc_id, int32_t network_intf_id,
                 int32_t gemport_id, const ::openolt::Classifier& classifier,
-                const ::openolt::Action& action, int32_t priority_value, uint64_t cookie) {
+                const ::openolt::Action& action, int32_t priority_value, uint64_t cookie,
+                int32_t group_id) {
     bcmolt_flow_cfg cfg;
     bcmolt_flow_key key = { }; /**< Object key. */
     int32_t o_vid = -1;
@@ -2219,6 +2230,8 @@
         key.flow_type = BCMOLT_FLOW_TYPE_UPSTREAM;
     } else if (flow_type.compare(downstream) == 0) {
         key.flow_type = BCMOLT_FLOW_TYPE_DOWNSTREAM;
+    } else if (flow_type.compare(multicast) == 0) {
+        key.flow_type = BCMOLT_FLOW_TYPE_MULTICAST;
     } else {
         OPENOLT_LOG(ERROR, openolt_log_id, "Invalid flow type %s\n", flow_type.c_str());
         return bcm_to_grpc_err(BCM_ERR_PARM, "Invalid flow type");
@@ -2227,54 +2240,65 @@
     BCMOLT_CFG_INIT(&cfg, flow, key);
     BCMOLT_MSG_FIELD_SET(&cfg, cookie, cookie);
 
-    if (access_intf_id >= 0 && network_intf_id >= 0) {
-        if (key.flow_type == BCMOLT_FLOW_TYPE_UPSTREAM) { //upstream
-            BCMOLT_MSG_FIELD_SET(&cfg, ingress_intf.intf_type, BCMOLT_FLOW_INTERFACE_TYPE_PON);
-            BCMOLT_MSG_FIELD_SET(&cfg, ingress_intf.intf_id, access_intf_id);
-            if (classifier.eth_type() == EAP_ETHER_TYPE || //EAPOL packet
-               (classifier.ip_proto() == 17 && classifier.src_port() == 68 && classifier.dst_port() == 67)) { //DHCP packet
-                BCMOLT_MSG_FIELD_SET(&cfg, egress_intf.intf_type, BCMOLT_FLOW_INTERFACE_TYPE_HOST);
-            } else {
-                BCMOLT_MSG_FIELD_SET(&cfg, egress_intf.intf_type, BCMOLT_FLOW_INTERFACE_TYPE_NNI);
-                BCMOLT_MSG_FIELD_SET(&cfg, egress_intf.intf_id, network_intf_id);
+
+    if (key.flow_type != BCMOLT_FLOW_TYPE_MULTICAST) {
+
+        if (access_intf_id >= 0 && network_intf_id >= 0) {
+            if (key.flow_type == BCMOLT_FLOW_TYPE_UPSTREAM) { //upstream
+                BCMOLT_MSG_FIELD_SET(&cfg, ingress_intf.intf_type, BCMOLT_FLOW_INTERFACE_TYPE_PON);
+                BCMOLT_MSG_FIELD_SET(&cfg, ingress_intf.intf_id, access_intf_id);
+                if (classifier.eth_type() == EAP_ETHER_TYPE || //EAPOL packet
+                    classifier.ip_proto() == 2 || // IGMP packet
+                (classifier.ip_proto() == 17 && classifier.src_port() == 68 && classifier.dst_port() == 67)) { //DHCP packet
+                    BCMOLT_MSG_FIELD_SET(&cfg, egress_intf.intf_type, BCMOLT_FLOW_INTERFACE_TYPE_HOST);
+                } else {
+                    BCMOLT_MSG_FIELD_SET(&cfg, egress_intf.intf_type, BCMOLT_FLOW_INTERFACE_TYPE_NNI);
+                    BCMOLT_MSG_FIELD_SET(&cfg, egress_intf.intf_id, network_intf_id);
+                }
+            } else if (key.flow_type == BCMOLT_FLOW_TYPE_DOWNSTREAM) { //downstream
+                BCMOLT_MSG_FIELD_SET(&cfg, ingress_intf.intf_type, BCMOLT_FLOW_INTERFACE_TYPE_NNI);
+                BCMOLT_MSG_FIELD_SET(&cfg, ingress_intf.intf_id, network_intf_id);
+                BCMOLT_MSG_FIELD_SET(&cfg, egress_intf.intf_type, BCMOLT_FLOW_INTERFACE_TYPE_PON);
+                BCMOLT_MSG_FIELD_SET(&cfg, egress_intf.intf_id, access_intf_id);
             }
-        } else if (key.flow_type == BCMOLT_FLOW_TYPE_DOWNSTREAM) { //downstream
-            BCMOLT_MSG_FIELD_SET(&cfg, ingress_intf.intf_type, BCMOLT_FLOW_INTERFACE_TYPE_NNI);
-            BCMOLT_MSG_FIELD_SET(&cfg, ingress_intf.intf_id, network_intf_id);
-            BCMOLT_MSG_FIELD_SET(&cfg, egress_intf.intf_type, BCMOLT_FLOW_INTERFACE_TYPE_PON);
-            BCMOLT_MSG_FIELD_SET(&cfg, egress_intf.intf_id, access_intf_id);
+        } else if (access_intf_id < 0 ) {
+                // This is the case for packet trap from NNI flow.
+                BCMOLT_MSG_FIELD_SET(&cfg, ingress_intf.intf_type, BCMOLT_FLOW_INTERFACE_TYPE_NNI);
+                BCMOLT_MSG_FIELD_SET(&cfg, ingress_intf.intf_id, network_intf_id);
+                BCMOLT_MSG_FIELD_SET(&cfg, egress_intf.intf_type, BCMOLT_FLOW_INTERFACE_TYPE_HOST);
+        } else {
+            OPENOLT_LOG(ERROR, openolt_log_id, "flow network setting invalid\n");
+            return bcm_to_grpc_err(BCM_ERR_PARM, "flow network setting invalid");
         }
-    } else if (access_intf_id < 0 ) {
-            // This is the case for packet trap from NNI flow.
-            BCMOLT_MSG_FIELD_SET(&cfg, ingress_intf.intf_type, BCMOLT_FLOW_INTERFACE_TYPE_NNI);
-            BCMOLT_MSG_FIELD_SET(&cfg, ingress_intf.intf_id, network_intf_id);
-            BCMOLT_MSG_FIELD_SET(&cfg, egress_intf.intf_type, BCMOLT_FLOW_INTERFACE_TYPE_HOST);
-    } else {
-        OPENOLT_LOG(ERROR, openolt_log_id, "flow network setting invalid\n");
-        return bcm_to_grpc_err(BCM_ERR_PARM, "flow network setting invalid");
-    }
 
+        if (onu_id >= 0) {
+            BCMOLT_MSG_FIELD_SET(&cfg, onu_id, onu_id);
+        }
+        if (gemport_id >= 0) {
+            BCMOLT_MSG_FIELD_SET(&cfg, svc_port_id, gemport_id);
+        }
+        if (gemport_id >= 0 && port_no != 0) {
+            bcmos_fastlock_lock(&data_lock);
+            if (key.flow_type == BCMOLT_FLOW_TYPE_DOWNSTREAM) {
+                port_to_flows[port_no].insert(key.flow_id);
+                flowid_to_gemport[key.flow_id] = gemport_id;
+            }
+            else
+            {
+                flowid_to_port[key.flow_id] = port_no;
+            }
+            bcmos_fastlock_unlock(&data_lock, 0);
+        }
+        if (priority_value >= 0) {
+            BCMOLT_MSG_FIELD_SET(&cfg, priority, priority_value);
+        }
 
-    if (onu_id >= 0) {
-        BCMOLT_MSG_FIELD_SET(&cfg, onu_id, onu_id);
-    }
-    if (gemport_id >= 0) {
-        BCMOLT_MSG_FIELD_SET(&cfg, svc_port_id, gemport_id);
-    }
-    if (gemport_id >= 0 && port_no != 0) {
-        bcmos_fastlock_lock(&data_lock);
-        if (key.flow_type == BCMOLT_FLOW_TYPE_DOWNSTREAM) {
-            port_to_flows[port_no].insert(key.flow_id);
-            flowid_to_gemport[key.flow_id] = gemport_id;
+    } else { // MULTICAST FLOW
+        if (group_id >= 0) {
+            BCMOLT_MSG_FIELD_SET(&cfg, group_id, group_id);
         }
-        else
-        {
-            flowid_to_port[key.flow_id] = port_no;
-        }
-        bcmos_fastlock_unlock(&data_lock, 0);
-    }
-    if (priority_value >= 0) {
-        BCMOLT_MSG_FIELD_SET(&cfg, priority, priority_value);
+        BCMOLT_MSG_FIELD_SET(&cfg, ingress_intf.intf_type, BCMOLT_FLOW_INTERFACE_TYPE_NNI);
+        BCMOLT_MSG_FIELD_SET(&cfg, ingress_intf.intf_id, network_intf_id);
     }
 
     {
@@ -2297,11 +2321,16 @@
             BCMOLT_FIELD_SET(&c_val, classifier, ether_type, classifier.eth_type());
         }
 
-        /*
-        if (classifier.dst_mac()) {
-            BCMBAL_ATTRIBUTE_PROP_SET(&val, classifier, dst_mac, classifier.dst_mac());
+        if (classifier.dst_mac().size() > 0) {
+            bcmos_mac_address d_mac = {};
+            bcmos_mac_address_init(&d_mac);
+            memcpy(d_mac.u8, classifier.dst_mac().data(), sizeof(d_mac.u8));
+            OPENOLT_LOG(DEBUG, openolt_log_id, "classify dst_mac %02x:%02x:%02x:%02x:%02x:%02x\n", d_mac.u8[0],
+                        d_mac.u8[1], d_mac.u8[2], d_mac.u8[3], d_mac.u8[4], d_mac.u8[5]);
+            BCMOLT_FIELD_SET(&c_val, classifier, dst_mac, d_mac);
         }
 
+        /*
         if (classifier.src_mac()) {
             BCMBAL_ATTRIBUTE_PROP_SET(&val, classifier, src_mac, classifier.src_mac());
         }
@@ -2524,9 +2553,14 @@
     }
 
     BCMOLT_MSG_FIELD_SET(&cfg, state, BCMOLT_FLOW_STATE_ENABLE);
-    BCMOLT_MSG_FIELD_SET(&cfg, statistics, BCMOLT_CONTROL_STATE_ENABLE);
+
+    // BAL 3.1 supports statistics only for unicast flows.
+    if (key.flow_type != BCMOLT_FLOW_TYPE_MULTICAST) {
+        BCMOLT_MSG_FIELD_SET(&cfg, statistics, BCMOLT_CONTROL_STATE_ENABLE);
+    }
+
 #ifdef FLOW_CHECKER
-    //Flow Checker, To avoid duplicate flow. 
+    //Flow Checker, To avoid duplicate flow.
     if (flow_id_counters != 0) {
         bool b_duplicate_flow = false;
         for (int flowid=0; flowid < flow_id_counters; flowid++) {
@@ -2557,7 +2591,8 @@
                 (a_val.i_vid == get_flow_status(flow_index, flow_id_data[flowid][1], ACTION_I_VID)) && \
                 (a_val.o_pbits == get_flow_status(flow_index, flow_id_data[flowid][1], ACTION_O_PBITS)) && \
                 (a_val.i_pbits == get_flow_status(flow_index, flow_id_data[flowid][1], ACTION_I_PBITS)) && \
-                (cfg.data.state == get_flow_status(flowid, flow_id_data[flowid][1], STATE)); 
+                (cfg.data.state == get_flow_status(flowid, flow_id_data[flowid][1], STATE)) && \
+                (cfg.data.group_id == get_flow_status(flowid, flow_id_data[flowid][1], GROUP_ID));
 #ifdef SHOW_FLOW_PARAM
             // Flow Parameter
             FLOW_PARAM_LOG();
@@ -2598,6 +2633,8 @@
         key.flow_type = BCMOLT_FLOW_TYPE_UPSTREAM;
     } else if (flow_type.compare(downstream) == 0) {
         key.flow_type = BCMOLT_FLOW_TYPE_DOWNSTREAM;
+    } else if(flow_type.compare(multicast) == 0) {
+        key.flow_type = BCMOLT_FLOW_TYPE_MULTICAST;
     } else {
         OPENOLT_LOG(WARNING, openolt_log_id, "Invalid flow type %s\n", flow_type.c_str());
         return bcm_to_grpc_err(BCM_ERR_PARM, "Invalid flow type");
@@ -2629,8 +2666,8 @@
         if (flow_id_data[flowid][0] == flow_id && flow_id_data[flowid][1] == key.flow_type) {
             flow_id_counters -= 1;
             for (int i=flowid; i < flow_id_counters; i++) {
-                flow_id_data[i][0] = flow_id_data[i + 1][0]; 
-                flow_id_data[i][1] = flow_id_data[i + 1][1]; 
+                flow_id_data[i][0] = flow_id_data[i + 1][0];
+                flow_id_data[i][1] = flow_id_data[i + 1][1];
             }
             break;
         }
@@ -2690,7 +2727,7 @@
     BCMOLT_MSG_FIELD_SET(&tm_sched_cfg, sched_type, BCMOLT_TM_SCHED_TYPE_SP);
 
     // num_priorities: Max number of strict priority scheduling elements
-    BCMOLT_MSG_FIELD_SET(&tm_sched_cfg, num_priorities, 8);
+    BCMOLT_MSG_FIELD_SET(&tm_sched_cfg, num_priorities, NUM_OF_PRIORITIES);
 
     err = bcmolt_cfg_set(dev_id, &tm_sched_cfg.hdr);
     if (err) {
@@ -2726,7 +2763,7 @@
         // The parent for the sub_term scheduler is the PON scheduler in the downstream
         BCMOLT_MSG_FIELD_SET(&tm_sched_cfg, attachment_point.u.tm_sched.tm_sched_id, get_default_tm_sched_id(intf_id, direction));
         BCMOLT_MSG_FIELD_SET(&tm_sched_cfg, attachment_point.u.tm_sched.tm_sched_param.u.priority.priority, priority);
-        /* removed by BAL v3.0, N/A - No direct attachment point of type ONU, same functionality may 
+        /* removed by BAL v3.0, N/A - No direct attachment point of type ONU, same functionality may
            be achieved using the' virtual' type of attachment.
         tm_sched_owner.u.sub_term.intf_id = intf_id;
         tm_sched_owner.u.sub_term.sub_term_id = onu_id;
@@ -2855,7 +2892,7 @@
         /* Maximum allocated bandwidth allowed for this alloc ID */
         BCMOLT_MSG_FIELD_SET(&cfg, sla.maximum_bw, pir_bw);
         BCMOLT_MSG_FIELD_SET(&cfg, sla.alloc_type, BCMOLT_ALLOC_TYPE_NSR);
-        /* Set to True for AllocID with CBR RT Bandwidth that requires compensation 
+        /* Set to True for AllocID with CBR RT Bandwidth that requires compensation
            for skipped allocations during quiet window */
         BCMOLT_MSG_FIELD_SET(&cfg, sla.cbr_rt_compensation, BCMOS_FALSE);
         /**< Allocation Profile index for CBR non-RT Bandwidth */
@@ -2941,7 +2978,7 @@
         key.alloc_id = alloc_id;
         sched_id = alloc_id;
 
-        BCMOLT_CFG_INIT(&cfg, itupon_alloc, key); 
+        BCMOLT_CFG_INIT(&cfg, itupon_alloc, key);
         err = bcmolt_cfg_clear(dev_id, &cfg.hdr);
         if (err) {
             OPENOLT_LOG(ERROR, openolt_log_id, "Failed to remove scheduler, direction = %s, intf_id %d, alloc_id %d, err = %s\n",
@@ -3064,8 +3101,8 @@
 bcmos_errno CreateDefaultQueue(uint32_t intf_id, const std::string direction) {
     bcmos_errno err;
 
-    /* Create 4 Queues on given PON/NNI scheduler */
-    for (int queue_id = 0; queue_id < 4; queue_id++) {
+    /* Create default queues on the given PON/NNI scheduler */
+    for (int queue_id = 0; queue_id < NUMBER_OF_DEFAULT_INTERFACE_QUEUES; queue_id++) {
         bcmolt_tm_queue_cfg tm_queue_cfg;
         bcmolt_tm_queue_key tm_queue_key = {};
         tm_queue_key.sched_id = get_default_tm_sched_id(intf_id, direction);
@@ -3339,3 +3376,226 @@
     OPENOLT_LOG(INFO, openolt_log_id, "BAL is ready\n");
     return Status::OK;
 }
+
+Status PerformGroupOperation_(const openolt::Group *group_cfg) {
+
+    bcmos_errno err;
+    bcmolt_group_key key = {};
+    bcmolt_group_cfg grp_cfg_obj;
+    bcmolt_group_members_update grp_mem_upd;
+    bcmolt_members_update_command grp_mem_upd_cmd;
+    bcmolt_group_member_info member_info = {};
+    bcmolt_group_member_info_list_u8 members = {};
+    bcmolt_intf_ref interface_ref = {};
+    bcmolt_egress_qos egress_qos = {};
+    bcmolt_tm_sched_ref tm_sched_ref = {};
+    bcmolt_action a_val = {};
+
+    uint32_t group_id = group_cfg->group_id();
+
+    OPENOLT_LOG(INFO, openolt_log_id, "PerformGroupOperation request received for Group %d\n", group_id);
+
+    if (group_id >= 0) {
+        key.id = group_id;
+    }
+    else {
+        OPENOLT_LOG(ERROR, openolt_log_id, "Invalid group id %d.\n", group_id);
+        return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Invalid group id");
+    }
+
+    BCMOLT_CFG_INIT(&grp_cfg_obj, group, key);
+    BCMOLT_FIELD_SET_PRESENT(&grp_cfg_obj.data, group_cfg_data, state);
+
+    OPENOLT_LOG(INFO, openolt_log_id, "Checking if Group %d exists...\n",group_id);
+
+    err = bcmolt_cfg_get(dev_id, &(grp_cfg_obj.hdr));
+    if (err != BCM_ERR_OK) {
+        OPENOLT_LOG(ERROR, openolt_log_id, "Error in querying Group %d, err = %s\n", group_id, bcmos_strerror(err));
+        return bcm_to_grpc_err(err, "Error in querying group");
+    }
+
+    members.len = group_cfg->members_size();
+
+    // IMPORTANT: A member cannot be added to a group if the group type is not determined.
+    // Group type is determined after a flow is assigned to it.
+    // Therefore, a group must be created first, then a flow (with multicast type) must be assigned to it.
+    // Only then we can add members to the group.
+
+    // if group does not exist, create it and return.
+    if (grp_cfg_obj.data.state == BCMOLT_GROUP_STATE_NOT_CONFIGURED) {
+
+        if (members.len != 0) {
+            OPENOLT_LOG(ERROR, openolt_log_id, "Member list is not empty for non-existent Group %d. Members can be added only after a flow is assigned to this newly-created group.\n", group_id);
+            return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Non-empty member list given for non-existent group");
+        } else {
+
+            BCMOLT_CFG_INIT(&grp_cfg_obj, group, key);
+            BCMOLT_MSG_FIELD_SET(&grp_cfg_obj, cookie, key.id);
+
+            /*  Setting group actions and action parameters, if any.
+                Only remove_outer_tag and translate_inner_tag actions and i_vid action parameter
+                are supported for multicast groups in BAL 3.1.
+            */
+            const ::openolt::Action& action = group_cfg->action();
+            const ::openolt::ActionCmd &cmd = action.cmd();
+
+            bcmolt_action_cmd_id cmd_bmask = BCMOLT_ACTION_CMD_ID_NONE;
+            if (cmd.remove_outer_tag()) {
+                OPENOLT_LOG(INFO, openolt_log_id, "Action remove_outer_tag applied to Group %d.\n", group_id);
+                cmd_bmask = (bcmolt_action_cmd_id) (cmd_bmask | BCMOLT_ACTION_CMD_ID_REMOVE_OUTER_TAG);
+            }
+
+            if (cmd.translate_inner_tag()) {
+                OPENOLT_LOG(INFO, openolt_log_id, "Action translate_inner_tag applied to Group %d.\n", group_id);
+                cmd_bmask = (bcmolt_action_cmd_id) (cmd_bmask | BCMOLT_ACTION_CMD_ID_XLATE_INNER_TAG);
+            }
+
+            BCMOLT_FIELD_SET(&a_val, action, cmds_bitmask, cmd_bmask);
+
+            if (action.i_vid()) {
+                OPENOLT_LOG(INFO, openolt_log_id, "Setting action parameter i_vid=%d for Group %d.\n", action.i_vid(), group_id);
+                BCMOLT_FIELD_SET(&a_val, action, i_vid, action.i_vid());
+            }
+
+            BCMOLT_MSG_FIELD_SET(&grp_cfg_obj, action, a_val);
+
+            // Create group
+            err = bcmolt_cfg_set(dev_id, &(grp_cfg_obj.hdr));
+
+            if (BCM_ERR_OK != err) {
+                BCM_LOG(ERROR, openolt_log_id, "Failed to create Group %d, err = %s (%d)\n", key.id, bcmos_strerror(err), err);
+                return bcm_to_grpc_err(err, "Error in creating group");
+            }
+
+            BCM_LOG(INFO, openolt_log_id, "Group %d has been created and configured with empty member list.\n", key.id);
+            return Status::OK;
+        }
+    }
+
+    // The group already exists. Continue configuring it according to the update member command.
+
+    OPENOLT_LOG(INFO, openolt_log_id, "Configuring existing Group %d.\n",group_id);
+
+    // MEMBER LIST CONSTRUCTION
+    // Note that members.len can be 0 here. if the group already exists and the command is SET then sending
+    // empty list to the group is a legit operation and this actually empties the member list.
+    members.arr = (bcmolt_group_member_info*)bcmos_calloc((members.len)*sizeof(bcmolt_group_member_info));
+
+    if (!members.arr) {
+        OPENOLT_LOG(ERROR, openolt_log_id, "Failed to allocate memory for group member list.\n");
+        return grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, "Memory exhausted during member list creation");
+    }
+
+    /* SET GROUP MEMBERS UPDATE COMMAND */
+    openolt::Group::GroupMembersCommand command = group_cfg->command();
+    switch(command) {
+        case openolt::Group::SET_MEMBERS :
+            grp_mem_upd_cmd = BCMOLT_MEMBERS_UPDATE_COMMAND_SET;
+            OPENOLT_LOG(INFO, openolt_log_id, "Setting %d members for Group %d.\n", members.len, group_id);
+            break;
+        case openolt::Group::ADD_MEMBERS :
+            grp_mem_upd_cmd = BCMOLT_MEMBERS_UPDATE_COMMAND_ADD;
+            OPENOLT_LOG(INFO, openolt_log_id, "Adding %d members to Group %d.\n", members.len, group_id);
+            break;
+        case openolt::Group::REMOVE_MEMBERS :
+            grp_mem_upd_cmd = BCMOLT_MEMBERS_UPDATE_COMMAND_REMOVE;
+            OPENOLT_LOG(INFO, openolt_log_id, "Removing %d members from Group %d.\n", members.len, group_id);
+            break;
+        default :
+            OPENOLT_LOG(ERROR, openolt_log_id, "Invalid value %d for group member command.\n", command);
+            bcmos_free(members.arr);
+            return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Invalid group member command");
+    }
+
+    // SET MEMBERS LIST
+    for (int i = 0; i < members.len; i++) {
+
+        if (command ==  openolt::Group::REMOVE_MEMBERS) {
+            OPENOLT_LOG(INFO, openolt_log_id, "Removing group member %d from group %d\n",i,key.id);
+        } else {
+            OPENOLT_LOG(INFO, openolt_log_id, "Adding group member %d to group %d\n",i,key.id);
+        }
+
+        openolt::GroupMember *member = (openolt::GroupMember *) &group_cfg->members()[i];
+
+        // Set member interface type
+        openolt::GroupMember::InterfaceType if_type = member->interface_type();
+        switch(if_type){
+            case openolt::GroupMember::PON :
+                BCMOLT_FIELD_SET(&interface_ref, intf_ref, intf_type, BCMOLT_INTERFACE_TYPE_PON);
+                OPENOLT_LOG(INFO, openolt_log_id, "Interface type PON is assigned to GroupMember %d\n",i);
+                break;
+            case openolt::GroupMember::EPON_1G_PATH :
+                BCMOLT_FIELD_SET(&interface_ref, intf_ref, intf_type, BCMOLT_INTERFACE_TYPE_EPON_1_G);
+                OPENOLT_LOG(INFO, openolt_log_id, "Interface type EPON_1G is assigned to GroupMember %d\n",i);
+                break;
+            case openolt::GroupMember::EPON_10G_PATH :
+                BCMOLT_FIELD_SET(&interface_ref, intf_ref, intf_type, BCMOLT_INTERFACE_TYPE_EPON_10_G);
+                OPENOLT_LOG(INFO, openolt_log_id, "Interface type EPON_10G is assigned to GroupMember %d\n",i);
+                break;
+            default :
+                OPENOLT_LOG(ERROR, openolt_log_id, "Invalid interface type value %d for GroupMember %d.\n",if_type,i);
+                bcmos_free(members.arr);
+                return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Invalid interface type for a group member");
+        }
+
+        // Set member interface id
+        if (member->interface_id() >= 0) {
+            BCMOLT_FIELD_SET(&interface_ref, intf_ref, intf_id, member->interface_id());
+            OPENOLT_LOG(INFO, openolt_log_id, "Interface %d is assigned to GroupMember %d\n", member->interface_id(), i);
+        } else {
+            bcmos_free(members.arr);
+            return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Invalid interface id for a group member");
+        }
+
+        // Set member interface_ref
+        BCMOLT_FIELD_SET(&member_info, group_member_info, intf, interface_ref);
+
+        // Set member gem_port_id. This must be a multicast gemport.
+        if (member->gem_port_id() >= 0) {
+            BCMOLT_FIELD_SET(&member_info, group_member_info, svc_port_id, member->gem_port_id());
+            OPENOLT_LOG(INFO, openolt_log_id, "GEM Port %d is assigned to GroupMember %d\n", member->gem_port_id(), i);
+        } else {
+            bcmos_free(members.arr);
+            return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Invalid gem port id for a group member");
+        }
+
+        // Set member scheduler id and queue_id
+        uint32_t tm_sched_id = get_default_tm_sched_id(member->interface_id(), downstream);
+        OPENOLT_LOG(INFO, openolt_log_id, "Scheduler %d is assigned to GroupMember %d\n", tm_sched_id, i);
+        BCMOLT_FIELD_SET(&tm_sched_ref, tm_sched_ref, id, tm_sched_id);
+        BCMOLT_FIELD_SET(&egress_qos, egress_qos, tm_sched, tm_sched_ref);
+
+        // We assume that all multicast traffic destined to a PON port is using the same fixed queue.
+        uint32_t tm_queue_id;
+        if (member->priority() >= 0 && member->priority() < NUMBER_OF_DEFAULT_INTERFACE_QUEUES) {
+            tm_queue_id = queue_id_list[member->priority()];
+            OPENOLT_LOG(INFO, openolt_log_id, "Queue %d is assigned to GroupMember %d\n", tm_queue_id, i);
+            BCMOLT_FIELD_SET(&egress_qos, egress_qos, type, BCMOLT_EGRESS_QOS_TYPE_FIXED_QUEUE);
+            BCMOLT_FIELD_SET(&egress_qos.u.fixed_queue, egress_qos_fixed_queue, queue_id, tm_queue_id);
+        } else {
+            OPENOLT_LOG(ERROR, openolt_log_id, "Invalid fixed queue priority/ID %d for GroupMember %d\n", member->priority(), i);
+            bcmos_free(members.arr);
+            return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Invalid queue priority for a group member");
+        }
+
+        BCMOLT_FIELD_SET(&member_info, group_member_info, egress_qos, egress_qos);
+        BCMOLT_ARRAY_ELEM_SET(&(members), i, member_info);
+    }
+
+    BCMOLT_OPER_INIT(&grp_mem_upd, group, members_update, key);
+    BCMOLT_MSG_FIELD_SET(&grp_mem_upd, members_cmd.members, members);
+    BCMOLT_MSG_FIELD_SET(&grp_mem_upd, members_cmd.command, grp_mem_upd_cmd);
+
+    err = bcmolt_oper_submit(dev_id, &(grp_mem_upd.hdr));
+    bcmos_free(members.arr);
+
+    if (BCM_ERR_OK != err) {
+        OPENOLT_LOG(ERROR, openolt_log_id, "Failed to submit members update operation for Group %d err = %s (%d)\n", key.id, bcmos_strerror(err), err);
+        return bcm_to_grpc_err(err, "Failed to submit members update operation for the group");
+    }
+
+    OPENOLT_LOG(INFO, openolt_log_id, "Successfully submitted members update operation for Group %d\n", key.id);
+
+    return Status::OK;
+}
diff --git a/agent/src/indications.cc b/agent/src/indications.cc
index 3f1cca2..936e14a 100644
--- a/agent/src/indications.cc
+++ b/agent/src/indications.cc
@@ -840,6 +840,28 @@
 }
 */
 
+static void GroupIndication(bcmolt_devid olt, bcmolt_msg *msg) {
+
+    switch (msg->obj_type) {
+        case BCMOLT_OBJ_ID_GROUP:
+            switch (msg->subgroup) {
+                case BCMOLT_GROUP_AUTO_SUBGROUP_COMPLETE_MEMBERS_UPDATE:
+                {
+                    bcmolt_group_key *key = &((bcmolt_group_complete_members_update*)msg)->key;
+                    bcmolt_group_complete_members_update_data *data =
+                            &((bcmolt_group_complete_members_update*)msg)->data;
+
+                    if (data->result == BCMOLT_RESULT_SUCCESS) {
+                        OPENOLT_LOG(INFO, openolt_log_id, "Complete members update indication for group %d (successful)\n", key->id);
+                    } else {
+                        OPENOLT_LOG(ERROR, openolt_log_id, "Complete members update indication for group %d (failed with reason %d)\n", key->id, data->fail_reason);
+                    }
+                }
+            }
+    }
+    bcmolt_msg_free(msg);
+}
+
 Status SubscribeIndication() {
     bcmolt_rx_cfg rx_cfg = {};
     bcmos_errno rc;
@@ -1023,6 +1045,14 @@
         return Status(grpc::StatusCode::INTERNAL, "ITU PON Alloc Configuration \
 Complete Indication subscribe failed");
 
+    rx_cfg.obj_type = BCMOLT_OBJ_ID_GROUP;
+    rx_cfg.rx_cb = GroupIndication;
+    rx_cfg.flags = BCMOLT_AUTO_FLAGS_NONE;
+    rx_cfg.subgroup = bcmolt_group_auto_subgroup_complete_members_update;
+    rc = bcmolt_ind_subscribe(current_device, &rx_cfg);
+    if(rc != BCM_ERR_OK)
+        return Status(grpc::StatusCode::INTERNAL, "Complete members update indication subscribe failed");
+
     subscribed = true;
 
     return Status::OK;