Script to verify tiles described in Solr are present in Cassandra (and vice versa) #224

Open
wants to merge 38 commits into
base: master
Commits
a1cc8ab
Initial iteration of tile check script
Dec 6, 2022
8bb6161
Logging + better async cassandra query
Dec 6, 2022
86d4735
Improved logging and handling of failed cassandra queries
Dec 13, 2022
6ba1bc8
Updated changelog to reflect change happening after release 1.0.0
Dec 13, 2022
9f7e18b
Undid accidentally committed changes to analysis/setup.py
Dec 13, 2022
4bfeca1
Updates
Dec 14, 2022
efacbd3
Updates
Dec 19, 2022
a2820dc
Actually try to enforce the limit argument.
Dec 19, 2022
a8f396e
Added defaults for Cassandra uname & password
Dec 19, 2022
5ce8f22
Alternate check to verify Cassandra tiles are in Solr
Dec 20, 2022
3728620
Update dockerfile to install requirements
Dec 21, 2022
5cbd07a
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Jan 23, 2023
78de29c
Merge remote-tracking branch 'origin/master' into solr-cassandra-match
Jan 23, 2023
5b68d9e
Merge remote-tracking branch 'origin/master' into solr-cassandra-match
Feb 14, 2023
2dc4d7d
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Feb 21, 2023
9ae066e
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Mar 13, 2023
98903bc
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Mar 20, 2023
268bd8e
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Mar 23, 2023
66aacca
Merge remote-tracking branch 'origin/master' into solr-cassandra-match
Mar 30, 2023
deafc2a
Merge remote-tracking branch 'origin/master' into solr-cassandra-match
Mar 31, 2023
0a85096
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff May 4, 2023
b06e994
Merge remote-tracking branch 'origin/master' into solr-cassandra-match
May 16, 2023
ac66e14
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff May 17, 2023
d994835
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff May 18, 2023
ca109a6
Added scripts for orphan tile verification and deletion
Jun 15, 2023
58aa17c
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Jun 22, 2023
0c1a467
Merge remote-tracking branch 'RKuttruff/solr-cassandra-match' into so…
Jun 22, 2023
bc66ce5
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Jul 10, 2023
159ae07
Merge remote-tracking branch 'RKuttruff/solr-cassandra-match' into so…
Jul 10, 2023
96c6d9d
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Aug 1, 2023
ccc40a4
Merge remote-tracking branch 'origin/master' into solr-cassandra-match
Aug 21, 2023
d16c3bc
Merge remote-tracking branch 'origin/master' into solr-cassandra-match
Aug 22, 2023
b114de4
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Aug 23, 2023
0f89a95
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Sep 6, 2023
a1e5446
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Sep 7, 2023
2848817
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Sep 14, 2023
555836e
Merge remote-tracking branch 'origin/master' into solr-cassandra-match
Sep 14, 2023
d373140
Merge branch 'apache:master' into solr-cassandra-match
RKuttruff Nov 21, 2023
Added scripts for orphan tile verification and deletion
rileykk committed Jun 15, 2023
commit ca109a612534efcc5b4b81edb9c32077f6675adb
225 changes: 225 additions & 0 deletions tools/solr-cassandra-match/deletebyid.py
@@ -0,0 +1,225 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import uuid
import json
import asyncio

import logging
from datetime import datetime
from functools import partial
from time import sleep
from typing import List, Tuple, Union

import cassandra.concurrent
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.policies import RoundRobinPolicy, TokenAwarePolicy
from six.moves import input
from solrcloudpy import SolrConnection, SearchOptions
from tenacity import retry, stop_after_attempt, wait_exponential
from tqdm import tqdm


logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(threadName)s] [%(levelname)s] [%(name)s::%(lineno)d] %(message)s"
)

logger = logging.getLogger('delete-by-id')

logger.setLevel(logging.INFO)
logging.getLogger().handlers[0].setFormatter(
    logging.Formatter(
        fmt="%(asctime)s [%(threadName)s] [%(levelname)s] [%(name)s::%(lineno)d] %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S"
    ))

logging.getLogger('cassandra').setLevel(logging.CRITICAL)
logging.getLogger('solrcloudpy').setLevel(logging.CRITICAL)

CASSANDRA_BATCH_SIZE = 8192
SOLR_BATCH_SIZE = 256


def delete_from_solr(args, ids):
    pass


def delete_from_cassandra(args, ids):
    logger.info('Trying to connect to Cassandra...')

    dc_policy = RoundRobinPolicy()
    token_policy = TokenAwarePolicy(dc_policy)

    if args.cassandraUsername and args.cassandraPassword:
        auth_provider = PlainTextAuthProvider(username=args.cassandraUsername, password=args.cassandraPassword)
    else:
        auth_provider = None

    # The port may arrive as a string from argparse, so convert it explicitly.
    cassandra_cluster = Cluster(contact_points=args.cassandra, port=int(args.cassandraPort),
                                protocol_version=int(args.cassandraProtocolVersion),
                                execution_profiles={
                                    EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=token_policy)
                                },
                                auth_provider=auth_provider)
    cassandra_session = cassandra_cluster.connect(keyspace=args.cassandraKeyspace)

    logger.info('Successfully connected to Cassandra')

    cassandra_table = args.cassandraTable

    batches = [ids[i:i + CASSANDRA_BATCH_SIZE] for i in range(0, len(ids), CASSANDRA_BATCH_SIZE)]

    logger.info(f'Prepared {len(batches):,} batches of tile ids to delete')

    start = datetime.now()

    n_tiles = len(ids)
    deleting = 0

    # Table names cannot be bound as query parameters, so the name is interpolated here.
    prepared_query = cassandra_session.prepare('DELETE FROM %s WHERE tile_id=?' % cassandra_table)

    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=10, max=60))
    def delete_batch(batch):
        # Build a fresh list of futures on every attempt. Sharing one list across
        # attempts would make each retry re-check the futures that already failed.
        futures = [cassandra_session.execute_async(prepared_query, (tile_id,)) for tile_id in batch]

        for f in futures:
            try:
                f.result()
            except Exception:
                logger.warning('Batch delete failed; maybe retrying')
                raise

    for batch in batches:
        deleting += len(batch)

        logger.info(
            f'Deleting batch of {len(batch)} tiles from Cassandra | '
            f'({deleting}/{n_tiles}) [{deleting / n_tiles * 100:7.3f}%]')

        try:
            delete_batch(batch)
        except Exception:
            logger.critical('Failed to delete batch after multiple retries, exiting')
            raise SystemExit(1)

    # str(timedelta) is already formatted as H:MM:SS, so no unit suffix is added.
    logger.info(f'Deleted {len(ids):,} tiles from Cassandra in {str(datetime.now() - start)}')


def read_ids(args):
    logger.info(f'Reading ids from file {args.id_file}')

    with open(args.id_file) as f:
        ids = json.load(f)

    return [uuid.UUID(i) for i in ids]


def main(args):
    ids = read_ids(args)

    logger.info(f'Successfully read {len(ids):,} tile ids to delete')

    if args.target == 'solr':
        delete_from_solr(args, ids)
    else:
        delete_from_cassandra(args, ids)


def parse_args():
    parser = argparse.ArgumentParser(description='Delete a list of tile ids from either Solr or Cassandra',
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    parser.add_argument('--target',
                        required=True,
                        choices=['solr', 'cassandra'],
                        help='Store to delete data from',
                        dest='target')

    parser.add_argument('-i', '--id-list',
                        required=True,
                        dest='id_file',
                        help='Path to JSON file containing a list of tile UUIDs to delete')

    parser.add_argument('--solr',
                        help='The url of the SOLR server.',
                        default='localhost:8983',
                        metavar='127.0.0.1:8983')

    parser.add_argument('--collection',
                        help='The name of the SOLR collection.',
                        required=False,
                        default='nexustiles',
                        metavar='nexustiles')

    parser.add_argument('--solrIdField',
                        help='The name of the unique ID field for this collection.',
                        required=False,
                        default='id',
                        metavar='id')

    parser.add_argument('--cassandra',
                        help='The hostname(s) or IP(s) of the Cassandra server(s).',
                        default=['localhost'],
                        nargs='+',
                        metavar=('127.0.0.100', '127.0.0.101'))

    parser.add_argument('-k', '--cassandraKeyspace',
                        help='The Cassandra keyspace.',
                        default='nexustiles',
                        required=False,
                        metavar='nexustiles')

    parser.add_argument('-t', '--cassandraTable',
                        help='The name of the cassandra table.',
                        required=False,
                        default='sea_surface_temp')

    # Parse the port as an integer so it can be passed straight to the driver.
    parser.add_argument('-p', '--cassandraPort',
                        help='The port used to connect to Cassandra.',
                        required=False,
                        type=int,
                        default=9042)

    parser.add_argument('--cassandraUsername',
                        help='The username used to connect to Cassandra.',
                        default='cassandra',
                        required=False)

    parser.add_argument('--cassandraPassword',
                        help='The password used to connect to Cassandra.',
                        default='cassandra',
                        required=False)

    parser.add_argument('-pv', '--cassandraProtocolVersion',
                        help='The version of the Cassandra protocol the driver should use.',
                        required=False,
                        choices=['1', '2', '3', '4', '5'],
                        default='3')

    parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', help='Enable verbose output')

    return parser.parse_args()


if __name__ == '__main__':
    args = parse_args()
    main(args)
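
The `-i/--id-list` argument expects a JSON file holding a flat list of tile UUID strings, which `read_ids` turns into `uuid.UUID` objects before the list is sliced into batches. A minimal sketch of that round trip follows. The helper names `write_id_file`, `read_id_file` and `chunk`, the file name `orphans.json`, and the small `BATCH_SIZE` are illustrative assumptions and are not part of this PR.

```python
import json
import tempfile
import uuid
from pathlib import Path

BATCH_SIZE = 4  # illustrative only; the script uses CASSANDRA_BATCH_SIZE = 8192


def write_id_file(path, tile_ids):
    """Write tile UUIDs as a JSON list of strings (hypothetical helper)."""
    Path(path).write_text(json.dumps([str(t) for t in tile_ids]))


def read_id_file(path):
    """Same parsing as read_ids(): JSON list of strings -> list of uuid.UUID."""
    with open(path) as f:
        return [uuid.UUID(i) for i in json.load(f)]


def chunk(items, size):
    """Slice a list into consecutive batches of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


tile_ids = [uuid.uuid4() for _ in range(10)]

with tempfile.TemporaryDirectory() as tmp:
    id_path = Path(tmp) / "orphans.json"  # hypothetical file name
    write_id_file(id_path, tile_ids)
    loaded = read_id_file(id_path)

batches = chunk(loaded, BATCH_SIZE)
print([len(b) for b in batches])
```

A file written this way could then be passed to the script as `python deletebyid.py --target cassandra -i orphans.json`, assuming the remaining connection options are left at their defaults.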


24 changes: 24 additions & 0 deletions tools/solr-cassandra-match/match.py
@@ -1,3 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import concurrent.futures
import json
@@ -328,7 +343,13 @@ def cassandra_to_solr(args):

    pool = concurrent.futures.ThreadPoolExecutor(max_workers=16, thread_name_prefix='solr-query-worker')

    limit_reached = False

    for result in tqdm(results, total=num_tiles, desc='Cassandra query', unit=' rows'):
        if limit_reached:
            logger.warning('Reached check limit; stopping check')
            break

        cassandra_tiles.append(str(result.tile_id))
        cassandra_tile_count += 1

@@ -338,6 +359,9 @@ def cassandra_to_solr(args):

            missing.extend(check_solr(args, to_check, pool))

            if args.limit is not None and len(missing) >= args.limit:
                limit_reached = True

    if len(cassandra_tiles) > 0:
        missing.extend(check_solr(args, cassandra_tiles, pool))
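
The limit handling in this hunk sets a flag after a batch has been checked and only stops at the top of the next iteration, so the limit is a soft one: the result can overshoot it by up to one batch. A minimal, self-contained sketch of that pattern follows. The function `check_with_limit` and the `is_missing` predicate are hypothetical stand-ins for the loop in `cassandra_to_solr` and for `check_solr`, and the nesting of the batch flush is assumed from the diff context.

```python
def check_with_limit(rows, batch_size, is_missing, limit=None):
    """Collect rows flagged by is_missing, stopping once `limit` is reached.

    Hypothetical stand-in for the loop in cassandra_to_solr; is_missing
    replaces the real Solr lookup.
    """
    missing = []
    pending = []
    limit_reached = False

    for row in rows:
        # The flag is checked before consuming the next row, mirroring the diff.
        if limit_reached:
            break

        pending.append(row)

        if len(pending) >= batch_size:
            to_check, pending = pending, []
            missing.extend(r for r in to_check if is_missing(r))

            # Evaluated once per batch, so the limit can be overshot.
            if limit is not None and len(missing) >= limit:
                limit_reached = True

    # Flush whatever is left over when the rows run out.
    if pending:
        missing.extend(r for r in pending if is_missing(r))

    return missing


def is_odd(n):
    return n % 2 == 1


limited = check_with_limit(range(100), batch_size=10, is_missing=is_odd, limit=12)
unlimited = check_with_limit(range(100), batch_size=10, is_missing=is_odd)
print(len(limited), len(unlimited))
```

With a limit of 12 and five hits per batch, the check stops after the third batch with 15 results rather than exactly 12. If an exact cap is wanted, the returned list could be truncated afterwards.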
