Get self-descriptions from connectors registered in the Metadata Broker

Example - Get self-descriptions of an external connector

Last update: 2024-02-28

This request retrieves the self-descriptions of connectors registered in the

metadata broker.

The following operations are demonstrated:

  1. Load environment variables (your connector configs) from a .env file.

  2. Establish a connection to your TSG connector.

  3. Retrieve and print information about external connector self-descriptions (registered in the Metadata Broker)

Important

  • Ensure that the required environment variables (Your Connector API_KEY, CONNECTOR_ID, ACCESS_URL and AGENT_ID) are set in the .env file before using this request.

  • The connector API_KEY can be retrieved by loging into the TSG connector UI and navigating to the ‘API Keys’ tab.

Execute the code below to make a request to the metadata broker

 1    from pprint import pprint
 2    from loguru import logger
 3    from dotenv import dotenv_values
 4    from tsg_client.controllers import TSGController
 5
 6    # Comment the line below to enable internal logger:
 7    logger.disable("")
 8
 9    # Load environment variables:
10    config = dotenv_values('.env')
11
12    # Connect to our TSG connector:
13    conn = TSGController(
14        api_key=config['API_KEY'],
15        connector_id=config['CONNECTOR_ID'],
16        access_url=config['ACCESS_URL'],
17        agent_id=config['AGENT_ID'],
18        metadata_broker_url=config['METADATA_BROKER_URL']
19    )
20
21    # Request data from DS Metadata Broker:
22    result = conn.query_metadata_broker()
23    pprint(result)
24
25    print("-" * 79)
26    print("> Connectors w/ self-descriptions in metadata-broker")
27    for k in result:
28        print("Connector ID:", k["@id"])
29        print("Access URL:", k["ids:hasEndpoint"][0]["ids:accessURL"]["@id"].split('/router')[0])
30        print("Agent ID:", k["ids:maintainer"]["@id"])
31        print("-")
32
33    print("-" * 79)
34    print("> Connectors w/ data apps")
35    for k in result:
36        try:
37            rc = k["ids:resourceCatalog"]
38        except KeyError:
39            continue
40        data_apps = [x["@id"] for x in rc if 'data-app' in x["@id"]]
41        if any(data_apps):
42            print("Connector:", k["@id"])
43            print("Data Apps:", data_apps)