Specification - GSoC Graph Streaming API

GSoC Project Graph Streaming API

Student: Andre Panisson

Mentor: Mathieu Bastian

Goal of the Project

The goal of this project is to set up a framework for graph streaming in Gephi. A high-quality API proposal should be general enough to provide interoperability with other available tools: the more general the API proposal is, the more tools will be able to implement it and interoperate. It should be able to deal with dynamic data, following the idea that a graph is not static and might change continuously, as Gephi's data structure and visualization engine already do.

Specifications

A unified framework for graph streaming should include operations to add, remove and modify graph entities (nodes and edges), as well as operations to retrieve and monitor graph information. It should be general enough to let a tool connect to a datasource as a client, reading and updating the graph information in the datasource, and also to let a tool act as a datasource itself, providing operations that allow external clients to read and update its graph information.

The Graph Streaming API can be divided into two groups of operations. The first group exposes operations to retrieve and monitor graph information, while the second group exposes operations to add, remove and modify the graph entities (nodes and edges).

A completely new API, called StreamingAPI, will be created, based on the current ImportAPI, which already separates importing data from processing it into the main data structure by using a container. As the new API should also be independent of the ImportAPI, some of the ImportAPI's classes are expected to be replicated in the new module, since the logic is very similar.

Operations

The server will expose different operations to be accessed by external entities, some of them based on the Graph API. The operations can be divided into two groups.

The first group consists of read operations, which do not imply any change to the graph:

  • nodeList getNodes() - returns a list of nodes in the graph structure
  • edgeList getEdges() - returns the list of edges in the graph structure
  • node getNode(id) - returns the node information
  • edge getEdge(id) - returns the edge information
  • boolean nodeExists(id) - verifies whether a given node exists in the graph structure
  • boolean edgeExists(id) - verifies whether a given edge exists in the graph structure
  • attributeValue getNodeAttribute(id, attribute) - returns the node attribute value
  • attributeValue getEdgeAttribute(id, attribute) - returns the edge attribute value
  • attributeList getNodeAttributes(id) - returns the node attributes
  • attributeList getEdgeAttributes(id) - returns the edge attributes

The second group consists of change operations, which do imply changes to the graph:

  • void addNode(node) - add a node to the graph structure
  • void addEdge(edge) - add an edge to the graph structure
  • void removeNode(id) - removes a node from the graph structure
  • void removeEdge(id) - removes an edge from the graph structure
  • void addNodeAttribute(node, attribute, value) - add an attribute to the node
  • void addEdgeAttribute(edge, attribute, value) - add an attribute to the edge
  • void changeNodeAttribute(node, attribute, value) - changes a node's attribute
  • void changeEdgeAttribute(edge, attribute, value) - changes an edge's attribute
  • void removeNodeAttribute(node, attribute) - removes a node's attribute
  • void removeEdgeAttribute(edge, attribute) - removes an edge's attribute
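
As a rough illustration of the contract behind these two groups, the sketch below implements a minimal in-memory store where the change operations mutate state directly (in the real API they would generate graph events) and the read operations inspect it. All class and method signatures here are assumptions for illustration only, not part of the actual StreamingAPI:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: a minimal in-memory store showing the shape of
// the read and change operations listed above. In the real API, the change
// operations would generate graph events instead of mutating state directly.
public class SimpleGraphStore {

    private final Map<String, Map<String, Object>> nodes = new HashMap<>();

    // change operations
    public void addNode(String id) {
        nodes.put(id, new HashMap<String, Object>());
    }

    public void addNodeAttribute(String id, String attribute, Object value) {
        nodes.get(id).put(attribute, value);
    }

    public void removeNode(String id) {
        nodes.remove(id);
    }

    // read operations: direct reads on the graph's objects
    public boolean nodeExists(String id) {
        return nodes.containsKey(id);
    }

    public Object getNodeAttribute(String id, String attribute) {
        return nodes.get(id).get(attribute);
    }
}
```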

The main difference between the two groups is that, while the first group of operations can read the graph's objects directly, the second group generates one or more graph events to be processed asynchronously.

A special operation, getGraph(), is central to the graph streaming process. It should first return all the events needed to build the entire graph and then keep the connection active, sending events as the graph changes. The stream stops sending events only when the socket is closed.

The operations will be invoked through HTTP requests. Each operation requires the client to send a request to the server with the operation name and parameters; the server returns a response with the requested data or with a status indicating whether the operation was successfully executed. The only operation the server cannot answer immediately is getGraph(), which constitutes the real streaming core of the API. For this operation, the server keeps the connection to the client open and sends graph information as it becomes available. While the other operations are client-driven, with the client taking the communication initiative, getGraph() is the only one where the server actively pushes information to the client. In this way, a client can hold a connection open through getGraph() to receive server updates in real time, while using the other operations to inform the server when the graph changes locally.

The Streaming API can return data in JSON format and in GEXF format. JSON is strongly encouraged over XML, as it is more compact and parsing is greatly simplified by the delimited format: every object is returned on its own line and ends with a carriage return. In a streaming situation this is more practical, as the data can be parsed as it arrives, without waiting for a closing tag or for the end of the stream.
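
As a minimal sketch of this delimited format, the hypothetical writer below serializes each event on its own line, terminated with a carriage return, so a client can parse every event as soon as the terminator arrives (the EventWriter name is an assumption for illustration, not a class from the API):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.Writer;

// Sketch of the wire format described above: each already-serialized JSON
// event is written on its own line and terminated with a carriage return.
public class EventWriter {

    private final Writer out;

    public EventWriter(Writer out) {
        this.out = out;
    }

    // writes one JSON event followed by the '\r' terminator
    public void writeEvent(String jsonEvent) {
        try {
            out.write(jsonEvent);
            out.write('\r');
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```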

Connecting to the API server

As the Graph Streaming API involves sending and receiving data over the network, security should be a concern. Initially, minimal security will be provided by the socket server, with possible extensions later. Within the scope of this project, connections will be implemented with basic HTTP authentication, but the API should support more robust implementations.

To connect to the Graph Streaming API using the getGraph operation, an HTTP request should be made to the socket server and the resulting stream consumed for as long as is practical. The server will hold the connection open indefinitely. As stated before, the API expects HTTP basic authentication.

Some HTTP libraries only return the response body after the connection has been closed by the server; these libraries will not work for accessing the Graph Streaming API. We must implement a client (or use an HTTP library) that returns response data incrementally; the Apache HttpClient is one option. On the server side, most HTTP server libraries likewise only return the response to the client when processing is finished. Servers based on the Servlet API, for example, are not able to handle asynchronous processing and are therefore not suitable for streaming data.

As there are many ways to lose the connection to the server (server restarts, connection lag, server errors), the client should reconnect immediately after the connection is closed. If the first retry returns an error, the client should back off exponentially, starting with a 2-second wait and capping the wait at 32 seconds.
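
The retry policy described above can be sketched as follows; the class and method names are illustrative, not part of the API:

```java
// Sketch of the reconnection policy described above: reconnect immediately
// once, then back off exponentially from 2 seconds, capped at 32 seconds.
public class ReconnectBackoff {

    private static final long BASE_MILLIS = 2000L;
    private static final long CAP_MILLIS = 32000L;

    // attempt 0 is the immediate reconnect; attempts 1, 2, 3... wait 2s, 4s, 8s...
    public static long delayMillis(int attempt) {
        if (attempt <= 0) {
            return 0L; // first reconnect is immediate
        }
        if (attempt > 5) {
            return CAP_MILLIS; // 2s * 2^4 already reaches the 32s cap
        }
        return BASE_MILLIS << (attempt - 1); // 2s, 4s, 8s, 16s, 32s
    }
}
```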


In order to connect to a stream data source, any component should:

1. Get the current Graph instance:

        GraphController graphController = Lookup.getDefault().lookup(GraphController.class);
        GraphModel graphModel = graphController.getModel();
        Graph graph = graphModel.getHierarchicalMixedGraph();

2. Get the StreamingController:

        StreamingController controller = Lookup.getDefault().lookup(StreamingController.class);

3. Define the endpoint to connect to:

        GraphStreamingEndpoint endpoint = new GraphStreamingEndpoint();
        endpoint.setUrl(new URL("http://streamingserver/streamingcontext"));
        // endpoint.setStreamType() accepts a StreamType instance, so you
        // have to get it using the controller
        StreamType type = controller.getStreamType("JSON");
        endpoint.setStreamType(type);

4. Connect to it and process:

        StreamingConnection connection = controller.connect(endpoint, graph);
        connection.process();

The client will connect to the endpoint, process the events and update the graph accordingly. Note that connection.process() returns only when the stream finishes. If you want to process the stream asynchronously, call connection.asynchProcess() instead. With asynchronous processing, however, you no longer know when the stream finishes; to find out, you can poll the StreamingConnection object to see whether the connection was closed. Alternatively, to be notified asynchronously when the connection closes, without polling the connection object, you can register a listener to receive the event:

        StreamingConnection connection = controller.connect(endpoint, graph);
        connection.addStatusListener(new StatusListener() {
                    public void onConnectionClosed(StreamingConnection connection) {
                        System.out.println("Connection was closed!");
                    }
                });
        connection.asynchProcess();

If you want to customize the event-processing behavior and do something other than updating the graph, you can work with lower-level objects such as StreamReaders. You can implement your own GraphEventHandler and pass it directly to the StreamReader implementation. For example, to count the received events you could do this:

        final AtomicInteger counter = new AtomicInteger();
 
        GraphEventHandler eventHandler = new GraphEventHandler() {
            public void handleGraphEvent(GraphEvent event) {
                counter.incrementAndGet();
            }
        };
 
        URL url = new URL("http://streamingserver/streamingcontext");
        InputStream inputStream = url.openStream();
 
        GraphEventBuilder eventBuilder = new GraphEventBuilder(url);
 
        StreamReaderFactory readerFactory = Lookup.getDefault().lookup(StreamReaderFactory.class);
        StreamReader reader = readerFactory.createStreamReader("JSON", eventHandler, eventBuilder);
        reader.processStream(inputStream);

The graph events will be sent to the GraphEventHandler as they arrive on the inputStream. Problems and exceptions that occur during the streaming process are reported in a Report object:

        Report report = connection.getReport();

You can also write the events in a given format using the StreamWriter:

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        StreamWriterFactory factory = Lookup.getDefault().lookup(StreamWriterFactory.class);
        StreamWriter streamWriter = factory.createStreamWriter("JSON", out);

Now you can use it as a GraphEventHandler instance (StreamWriter implements the GraphEventHandler interface), and the events will be written to the output in the given format:

        StreamReaderFactory readerFactory = Lookup.getDefault().lookup(StreamReaderFactory.class);
        StreamReader reader = readerFactory.createStreamReader("JSON", streamWriter, eventBuilder);
        reader.processStream(inputStream);

Parsing Responses

The Graph Streaming API returns data in JSON format. JSON is strongly encouraged over XML, as it is more compact and parsing is greatly simplified by the delimited format. Every data object is returned on its own line and ends with a carriage return. Newline characters (\n) may occur within object elements, but carriage returns (\r) should not. The parser must be tolerant of occasional extra newline characters between data objects; these are sent as periodic "keep-alive" messages, which allow clients and NAT firewalls to determine that the connection is still valid during long periods without data transmission.
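
A tolerant parser loop along these lines might look like the following sketch, which reads the stream line by line and skips the empty keep-alive lines (the helper class is hypothetical, not part of the actual API):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Sketch of a parser loop for the delimited format above. BufferedReader's
// readLine() accepts '\r', '\n' or "\r\n" as terminators, and empty lines
// (keep-alives) are simply skipped.
public class LineDelimitedParser {

    // returns one JSON string per data object, ignoring keep-alive lines
    public static List<String> readEvents(Reader source) {
        List<String> events = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue; // keep-alive: an empty line between data objects
                }
                events.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return events;
    }
}
```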

Collecting and Processing Graph Data

This implementation is based on decoupled data collection, processing and event dispatching components. The stream client that collects graph data runs in its own thread, pushing the collected events to a buffer in the container. The container, in its own independent thread, consumes the events in its buffer and sends them to the event handler, which is responsible for updating the graph.

The following diagram shows a typical sequence in a loop where the StreamingController connects to the URL and sends the stream to the stream reader, which processes it and sends the produced events to the container:

StreamingAPI diagram1.png
Figure 1: Sequence Diagram with Streaming API components

In this example, the StreamingController receives data and sends it to the StreamReader until the stream finishes or is closed. The StreamReader parses the data and, when a new graph event is recognized, sends it to the Container. The Container adds the events to its internal buffer and returns immediately, as another thread dispatches the events asynchronously. The events produced are objects from the following hierarchy:

GraphEventClassDiagram.png
Figure 2: Graph Events Hierarchy

The StreamReader is the abstraction over the implementations for the different stream types: XmlStreamReader, JsonStreamReader, etc.

Implementation Strategies

Two strategies will be implemented in different steps:

1. Gephi connects to an external data-source using the defined Graph Streaming protocol, and updates its data accordingly;

2. Gephi exposes its container operations, and external clients connect to it using the defined protocol, sending objects to it; Gephi updates its data accordingly.

First Step:

The initial implementation will focus on Gephi connecting to an external data source. The test case will be a socket server that streams graph objects using the defined Graph Streaming API. Gephi should be able to connect to the socket server and fetch the arriving objects, updating the local entities as the objects arrive in the stream.

The approach will be to develop a multi-threaded socket server that accepts connections over the HTTP protocol. Defining a REST API is a fundamental first step of the implementation.

Second Step:

The second step in the implementation will be the development of a plugin that integrates the socket server within Gephi, so that Gephi will be able to expose its operations according to the Graph Streaming API. By doing this, an external client will be able to connect to Gephi using the protocol and send objects to it, and Gephi will update its data accordingly. Gephi should also be able to expose graph information using the API. This step also involves reusing and extending the ExportAPI, as the current definition only supports exporting the whole graph, not individual graph entities such as nodes and edges.

Collaborative Graph Construction

At the end of the project, the following testbed should be implemented:

Consider two interconnected Gephi instances. One instance is used as the DataSource, the other as the Client. With a connection from the Client to the DataSource using the Graph Streaming API, a change in a graph in the DataSource workspace should cause the DataSource to send information about the change to the Client. At the same time, a change in a graph in the Client workspace should cause the Client to send requests to the DataSource to update its graph accordingly.

Both instances should work in a distributed mode, and connecting other Gephi Clients to the DataSource should work as well. In fact, different people could work in a distributed way to construct a graph, in a mode we call Collaborative Graph Construction.

One problem to solve is how to deal with data propagation between the instances. For example, suppose the graph in a client is changed. A request is sent to the server to update its graph accordingly and, as a consequence, the update echoes to all connected clients. However, the change must not be sent back to the client that produced it, or the update mechanism could end up in a loop.

One way to handle this problem is to ensure that only the server propagates changes to the connected clients, and that it recognizes when an update was made by one of its clients so that it does not send the update back to that specific client. Correspondingly, a client should recognize when a change was caused by the streaming API, so that the change is not propagated back to the data source.
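
The echo-suppression rule can be sketched as follows, with the server tracking the originating client of each update and broadcasting to everyone else; all names here are illustrative placeholders, not part of the API:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the echo-suppression rule described above: the server broadcasts
// each update to every connected client except the one that originated it.
public class UpdateDispatcher {

    private final List<String> clientIds = new ArrayList<>();
    private final List<String> sent = new ArrayList<>();

    public void register(String clientId) {
        clientIds.add(clientId);
    }

    // broadcast the event to all clients except the originator
    public void propagate(String event, String originClientId) {
        for (String clientId : clientIds) {
            if (clientId.equals(originClientId)) {
                continue; // never echo an update back to its producer
            }
            sent.add(clientId + ":" + event); // stand-in for a network send
        }
    }

    public List<String> sentMessages() {
        return sent;
    }
}
```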

Timeline

The starting phase is the design of drafts and specifications. The first coding phase will be the development of the first strategy: Gephi connects to an external data-source using the defined Graph Streaming protocol, and updates its data accordingly.

The second coding phase will be the development of the second strategy: a socket server will be integrated into Gephi as a plugin, Gephi will expose its container operations, and external clients will be able to connect to it using the API and send objects to it, with Gephi updating its data accordingly. The final coding phase will be the development of user interfaces to monitor both the Gephi containers and the socket server.

Activity | Start | End
---------|-------|----
Design drafts and specification | 05-26 | 06-11
Implementation of a dummy socket server which exposes the Graph Streaming API and GEXF format | 06-11 | 06-25
Implementation of a functional socket server which exposes a real graph using the Graph Streaming API | 06-18 | 07-02
Implementation of a socket connection between Gephi and the socket server | 06-18 | 07-09
Implementation of incremental graph construction with the objects arriving from the socket server | 06-25 | 07-12
Mid-term evaluations | 07-12 | 07-16
Integration of the socket server plugin within Gephi | 07-12 | 07-23
Exposition of Gephi container functions and graph entities through the socket server plugin | 07-16 | 07-30
Basic user interface to monitor the container and accept/reject filters and rules | 07-23 | 08-09
Basic user interface to monitor the socket server | 07-30 | 08-09
"Pencils down" | 08-09 | 08-16

Using the Streaming API Components

Before running:

  • Check-out and build the Gephi branch for Graph Streaming, available at lp:~panisson/gephi/graphstreaming

Connecting to a stream:

Gephi will connect to the specified URL and parse the incoming events using the JSON format.

Exposing the current workspace as a stream:

  • Go to the Streaming tab and click Server/Start
  • Go to a web browser and connect to the specified URL
  • By default you can also access it using HTTPS; the default port is 8443

To connect from another Gephi instance, you can start Gephi on another computer and try to connect to the same URL, using the computer name or IP address instead of localhost.

How to access the workspace data using the Streaming API

You can use the Streaming API to access the workspace data; for this, the HTTP server exposes a REST interface. Execute the following steps to test it:

  • Run the Gephi application
  • Create an empty workspace (File/New Project)
  • Go to the Streaming tab and click on Server/Start
  • Open a shell prompt and execute the following lines, one after the other:
curl "http://localhost:8080/workspace0?operation=updateGraph" -d "{\"an\":{\"A\":{\"label\":\"Streaming Node A\"}}}"
curl "http://localhost:8080/workspace0?operation=updateGraph" -d "{\"an\":{\"B\":{\"label\":\"Streaming Node B\"}}}"
curl "http://localhost:8080/workspace0?operation=updateGraph" -d "{\"an\":{\"C\":{\"label\":\"Streaming Node C\"}}}"
curl "http://localhost:8080/workspace0?operation=updateGraph" -d "{\"ae\":{\"AB\":{\"source\":\"A\",\"target\":\"B\",\"directed\":false}}}"
curl "http://localhost:8080/workspace0?operation=updateGraph" -d "{\"ae\":{\"BC\":{\"source\":\"B\",\"target\":\"C\",\"directed\":false}}}"
curl "http://localhost:8080/workspace0?operation=updateGraph" -d "{\"ae\":{\"CA\":{\"source\":\"C\",\"target\":\"A\",\"directed\":false}}}"

You should see the nodes and edges appearing in the workspace until they form a triangle. You can send more events to the workspace using the same command lines; just change the node and edge identifiers.

The same events can be sent in a single HTTP request:

curl "http://localhost:8080/workspace0?operation=updateGraph" -d $'{"an":{"A":{"label":"Streaming Node A"}}}\r
{"an":{"B":{"label":"Streaming Node B"}}}\r
{"an":{"C":{"label":"Streaming Node C"}}}\r
{"ae":{"AB":{"source":"A","target":"B","directed":false}}}\r
{"ae":{"BC":{"source":"B","target":"C","directed":false}}}\r
{"ae":{"CA":{"source":"C","target":"A","directed":false}}}'

Other operations are used to retrieve node and edge data:

curl "http://localhost:8080/workspace0?operation=getNode&id=A"
curl "http://localhost:8080/workspace0?operation=getEdge&id=AB"

And to get the complete graph, use the operation getGraph or no operation at all (getGraph is the default operation):

curl "http://localhost:8080/workspace0?operation=getGraph"

In this case, the server keeps the connection open and, after sending the entire graph, enters streaming mode, in which any new event is sent to the client.

The following command line can be used to get node and edge data in the DGS format:

curl "http://localhost:8080/workspace0?operation=getNode&id=A&format=DGS"
curl "http://localhost:8080/workspace0?operation=getEdge&id=AB&format=DGS"

To get the complete graph in DGS format, use the operation getGraph or no operation at all (getGraph is the default operation):

curl "http://localhost:8080/workspace0?operation=getGraph&format=DGS"

Current Status of JSON Streaming Format

The current implementation of the JSON Streaming Format is very simple and still subject to change. It is composed of six event types, combining two element types (nodes and edges) with three operations (add, change, delete):

  • an: Add node
  • cn: Change node
  • dn: Delete node
  • ae: Add edge
  • ce: Change edge
  • de: Delete edge

Each event is composed of its event type and a list of objects of type node or edge, depending on the event type. Node and edge objects are similar: each is composed of an identifier and a list of attributes. "Add edge" is the only operation with three mandatory attributes: source, target and directed. Source and target are node identifiers, and directed is a boolean indicating whether the edge is directed.
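
The mandatory-attribute rule for "add edge" events could be checked with a small validation helper like this sketch (the helper is hypothetical; the real API defines its own event classes):

```java
import java.util.Map;

// Sketch of the validation implied above: an "add edge" object must carry
// the three mandatory attributes source, target and directed, where
// directed must be a boolean.
public class EdgeEventValidator {

    // attributes is the parsed attribute map of a single "ae" object
    public static boolean isValidAddEdge(Map<String, Object> attributes) {
        return attributes.containsKey("source")
                && attributes.containsKey("target")
                && attributes.get("directed") instanceof Boolean;
    }
}
```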

The events are currently represented in the JSON format as follows:

{<event_type>:{<object_identifier>:{<attribute_name>:<attribute_value>,<attribute_name>:<attribute_value>}}}

The following list shows some examples for each type of event, represented in the current JSON format implementation:

{"an":{"A":{"label":"Streaming Node A","size":2}}} // add node A
{"an":{"B":{"label":"Streaming Node B","size":1}}} // add node B
{"an":{"C":{"label":"Streaming Node C","size":1}}} // add node C
{"ae":{"AB":{"source":"A","target":"B","directed":false,"weight":2}}} // add undirected edge AB
{"ae":{"BC":{"source":"B","target":"C","directed":false,"weight":1}}} // add undirected edge BC
{"ae":{"CA":{"source":"C","target":"A","directed":false,"weight":2}}} // add undirected edge CA
{"cn":{"C":{"size":2}}}  // change the size attribute to 2
{"cn":{"B":{"label":null}}}  // remove the label attribute
{"ce":{"AB":{"label":"From A to B"}}} // add the label attribute
{"de":{"BC":{}}} // delete edge BC
{"de":{"CA":{}}} // delete edge CA
{"dn":{"C":{}}}  // delete node C

With this format it is possible to put more than one object in each event, as in the following example:

{"an":{
    "A":{"label":"Streaming Node A","size":2},
    "B":{"label":"Streaming Node B","size":1},
    "C":{"label":"Streaming Node C","size":1}
  }
}

However, we recommend sending only one object per event, as this is more suitable for a streaming approach: the client should be able to read the data as soon as possible, and putting multiple objects in one event slows the client's reading, because it cannot parse the JSON event object until a '\r' appears.

We note again that this format is subject to change, as more requirements are added to the Graph Streaming API.

Format changing considerations currently in progress

Some changes to the JSON format are under consideration in order to meet additional requirements.

The first is support for filters. Filters are very important when changing groups of objects with a common characteristic. For example, if you want to change the color of all nodes with size=x, you could use a single filter event, which would cost much less than sending one event for each node.

Another requirement is support for event identifiers. In some cases it would be useful to assign an identifier to an event. For example, in a distributed environment events are produced by event producers, but you cannot be sure that the event consumers receive the events in the same order. To solve this problem, each event producer could assign a timestamp to the event, ensuring that every event consumer produces the same results.

Event identifiers will be assigned to events using a special "id" attribute, at the same level as the event type:

{
    "id": "1278944510",
    "an": {
        "A": {
            "label": "Streaming Node A",
            "size": 2 
        } 
    }
}

This way, the event identifier is parsed only if the "id" attribute is present in the event object. Anyone who does not want to use identifiers can simply ignore the "id" attribute: no overhead is added to the format, it remains compatible with "old style" events, and the format remains concise.

Using the Graph Streaming plugin in the Toolkit

You can use the Graph Streaming API to connect from the Toolkit, without a user interface.

Connecting to an external source

ProjectController projectController = Lookup.getDefault().lookup(ProjectController.class);
Project project = projectController.getCurrentProject();
Workspace workspace = projectController.getCurrentWorkspace();
 
// Get the graph instance
GraphController graphController = Lookup.getDefault().lookup(GraphController.class);
GraphModel graphModel = graphController.getModel();
Graph graph = graphModel.getHierarchicalMixedGraph();
 
// Connect to stream using the Streaming API
StreamingController controller = Lookup.getDefault().lookup(StreamingController.class);
StreamingEndpoint endpoint = new StreamingEndpoint();
endpoint.setUrl(new URL("http://localhost:8080/workspace0"));
endpoint.setStreamType(controller.getStreamType("JSON"));
StreamingConnection connection = controller.connect(endpoint, graph);
connection.asynchProcess();

Starting a Master

And you can also start a master from the Toolkit using the API:

StreamingServer server = Lookup.getDefault().lookup(StreamingServer.class);
ServerControllerFactory controllerFactory = Lookup.getDefault().lookup(ServerControllerFactory.class);
ServerController serverController = controllerFactory.createServerController(graph);
String context = "/mycontext";
server.register(serverController, context);

Using this code, the Gephi master will be accessible at the following URL: http://your_ip_here:8080/mycontext


Resources

Related discussion: forum.