Skip to content

Commit

Permalink
supporting content for ingesting-data-with-big-query (#367)
Browse files Browse the repository at this point in the history
  • Loading branch information
Delacrobix authored Mar 5, 2025
1 parent f598877 commit 6f0b47d
Showing 1 changed file with 391 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,391 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "LlrEjmtJNpuX"
},
"source": [
"# Ingesting data with BigQuery\n",
"\n",
"This notebook demonstrates how to consume data contained in BigQuery and index it into Elasticsearch. This notebook is based on the article [Ingesting data with BigQuery](https://www.elastic.co/search-labs/blog/ingesting-data-with-big-query)."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "GNaAN-GNO5qp"
},
"source": [
"## Installing dependencies and importing packages"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "qgclZayCk1Ct",
"outputId": "7da1e962-ead6-4016-b2e5-12a019885d86"
},
"outputs": [],
"source": [
"!pip install google-cloud-bigquery elasticsearch==8.16 google-auth"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "rAesontNXpLu"
},
"outputs": [],
"source": [
"from elasticsearch import Elasticsearch, exceptions\n",
"from google.cloud import bigquery\n",
"from google.colab import auth\n",
"from getpass import getpass\n",
"\n",
"import json"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "NwOmnk99Pfh3"
},
"source": [
"## Declaring variables"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "-4sV9fiXdBwj"
},
"source": [
"This code will create inputs where you can enter your credentials.\n",
"Here you can learn how to retrieve your Elasticsearch credentials: [Finding Your Cloud ID](https://www.elastic.co/search-labs/tutorials/install-elasticsearch/elastic-cloud#finding-your-cloud-id)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "GVKJKfFpPWuj",
"outputId": "21c6f4e3-8cb5-4a2c-8efe-0e45c3c6b1c4"
},
"outputs": [],
"source": [
"ELASTICSEARCH_ENDPOINT = getpass(\"Elasticsearch endpoint: \")\n",
"ELASTIC_API_KEY = getpass(\"Elastic Api Key: \")\n",
"\n",
"\n",
"# Google Cloud project name and BigQuery dataset name\n",
"PROJECT_ID = \"elasticsearch-bigquery\"\n",
"# dataset_id in format <your-project-name>.<your-dataset-name>\n",
"DATASET_ID = f\"{PROJECT_ID}.server_logs\""
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "3O2HclcYHEsS"
},
"source": [
"## Instance a Elasticsearch client"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "1LWiop8NYiQF"
},
"outputs": [],
"source": [
"auth.authenticate_user()\n",
"\n",
"# Elasticsearch client\n",
"es_client = Elasticsearch(\n",
" ELASTICSEARCH_ENDPOINT,\n",
" api_key=ELASTIC_API_KEY,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "9lvPHaXjPlfu"
},
"source": [
"## Creating mappings"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "tc88YzAYw31e"
},
"outputs": [],
"source": [
"try:\n",
" es_client.indices.create(\n",
" index=\"bigquery-logs\",\n",
" body={\n",
" \"mappings\": {\n",
" \"properties\": {\n",
" \"status_code_description\": {\"type\": \"match_only_text\"},\n",
" \"status_code\": {\"type\": \"keyword\"},\n",
" \"@timestamp\": {\"type\": \"date\"},\n",
" \"ip_address\": {\"type\": \"ip\"},\n",
" \"http_method\": {\"type\": \"keyword\"},\n",
" \"endpoint\": {\"type\": \"keyword\"},\n",
" \"response_time\": {\"type\": \"integer\"},\n",
" }\n",
" }\n",
" },\n",
" )\n",
"except exceptions.RequestError as e:\n",
" if e.error == \"resource_already_exists_exception\":\n",
" print(\"Index already exists.\")\n",
" else:\n",
" raise e"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "AOtwAfPXP38Z"
},
"source": [
"## Getting data from BigQuery"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "DyKtsjXRXB2S",
"outputId": "7734ad7e-cb11-42cc-b97d-e9241cab6b17"
},
"outputs": [],
"source": [
"client = bigquery.Client(project=PROJECT_ID)\n",
"# Getting tables from dataset\n",
"tables = client.list_tables(DATASET_ID)\n",
"\n",
"data = {}\n",
"\n",
"for table in tables:\n",
" # Table id must be in format <dataset_name>.<table_name>\n",
" table_id = f\"{DATASET_ID}.{table.table_id}\"\n",
"\n",
" print(f\"Processing table: {table.table_id}\")\n",
"\n",
" # Query to retrieve BigQuery tables data\n",
" query = f\"\"\"\n",
" SELECT *\n",
" FROM `{table_id}`\n",
" \"\"\"\n",
"\n",
" query_job = client.query(query)\n",
"\n",
" results = query_job.result()\n",
"\n",
" print(f\"Results for table: {table.table_id}:\")\n",
"\n",
" data[table.table_id] = []\n",
"\n",
" for row in results:\n",
" # Saving data with key=table_id\n",
" data[table.table_id].append(dict(row))\n",
" print(row)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "UAznwqXStJ39",
"outputId": "508a3255-43f7-4828-87d5-997cd04ca427"
},
"outputs": [],
"source": [
"# variable with data\n",
"logs_data = data[\"logs\"]\n",
"\n",
"\n",
"print(logs_data)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "2s4Tr6wBP773"
},
"source": [
"## Indexing to Elasticsearch"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "C4goyH6ZbDJK",
"outputId": "cfb4af41-1ef1-40da-a2da-6c0aa83633d3"
},
"outputs": [],
"source": [
"bulk_data = []\n",
"\n",
"for log_entry in logs_data:\n",
" # Convert timestamp to ISO 8601 string\n",
" timestamp_iso8601 = log_entry[\"_timestamp\"].isoformat()\n",
"\n",
" # Prepare action metadata\n",
" action_metadata = {\n",
" \"index\": {\n",
" \"_index\": \"bigquery-logs\",\n",
" \"_id\": f\"{log_entry['ip_address']}-{timestamp_iso8601}\",\n",
" }\n",
" }\n",
"\n",
" # Prepare document\n",
" document = {\n",
" \"ip_address\": log_entry[\"ip_address\"],\n",
" \"status_code\": log_entry[\"status_code\"],\n",
" \"@timestamp\": timestamp_iso8601,\n",
" \"http_method\": log_entry[\"http_method\"],\n",
" \"endpoint\": log_entry[\"endpoint\"],\n",
" \"response_time\": log_entry[\"response_time\"],\n",
" \"status_code_description\": log_entry[\"status_code_description\"],\n",
" }\n",
"\n",
" # Append to bulk data\n",
" bulk_data.append(action_metadata)\n",
" bulk_data.append(document)\n",
"\n",
"print(bulk_data)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "WPEwsJrFbDHQ",
"outputId": "ab5904f7-21c1-4596-fb06-55569cd9eb17"
},
"outputs": [],
"source": [
"try:\n",
" # Indexing data\n",
" response = es_client.bulk(body=bulk_data)\n",
"\n",
" if response[\"errors\"]:\n",
" print(\"Errors while indexing:\")\n",
" for item in response[\"items\"]:\n",
" if \"error\" in item[\"index\"]:\n",
" print(item[\"index\"][\"error\"])\n",
" else:\n",
" print(\"Documents indexed successfully.\")\n",
"except Exception as e:\n",
" print(f\"Bulk indexing failed: {e}\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "uxix_o8LQDup"
},
"source": [
"# Searching data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "285_MwI8yknk",
"outputId": "67d43aca-b05d-43f7-c730-1fa3bc0c4662"
},
"outputs": [],
"source": [
"response = es_client.search(\n",
" index=\"bigquery-logs\",\n",
" body={\n",
" \"query\": {\"match\": {\"status_code_description\": \"error\"}},\n",
" \"sort\": [{\"@timestamp\": {\"order\": \"desc\"}}],\n",
" \"aggs\": {\"by_ip\": {\"terms\": {\"field\": \"ip_address\", \"size\": 10}}},\n",
" },\n",
")\n",
"\n",
"# Print results\n",
"formatted_json = json.dumps(response.body, indent=4)\n",
"print(formatted_json)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "S6WZMJayyzxh"
},
"source": [
"## Deleting\n",
"\n",
"Finally, we can delete the resources used to prevent them from consuming resources."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "9UcQwa41yy_x",
"outputId": "02a33d89-b57c-4273-ba93-5b6441a4f91e"
},
"outputs": [],
"source": [
"# Cleanup - Delete Index\n",
"es_client.indices.delete(index=\"bigquery-logs\", ignore=[400, 404])"
]
}
],
"metadata": {
"colab": {
"provenance": []
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}

0 comments on commit 6f0b47d

Please sign in to comment.