Distributed Architecture RFC


Obsolete Documentation! The Distributed AtomSpace page describes the current architecture. The Distributed AtomSpace Architecture page provides a general overview.

A proposal from 2009. Some of what is described here has been implemented, and some is no longer applicable. Some is potentially still interesting and worth pondering. Note that terminology and definitions have changed between then and now.

Purpose

The purpose of this document is to outline the design, and specify some concrete details about how to create a massively distributed AtomSpace for OpenCog. We also discuss alterations to the AtomSpace API, and potential changes to MindAgent design due to the latency associated with accessing atoms that are not cached in RAM.

Assumptions and definitions

Core definitions, assumptions, requirements and issues are spelled out on the Distributed Architecture page. Here we clarify some terms and what they mean in the context of a distributed OpenCog architecture.

  • The AtomSpace refers to the entirety of an OpenCog instance. It consists of all atoms, whether they are in-memory, locally on disk, or distributed.
  • The repository is persistent storage for the AtomSpace. It does not need to be distributed and could be disk-based storage; the main requirement is that it is a persistent data structure (i.e. it handles versioning).
    • The repository driver which handles the storage and retrieval of atoms from the repository implements the AtomSpaceLayer API, but is the lowest or base layer.
    • It is expected that this repository is much slower than any higher AtomSpaceLayer, but can scale easily. One example of a potential distributed repository is Hadoop (via HBase, the Apache BigTable-like project), and an example of a disk-based repository might be an interface to SQLite (if appropriately designed to keep versions).
  • AtomSpaceLayers are layers in a hierarchical caching of the AtomSpace, with each higher layer being quicker to retrieve from than the one below it: from network, to disk, to the in-memory AtomTable (which also implements the AtomSpace API). The AtomSpaceLayer with no layers above it is generally the AtomTable; however, specialist modules (such as an AtomSpace representation in a GPU's memory, for code utilising CUDA) may go on top of the AtomTable layer if necessary.
  • "forgetting" an atom refers to marking an atom in the repository as obsolete (i.e. deleted, but previous versions can still exist) although individual Cogs may still have local versions.
  • "dropping" an atom from an AtomSpaceLayer means flushing it from that layer and optionally storing it permanently in the repository.
  • A Cog is a single instance of the OpenCog server, which may be part of a larger OpenCog instance all sharing the same repository. A Cog must have at least one AtomSpaceLayer for it to interface with the repository.
  • The AtomSpace API defines the API for querying for and manipulating atoms. It is the access point for atoms whether they are in memory or distributed.
  • The AtomTable is an implementation of both the AtomSpace API and the AtomSpaceLayer API. It is essentially an in-memory localised cache of the AtomSpace.
  • A Handle is a reference to an Atom - which might be in memory, distributed, and might even refer to a specific version. See next section for more detail.
  • The UUID is a 64- or 128-bit unique identifier for each atom in the AtomSpace.
  • Here, a MindAgent refers to any process (whether it inherits from the MindAgent class or not) that accesses the AtomSpace through the AtomSpace API.

The Handle

The Handle class is used for all accesses to Atoms. Unless special circumstances call for it, one should assume that any code outside of the AtomTable, and other AtomSpaceLayers, will only use Handles.

Handle management by the AtomTable

typedef boost::shared_ptr<HandleInstance> Handle;

class HandleInstance {
    uint UUID;
    uint version;
    boost::shared_ptr<Atom> atom;
};

/** A ScopedHandle merely copies the original HandleInstance
 * so that the shared_ptr<Atom> member increases its reference count
 * for the lifetime of the ScopedHandle.
 */
class ScopedHandle: public HandleInstance {
public:
    /** The only addition is that users of ScopedHandle can decide to
     * commit changes to the Repository. This is important because the
     * AtomTable might attempt to drop the Atom from memory and forget
     * about it. commit() tells the AtomTable to push the changes to the
     * Repository even if it doesn't know about the Handle in
     * question anymore.
     * MindAgents don't have to do this if they are merely interested in
     * maintaining read-access to the Atoms.
     */
    void commit();
};

The AtomTable maintains a set of all HandleInstances currently referenced by a Cog. All HandleInstances are passed by Handle throughout code external to the AtomSpace architecture - so that there is only one HandleInstance per UUID.

This HandleInstance object is automatically reference counted, so that the AtomTable cannot inadvertently remove the HandleInstance object while a MindAgent is using it via a Handle. The AtomTable can, however, remove the Atom that the HandleInstance refers to; this change is reflected in the HandleInstance, so MindAgents are aware when atoms are no longer in memory (if they choose to be; otherwise they may just ignore it and let the AtomTable deal with re-fetching the Atom from lower AtomSpaceLayers when an operation/retrieval is performed through it).

The reason for passing HandleInstances by smart pointer is to allow all MindAgents to be aware of changes to whether the atom a HandleInstance refers to is in memory or not (and what version of the Atom the HandleInstance points to). Passing HandleInstances by reference also saves memory by avoiding duplication of shared data (which isn't an issue when the Handle is just an unsigned int, but by also including an Atom pointer and version information the Handle becomes three times larger), although this saving comes at the cost of an added dereference. The main point of having a single HandleInstance is to allow the AtomTable to easily update the atom pointer without concern for which MindAgents refer to it.

In some situations, MindAgents may have a strong need to keep an atom in memory. This might be to ensure good performance when working with low-importance atoms, or perhaps when dealing with temporary atoms that won't be saved to the repository but that the MindAgent needs during the course of its execution. If this is the case, one can initialise a ScopedHandle with the Handle or HandleInstance. A ScopedHandle ensures that the atom the Handle refers to will remain in memory for the lifetime of the ScopedHandle.
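For illustration, here is a minimal sketch of pinning an atom for the duration of a function, assuming the Handle and ScopedHandle definitions above (the function and the origin of the Handle are hypothetical):

/** Sketch only: 'h' is assumed to come from an AtomSpace query or add. */
void processImportantAtom(Handle h)
{
    // Copying into a ScopedHandle pins the Atom in memory for this scope;
    // the AtomTable will not drop it while 'pinned' exists.
    ScopedHandle pinned(h);

    // ... read and modify the Atom here ...

    // Ask the AtomTable to push the changes down to the repository, even
    // if it later drops and forgets this Handle.
    pinned.commit();
}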

The AtomSpace

The AtomSpace is the public API available to MindAgents. An AtomSpace implementation also needs to manage the retrieval of atoms from the underlying AtomSpaceLayer.

The AtomSpace API

Queries

Queries are processed in the AtomSpace (the AtomTable will update its own indexes).

Signals

A change-notification system is needed. Cogs must be able to receive events from the repository, indicating atom creation, request-for-deletion, changes in importance, etc. Events may be generated from the repository itself, or by other Cogs.

Signals from additions/removals in the repository may, however, be significantly delayed (although functionality should be provided for an explicit check). The only scalable way to do this may be through polling the repository.

The events monitored should be configurable, so as to not generate a flood of events delivered to an individual Cog as the total number of Cogs connected to the repository grows.
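As a rough illustration only, a configurable subscription might take the shape sketched below; none of these names exist in the codebase, and the event types, filter fields and callback mechanism are all assumptions:

// Hypothetical sketch of a configurable event subscription.
enum RepositoryEventType { ATOM_ADDED, ATOM_DELETE_REQUESTED, IMPORTANCE_CHANGED };

struct RepositoryEventFilter {
    std::set<RepositoryEventType> events; // which events this Cog wants
    Type type;                            // restrict to this atom type...
    bool subType;                         // ...optionally including subtypes
};

// Delivery may be significantly delayed, and may internally be implemented
// by the driver polling the repository.
typedef boost::function<void (RepositoryEventType, Handle)> EventCallback;
void subscribe(const RepositoryEventFilter& filter, EventCallback callback);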

Interface

Here is a list of the methods any object implementing the AtomSpace API is expected to provide (subject to change); a brief usage sketch follows the list:

  • Query qType(Type t, bool subType)
  • Query qNode(String prefix, Type t, bool subType)
  • Query qLink(HandleSet outgoing, Type t, bool subType)
  • Query qIncomingSet(Handle h, Type linkType, bool subType)
  • Query qNeighbours(Handle h, Type linkType, bool subType)
  • Query qOutgoingSet(Handle h)
  • Handle addLink(HandleSet outgoing, Type t, TruthValue& tv)
  • Handle addNode(String name, Type t, TruthValue& tv)
  • Handle addVtree(tree<Vertex> &v)
  • bool removeAtom(Handle h, bool recursive)
  • string getName(Handle h) const
  • Type getType(Handle h) const
  • TruthValue getTV(Handle h, Handle context = NULL_HANDLE) const
  • bool setTV(Handle h, TruthValue& tv, Handle context = NULL_HANDLE)
  • get/set STI/LTI/VLTI
  • get/set normalised STI/LTI
  • uint getArity(Handle h) const
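To make the intent concrete, here is a minimal usage sketch following the signatures listed above; the atomspace() accessor and the SimpleTruthValue constructor arguments are illustrative assumptions:

SimpleTruthValue tv(0.9, 0.8);  // illustrative strength/confidence values
Handle cat = atomspace().addNode("cat", CONCEPT_NODE, tv);
Handle tom = atomspace().addNode("Tom", CONCEPT_NODE, tv);

// Link Tom to cat: the outgoing set uses the HandleSet type from the list
// above (ordering concerns are ignored in this sketch).
HandleSet outgoing;
outgoing.insert(tom);
outgoing.insert(cat);
Handle isa = atomspace().addLink(outgoing, INHERITANCE_LINK, tv);

// Read back the link's truth value through the same API.
TruthValue linkTv = atomspace().getTV(isa);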

A layered AtomSpace architecture

Storage layers for the AtomSpace can be "chained". e.g.

AtomTable -> DiskLayer -> ? -> RepositoryLayer

This is similar to the way the memory/storage hierarchy works in a computer:

CPU -> L1 cache -> L2 cache -> RAM -> Swap -> Disk.

Each layer only needs to communicate with the one directly below it if it is missing atoms or needs more query results. It is up to individual layers to work out how to index and cache the atoms of the layer below.

Each layer is a subclass of AtomSpaceLayer.

AtomSpaceLayer();

/**
 * Instantiate layer on top of below. 
 */
AtomSpaceLayer( AtomSpaceLayer& below );

/**
 * @return whether the Handle h was successfully updated with
 * an Atom pointer.
 */
bool getAtom(Handle h);

/**
 * @return A set of up to [max] handles referring to atoms of type [t],
 * that have changed since [time]. maxDepth indicates how many underlying
 * AtomSpaceLayers to check.
 * @note Does not load atoms into memory.
 */
HandleSet getType(Type t, bool subType = false, uint time = 0,
    int maxN=1000, int maxDepth=1);

/**
 * @note Does not load atoms into memory.
 */
Handle getNode(string name, Type t = NODE, bool subType = false,
    int maxDepth=1);

/**
 * @note Does not load atoms into memory.
 */
Handle getLink(HandleSeq outgoing, Type t = LINK, bool subType = false,
    int maxDepth=1);

/**
 * @param maxDepth indicates how far to commit to underlying base
 * layers. 0 indicates it should be stored right to the last.
 */
bool commit(Handle h, int maxDepth=0);

/**
 * Attach this layer as the base of another (should only be called by
 * another AtomSpaceLayer's constructor).
 * @return the depth that b now has (from the root AtomSpaceLayer/
 *         repository).
 */
int attach(AtomSpaceLayer* b);

/// Virtual methods whose defaults in AtomSpaceLayer are composed of the
/// above "core" methods. They can be implemented in children to be more
/// efficient if necessary/possible.
HandleSeq getOutgoing(Handle, Type t = ATOM, bool subType = false);
HandleSeq getIncoming(Handle, Type t = LINK, bool subType = false);
HandleSeq getNeighbours(Handle, Type t = ATOM, bool subType = false);
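A brief usage sketch of chaining layers through this interface; DiskLayer and RepositoryLayer are hypothetical AtomSpaceLayer subclasses (only the AtomTable currently exists):

// Stack the layers from the persistent base up to the in-memory cache.
RepositoryLayer repo;        // base layer: the shared, persistent repository
DiskLayer disk(repo);        // local on-disk cache stacked on top of it
AtomTable table(disk);       // in-memory cache; the layer MindAgents talk to

// Resolve a node, looking at most one layer below the AtomTable.
Handle h = table.getNode("cat", CONCEPT_NODE, false, /* maxDepth */ 1);

// getAtom() tries to fill in the Atom pointer; if the Atom isn't cached in
// this layer it is requested from the layer below.
if (!table.getAtom(h)) {
    // Not found locally; a deeper query (or a larger maxDepth) is needed.
}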

To be addressed:

  • What sort of indexes should AtomSpaceLayers keep, should there be any access to them or should AtomSpaceLayers act like black boxes - using whatever internal indexes it needs for efficient retrieval?
  • Should there be user-definable indexes (such as for Queries and the antecedents of Rules)? If so what is the API for these user-definable indexes and how do they stack from one layer to the next?

Pluggable Indexes

The current AtomTable already contains 6 different indexes by default, and supports pluggable indexes.

  • TODO: list these

The AtomSpaceLayer should also define methods to add new indexes. When this occurs, there should optionally be the ability to tell every layer underneath the current one to add an index based on the same attributes.

(Joel: I considered that we just guarantee that there will be an equivalent index for every sub-layer, but for things like STI it doesn't make sense for any layer except the AtomTable to have such an index.)

CogServer

Cog interaction

Cogs in an OpenCog instance may communicate via channels other than adding atoms to the repository. There will be a JSON-RPC interface, so, theoretically, Cogs can find out what the remote version of a specific atom looks like even before it is committed to the repository.

How will a Cog find the other Cogs in the great clockwork of OpenCog?

This could be done by adding a special ConnectedServersSet to the AtomSpace, and then have Cogs add themselves (including their IP and port) to this set. These memberships could be refreshed with LTI periodically by each server, and forgotten otherwise (to protect against Cogs that don't cleanly disconnect).
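A rough sketch of what such self-registration might look like through the AtomSpace API; the node names, the use of a MemberLink, and the importance accessors are illustrative assumptions rather than an implemented convention:

// Illustrative only: register this Cog in a hypothetical ConnectedServersSet.
SimpleTruthValue tv(1.0, 1.0);
Handle servers = atomspace().addNode("ConnectedServersSet", CONCEPT_NODE, tv);
Handle self    = atomspace().addNode("cog://192.0.2.10:17001", CONCEPT_NODE, tv);

HandleSet outgoing;
outgoing.insert(self);
outgoing.insert(servers);
Handle membership = atomspace().addLink(outgoing, MEMBER_LINK, tv);

// Periodically bump the membership's LTI so it isn't forgotten; entries
// from Cogs that crash without disconnecting will decay away on their own.
atomspace().setLTI(membership, atomspace().getLTI(membership) + 1);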

Sleeping

Perhaps a periodic down-time or "sleeping" phase would drop all local references and allow the repository to be flushed of removed/obsoleted versions (and for that matter, other earlier versions of atoms that are not current too).

The AtomTable

How Handles interact with the AtomTable

The AtomTable maintains a master set of Handles that indicates which Atoms are in memory. When it drops an atom from memory, it removes the Handle from the set and from its indexes, but MindAgents may still access the HandleInstance through their own Handles. Before the removal from the master set or its indexes, however, the AtomTable resets the HandleInstance's atom smart pointer (any ScopedHandles will still be able to access the in-memory atom).

What happens when an atom is dropped from memory, but a MindAgent tries to bring it back from the underlying AtomSpaceLayer? In other words, if a MindAgent holds a reference to a HandleInstance that no longer points to an in-memory atom, but later that atom comes back into memory, the MindAgent would be referencing an obsolete HandleInstance that the AtomTable is unaware of.

To prevent the above case, Handles removed from the master Handle set of in-memory atoms should be added to a handleRecycleBin. This recycle bin is a map keyed by UUID whose values refer to the HandleInstance by boost::weak_ptr only - thus allowing the HandleInstance memory to be cleaned up once its reference count from other Handles reaches zero (weak_ptrs keep track of whether the enclosed pointer is valid, but don't increase the reference count).

When an Atom is fetched from the below AtomSpaceLayer, the handleRecycleBin must be checked to ensure there are no active HandleInstances that refer to the same UUID (and version). This check should be quick, as the RecycleBin should never become that large and should also be periodically flushed (i.e. remove the weak_ptrs that are no longer valid references to HandleInstances).
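A minimal sketch of such a recycle bin, assuming the Handle and HandleInstance definitions above (and glossing over access control and locking); the function name is hypothetical:

// Keyed by UUID; weak_ptrs don't keep HandleInstances alive, they only let
// the AtomTable find ones that MindAgents still hold.
std::map<uint, boost::weak_ptr<HandleInstance> > handleRecycleBin;

Handle handleForFetchedAtom(uint uuid)
{
    std::map<uint, boost::weak_ptr<HandleInstance> >::iterator it =
        handleRecycleBin.find(uuid);
    if (it != handleRecycleBin.end()) {
        Handle existing = it->second.lock(); // empty if all Handles are gone
        if (existing)
            return existing;                 // reuse: a MindAgent still refers to it
        handleRecycleBin.erase(it);          // stale entry; flush it
    }
    return Handle(new HandleInstance());     // no live instance; create a fresh one
}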

The AtomTable API

The AtomTable implements the AtomSpace and AtomSpaceLayer APIs.


Locking

This could be done by adding a mutex to the HandleInstance (but locking should only be done by the AtomTable when it is making a change to an Atom/Handle).

The Repository

The repository is a server such as a traditional database or a BigTable. A single-user, local-disk-volume storage backend would also be useful for testing or small scale applications of OpenCog, although this would have the caveat of being limited to access by a single Cog.

We assume however that the repository provides some amount of "smarts" for queries, e.g. possibly map-reduce functionality. To be cautious we should assume only a small/moderate amount of smarts, such as keeping a few indexes and performing a few basic joins.

The cost of accessing the repository is "high" relative to the AtomTable. Thus, although the repository may maintain indexes of various sorts, the cost of accessing those indexes is also high.

The repository need not be a single server; it may be P2P-like or have whatever architecture is needed to provide scalability, so as to allow simultaneous use by millions of Cogs. This should, however, only be the concern of its driver (which is an implementation of the AtomSpaceLayer API).

The driver might keep in-memory indexes of the repository contents, which are low-cost to query; however, this is also transparent to (and thus out of the control of) any level above the driver. (I don't think this should be assumed, though, as the indexes of BigTable-type repositories could be quite huge.)


Version conflicts

When the repository has been updated by other Cogs, the local AtomTable is not aware of these changes until the atom is committed back to the repository or explicitly refreshed. Thus the specifics of merging the changes of an older version of an atom with a newer version in the repository have to be worked out.

As an aside, this is a case of enabling the repository to be designed as a confluently persistent data structure. This should be feasible using HyperTable for the repository.

Due to the immutability of atoms, the only changes that occur are to an atom's:

  • incoming set - from new links being added or removed that have the atom in question in the link's outgoing set. These can be trivially resolved.
  • TruthValue - which may change type or change value. This makes things slightly complicated, but in general simpler TVs can be converted to more complicated ones (e.g. SimpleTruthValue -> Indefinite), and weighted averages of confidence and strength taken to merge values (optionally also weighted by how recent each change was); a merge sketch follows this list. TVs can also be simplified (Indefinite -> Simple), although this loses information.
  • AttentionValue - but only the LTI and VLTI parts. These are both trivial merges (For LTI it's the difference from one version to the next that is important and these can be summed).
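A sketch of the TruthValue merge described above, for the simple case of two SimpleTruthValues; the accessor names and constructor arguments are assumptions, and the weighting scheme (confidence-weighted strength, conservative confidence) is only one possible choice:

// Confidence-weighted average of strength; confidence is merged
// conservatively by taking the larger of the two.
SimpleTruthValue mergeSimpleTVs(const SimpleTruthValue& local,
                                const SimpleTruthValue& remote)
{
    float cl = local.getConfidence();
    float cr = remote.getConfidence();
    if (cl + cr == 0.0f) return local;   // nothing to weight by
    float strength = (local.getMean() * cl + remote.getMean() * cr) / (cl + cr);
    float confidence = cl > cr ? cl : cr;
    return SimpleTruthValue(strength, confidence);
}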

The other scenario is that, when the AtomTable goes to commit a change to the repository, it finds that a needed atom has been deleted. In this case, the semantics are less clear:

  • If the atom being changed has been deleted: Atoms that are committed to the Repository are not temporary working atoms, thus the main reason for deletion is likely to be "forgetting". We could perhaps discard the commit anyway, unless the commit was also providing a sufficient increase in LTI, in which case the atom could be recreated. If the atom that was deleted was a link, and its destinations are also deleted, then recreation would be more difficult (although another commit from the same Cog might recreate the destination, see below). However, I think that if MindAgents are playing around with low-LTI atoms, then they should expect atoms to potentially disappear and changes to be forgotten.
  • If a link is being added and has the deleted atom in its outgoing set: there is not much that can be done except recreate the missing atom (i.e. unmark it as obsolete) or fail to add the link. There could possibly be a heuristic for whether to do this based on the LTI of the atom being added.


Since the repository should have some form of versioning (BigTable does), it assists with resolving these changes. Tuning the length of version history will be a trade-off between how long Cogs go without committing/refreshing their atom versions and the storage required for the repository.

Queries

Queries are made by Agents through the AtomSpace. For example:

Query q = atomspace().qType(CONCEPT_NODE);

(I suggest prefixing all query methods with q, rather than using long-winded names like getHandleSetByType or getHandleSetByName.)

As shown, a Query object is returned. Note that this doesn't actually carry out the search, it merely creates an object that can talk to the AtomSpace to obtain matching results.

QueryResults

A rough class definition:

class Query {
    // Results are fetched in blocks, current stores the most
    // recently retrieved block.
    std::vector<Handle> current;

    // possibly unneeded if using a singleton AtomTable:
    AtomTable& at;
public:
    enum depth_t { NONE=0, Q_IN_MEMORY, Q_DISK, Q_DISTRIBUTED };

    depth_t currentDepth;

    // How should we initialise the query? Should query objects be
    // creatable outside of the AtomSpace?
    Query();

    // Return current results above, optionally fetch if current is
    // empty.
    const std::vector<Handle>& getCurrentResults(bool fetch=false) const;
    // Get "limit" results, going as far as the specified depth.
    // Returns the number of results fetched.
    int fetch(depth_t depth = NONE, int limit);

    Handle next(bool fetchMore = false);
    bool hasNext();
    bool isComplete();
    // We could also have for_each methods the process each result with
    // a bound function, or methods that reduce the results of
    // applying a unary predicate to all the results.
};

A usage example to get all ConceptNodes (pretty much guaranteed to be a huge result in any OpenCog instance):

Query q = atomspace().qType(CONCEPT_NODE, Query::Q_DISK);

// On the first fetch, the in-memory handles are retrieved first, even if
// the maxDepth is set for disk.
q.fetch();
bool enough = false;
// The third argument, true, indicates that the Query object should
// automatically request more results once the buffer is empty...
foreach (result, q, true)
{
   // various operations; if enough results have been processed, break out
   // of the loop.
   if (enough) break;
}
if (!enough) {
   // Going through all memory and disk handles was not enough...
   // alter the query to also check the distributed AtomSpace.

   // distributed requests are expensive, so limit to 2.
   int maxDistributedRequests = 2;
   q.setMaxDepth(Query::Q_DISTRIBUTED);
   while (maxDistributedRequests-- > 0 && !q.isComplete()) {
       foreach (result, q, false) {
           // Operations on distributed handles, bearing in mind these are
           // likely not to be in memory.
       }
   }
}

Representing queries in the AtomSpace

Queries are represented as nodes in the AtomSpace. Agents can create the specifics using a subgraph and VariableNodes, or get the AtomSpace to construct an appropriate subgraph and just return a Query object (through which the AtomSpace subgraph representation can be accessed too).

  • The AtomSpace will have a separate thread for fulfilling queries
  • Queries are prioritised by STI
  • Queries and their SatisfyingSet (if it exists) get forgotten when their LTI drops too low.
  • MindAgents can, however, get immediate results for matching atoms in memory. Disk and distributed queries are not guaranteed to return immediately (NOTE: this would change the way that MindAgents are implemented, and we could perhaps allow disk/distributed queries to be done immediately when the MindAgent asks for them).
  • Query subgraphs never get stored in the Repository.

Query subclasses

Queries have the potential to return a lot of results when they are generic (such as the above example for retrieving all ConceptNodes). For this reason, it may be worthwhile to have Query subclasses such as TypeQuery. These wouldn't necessarily create SatisfyingSets in the AtomSpace (as that could be expensive and redundant).

... Carrying on from that: should we make a variety of query subclasses, such as SubgraphQuery (for those represented in the AtomSpace), TypeQuery (directly using AtomTable and DistributedAtomTable type indexes), and others?

Actual implementation

There already exists a "completely general" query mechanism for obtaining atoms that are in the AtomTable. This could be used as a basis for integration within the AtomTable, and/or extended to allow Queries to be represented in the AtomSpace.

Queries as part of Rules

Rules are essentially made up of a target/result and a query or template subgraph that's needed to construct the target.

There is discussion to try and generalise the multiple rule systems in OpenCog (PLN, query, RelEx, RelExToFrame) into a Unified Rule Engine. It would be worthwhile trying to get whatever form the Rules take to use the Query object/subgraphs as their templates. They could then maintain indexes through the SatisfyingSets of the Queries.

Attention Allocation Heuristics

Here are some simple heuristics for how STI and LTI might relate to AtomTable/repository storage (a combined sketch follows the list):

  • high-STI, low-LTI atoms should probably not be stored in the repository unless/until their LTI increases.
  • low STI atoms should probably be dropped from local instances.
  • old and very low LTI atoms should probably be forgotten from the repository.
  • since STI only really makes sense in terms of in-memory atoms, the repository probably shouldn't store this information. Atoms freshly retrieved from the repository and put in an AtomTable would be given 0 STI.
  • atoms with VLTI (very long term importance) should immediately be put in the Repository.
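Putting these together, here is a sketch of how a storage-management step might apply the heuristics above; the thresholds, the importance accessors, and the helper functions are placeholders, not implemented or tuned values:

// Illustrative only: apply the STI/LTI storage heuristics to one atom.
void applyStorageHeuristics(Handle h)
{
    short sti  = atomspace().getSTI(h);
    short lti  = atomspace().getLTI(h);
    bool  vlti = atomspace().getVLTI(h);

    if (vlti || lti > LTI_COMMIT_THRESHOLD)
        commitToRepository(h);  // persist VLTI and sufficiently high-LTI atoms
    // high STI but low LTI: keep local only; don't persist yet

    if (sti < STI_DROP_THRESHOLD)
        dropFromAtomTable(h);   // flush from the local in-memory cache

    if (lti < LTI_FORGET_THRESHOLD && isOld(h))
        forgetInRepository(h);  // mark as obsolete in the repository
}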

Forgetting

More intelligent forgetting and dropping of atoms will eventually be implemented (given two low-LTI atoms, the one that is harder to recreate, or has more knowledge value, should be forgotten after the other).

Or, if there are 100 medium-LTI Atoms, but these can be quickly and somewhat accurately regenerated from 20 of them, it may be best to delete the other 80... even though they're nowhere near the lowest-LTI ones in the AtomTable.

This kind of analysis and freeing of space could also be done on the repository during "sleep phases".

MindAgent Design

How does one develop a MindAgent that understands that some atoms are not in memory and behaves accordingly (e.g. giving preference to atoms that are already in memory)?

A cat example

Imagine we have a node for the general concept of "cat" in RAM, and there are

  • nodes for 50 specific cats stored in RAM, and linked to from the "cat" node
  • links from the "cat" node to 75 additional "specific cat" nodes stored in the repository (these are links that point to Handles that correspond to nodes currently living on disk but not in RAM) ... we call these 75 "semi-accessible" nodes.
  • nodes for 5000 other specific cats stored on disk

If PLN needs to do some reasoning about cats, in many cases just following the links from "cat" to the 50 specific cats in RAM should be good enough.

If it needs to dip into the other 5000 cats, there's nothing simple for it to do, except just load some randomly, and see where that leads it. It may want to use some heuristic like loading the ones that used to have the highest average STI values or something.

On the other hand, if it wants to dip into the 75 "semi-accessible" specific-cat nodes on disk, then the MindAgent has some other alternatives available. Maybe it knows it's specifically interested in American cats, and it can then select the 25 out of these 75 semi-accessible on-disk specific-cats that are also known to be American (based on the links in RAM), and just load those.

e.g. If we had the following (with a * denoting "not in memory").

InheritanceLink 
    ConceptNode cat_x *
    ConceptNode American
InheritanceLink
    ConceptNode cat_x *
    ConceptNode Cat
InheritanceLink
    ConceptNode cat_y *
    ConceptNode Cat
InheritanceLink
    ConceptNode fail_cat
    ConceptNode Cat

Then we might try to reason with "fail_cat", find it's unsuitable, and then be left with the choice of loading "cat_x" or "cat_y". From the InheritanceLink from cat_x to American, we deduce that cat_x is the one we should load from the repository in preference to cat_y (although, in reality, for such a small set we'd probably just grab both!).
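As a sketch of how a MindAgent might perform that selection through the Query API above: catHandle and americanHandle stand for the in-memory "Cat" and "American" nodes, and inMemory() is a hypothetical helper that checks whether a Handle currently points at an in-memory Atom; the filtering uses only links already in RAM.

// Gather the cat's neighbours and pick the semi-accessible ones worth loading.
Query q = atomspace().qNeighbours(catHandle, INHERITANCE_LINK, true);
const std::vector<Handle>& cats = q.getCurrentResults(true);

std::vector<Handle> toLoad;
for (size_t i = 0; i < cats.size(); ++i) {
    if (inMemory(cats[i])) continue;        // already usable, nothing to fetch
    // Use only links already in RAM to decide whether this semi-accessible
    // cat is American before paying the cost of loading it.
    Query props = atomspace().qNeighbours(cats[i], INHERITANCE_LINK, true);
    const std::vector<Handle>& ps = props.getCurrentResults(true);
    if (std::find(ps.begin(), ps.end(), americanHandle) != ps.end())
        toLoad.push_back(cats[i]);          // worth fetching from the repository
}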