ArcGIS Pro 3.3 API Reference Guide
ArcGIS.Core.Data.Realtime Namespace / RealtimeCursorBase Class / WaitForRowsAsync Method / WaitForRowsAsync() Method
Example

In This Topic
    WaitForRowsAsync() Method
    In This Topic
    Asynchronously waits for new rows to be available in the internal queue of this real-time cursor. The returned System.Threading.Tasks.Task will also complete if the state of this RealtimeCursor (see GetState) changes from RealtimeCursorState.Subscribed. This method can be called on any thread.
    Syntax
    public Task<bool> WaitForRowsAsync()
    Public Overloads Function WaitForRowsAsync() As Task(Of Boolean)

    Return Value

    A System.Threading.Tasks.Task that will complete once this cursor becomes unsubscribed or new rows are available in the internal queue of this real-time cursor. The returned System.Boolean value is false if this cursor is unsubscribed and there are no more rows to be read. Otherwise it's true.
    Exceptions
    Exception | Description
    ArcGIS.Core.Data.Exceptions.GeodatabaseException | A geodatabase-related exception has occurred.
    Example
    Submit a Graph Query
    //Runs on the QueuedTask...
    //and assumes a connection to a knowledge graph ("kg") is already established
    //...
    //openCypher query text - returns the first 10 entities (whatever they are).
    //If no LIMIT is specified the default limit is 100
    var query = "MATCH (n) RETURN n LIMIT 10";
    //other examples...
    //query = "MATCH (a:Person) RETURN [a.name, a.age] ORDER BY a.age DESC LIMIT 50";
    //query = "MATCH (b:Person) RETURN { Xperson: { Xname: b.name, Xage: b.age } } ORDER BY b.name DESC";
    //query = "MATCH p = (c:Person)-[:HasCar]-() RETURN p ORDER BY c.name DESC";
    
    //Build the query filter that carries the query text
    //Note: OutputSpatialReference is currently ignored
    var kg_qf = new KnowledgeGraphQueryFilter();
    kg_qf.QueryText = query;

    //Optionally - you can choose to include provenance in the results.
    //Only request it when the KG actually has provenance - otherwise the
    //query will fail (see "Get Whether KG Supports Provenance" snippet).
    //The support check is only evaluated when the caller opted in
    if (includeProvenanceIfPresent && KnowledgeGraphSupportsProvenance(kg))
    {
      //default is exclude
      kg_qf.ProvenanceBehavior = KnowledgeGraphProvenanceBehavior.Include;
    }

    //submit the query - returns a KnowledgeGraphCursor
    using (var kg_rc = kg.SubmitQuery(kg_qf))
    {
      //wait for rows to be returned from the server
      //note the "await"...
      while (await kg_rc.WaitForRowsAsync())
      {
        //A "batch" of rows has been retrieved - drain it
        while (kg_rc.MoveNext())
        {
          //Current is the current KnowledgeGraphRow; dispose it once processed
          using (var graph_row = kg_rc.Current)
          {
            //Graph row is an array - read its length once, then visit every value
            var val_count = (int)graph_row.GetCount();
            for (int idx = 0; idx < val_count; idx++)
            {
              //Process row value (note: recursive)
              //See "Process a KnowledgeGraphRow Value" snippet
              ProcessKnowledgeGraphRowValue(graph_row[idx]);
            }
          }
        }
      }//WaitForRowsAsync
    }//SubmitQuery
    Search And Subscribe for Streaming Data
    await QueuedTask.Run(async () =>
    {
      //Option 1: subscribe via the stream layer.
      //The query filter can be null to search and retrieve all rows;
      //2nd param "true" requests a recycling cursor
      using (var rc = streamLayer.SearchAndSubscribe(qfilter, true))
      {
        //wait for new features to be streamed
        //(this overload takes no cancellation token)
        while (await rc.WaitForRowsAsync())
        {
          while (rc.MoveNext())
          {
            using (var row = rc.Current)
            {
              //determine the origin of the row event
              var source = row.GetRowSource();
              switch (source)
              {
                case RealtimeRowSource.PreExisting:
                  //pre-existing row at the time of subscribe
                  break;
                case RealtimeRowSource.EventInsert:
                  //row was inserted after subscribe
                  break;
                case RealtimeRowSource.EventDelete:
                  //row was deleted after subscribe
                  break;
              }
            }
          }
        }
      }//row cursor is disposed. row cursor is unsubscribed
    
      //....or....
      //Option 2: use the feature class instead of the layer
      using (var rfc = streamLayer.GetFeatureClass())
      {
        //2nd param "false" requests a non-recycling cursor
        using (var rc = rfc.SearchAndSubscribe(qfilter, false))
        {
          //wait for new features to be streamed
          //(this overload takes no cancellation token)
          while (await rc.WaitForRowsAsync())
          {
            //etc
          }
        }
      }
    });
    Search And Subscribe With Cancellation
    await QueuedTask.Run(async () =>
    {
      //Recycling cursor - 2nd param "true"
      //or streamLayer.Subscribe(qfilter, true) to just subscribe
      using (var rc = streamLayer.SearchAndSubscribe(qfilter, true))
      //auto-cancel after 20 seconds.
      //The token source is wrapped in "using" so it is always disposed,
      //even if an exception other than TaskCanceledException escapes the
      //loop below. It is disposed before the cursor, as nested usings
      //unwind innermost-first
      using (var cancel = new CancellationTokenSource(TimeSpan.FromSeconds(20)))
      {
        //catch TaskCanceledException
        try
        {
          while (await rc.WaitForRowsAsync(cancel.Token))
          {
            //check for row events
            while (rc.MoveNext())
            {
              using (var row = rc.Current)
              {
                //etc
              }
            }
          }
        }
        catch (TaskCanceledException)
        {
          //Handle cancellation as needed
        }
      }//token source is disposed, then the row cursor is disposed
    });
    Search Existing Data and Subscribe for Streaming Data
    //Note we can use System Task with the Realtime feature class
    //for subscribe (or use ... QueuedTask.Run() instead)
    await System.Threading.Tasks.Task.Run(async () =>
    {
      using (var rfc = streamLayer.GetFeatureClass())
      {
        //2nd param "false" requests a non-recycling cursor
        using (var rc = rfc.SearchAndSubscribe(qfilter, false))
        {
          //wait for new features to be streamed
          //(this overload takes no cancellation token)
          while (await rc.WaitForRowsAsync())
          {
            //pre-existing rows that matched the search are retrieved too
            while (rc.MoveNext())
            {
              using (var row = rc.Current)
              {
                //branch on where the row came from
                switch (row.GetRowSource())
                {
                  case RealtimeRowSource.PreExisting:
                    //TODO handle pre-existing rows
                    break;
                  case RealtimeRowSource.EventInsert:
                    //TODO handle inserts
                    break;
                  case RealtimeRowSource.EventDelete:
                    //TODO - handle deletes
                    break;
                }
              }
            }
          }
        }//row cursor is disposed. row cursor is unsubscribed
      }
    });
    
    Requirements

    Target Platforms: Windows 11, Windows 10

    ArcGIS Pro version: 3.2 or higher.
    See Also