|
| 1 | +import { getStreamDataWithHeaders } from '@/api/query'; |
| 2 | +import { StatusCodes } from 'http-status-codes'; |
| 3 | +import useMountedState from './useMountedState'; |
| 4 | +import { useAppStore } from '@/layouts/MainLayout/providers/AppProvider'; |
| 5 | +import _ from 'lodash'; |
| 6 | +import { AxiosError } from 'axios'; |
| 7 | +import { useStreamStore } from '@/pages/Stream/providers/StreamProvider'; |
| 8 | +import { |
| 9 | + correlationStoreReducers, |
| 10 | + CORRELATION_LOAD_LIMIT, |
| 11 | + useCorrelationStore, |
| 12 | +} from '@/pages/Correlation/providers/CorrelationProvider'; |
| 13 | +import { notifyError } from '@/utils/notification'; |
| 14 | +import { useQuery } from 'react-query'; |
| 15 | +import { LogsResponseWithHeaders } from '@/@types/parseable/api/query'; |
| 16 | +import { useRef, useEffect } from 'react'; |
| 17 | + |
| 18 | +const { setStreamData } = correlationStoreReducers; |
| 19 | + |
| 20 | +export const useFetchStreamData = () => { |
| 21 | + const [error, setError] = useMountedState<string | null>(null); |
| 22 | + const [{ selectedFields, correlationCondition, fields, streamData }, setCorrelationStore] = useCorrelationStore( |
| 23 | + (store) => store, |
| 24 | + ); |
| 25 | + const [streamInfo] = useStreamStore((store) => store.info); |
| 26 | + const [currentStream] = useAppStore((store) => store.currentStream); |
| 27 | + const timePartitionColumn = _.get(streamInfo, 'time_partition', 'p_timestamp'); |
| 28 | + const [timeRange] = useAppStore((store) => store.timeRange); |
| 29 | + const [ |
| 30 | + { |
| 31 | + tableOpts: { currentOffset }, |
| 32 | + }, |
| 33 | + ] = useCorrelationStore((store) => store); |
| 34 | + const streamNames = Object.keys(fields); |
| 35 | + |
| 36 | + const prevTimeRangeRef = useRef({ startTime: timeRange.startTime, endTime: timeRange.endTime }); |
| 37 | + |
| 38 | + const hasTimeRangeChanged = |
| 39 | + prevTimeRangeRef.current.startTime !== timeRange.startTime || |
| 40 | + prevTimeRangeRef.current.endTime !== timeRange.endTime; |
| 41 | + |
| 42 | + useEffect(() => { |
| 43 | + prevTimeRangeRef.current = { startTime: timeRange.startTime, endTime: timeRange.endTime }; |
| 44 | + }, [timeRange.startTime, timeRange.endTime]); |
| 45 | + |
| 46 | + const defaultQueryOpts = { |
| 47 | + startTime: timeRange.startTime, |
| 48 | + endTime: timeRange.endTime, |
| 49 | + limit: CORRELATION_LOAD_LIMIT, |
| 50 | + pageOffset: currentOffset, |
| 51 | + timePartitionColumn, |
| 52 | + selectedFields: _.flatMap(selectedFields, (values, key) => _.map(values, (value) => `${key}.${value}`)) || [], |
| 53 | + correlationCondition: correlationCondition, |
| 54 | + }; |
| 55 | + |
| 56 | + const { |
| 57 | + isLoading: logsLoading, |
| 58 | + isRefetching: logsRefetching, |
| 59 | + refetch: getFetchStreamData, |
| 60 | + } = useQuery( |
| 61 | + ['fetch-logs', defaultQueryOpts], |
| 62 | + async () => { |
| 63 | + const streamsToFetch = hasTimeRangeChanged |
| 64 | + ? streamNames |
| 65 | + : streamNames.filter((streamName) => !Object.keys(streamData).includes(streamName)); |
| 66 | + |
| 67 | + const fetchPromises = streamsToFetch.map((streamName) => { |
| 68 | + const queryOpts = { ...defaultQueryOpts, streamNames: [streamName] }; |
| 69 | + return getStreamDataWithHeaders(queryOpts); |
| 70 | + }); |
| 71 | + return Promise.all(fetchPromises); |
| 72 | + }, |
| 73 | + { |
| 74 | + enabled: false, |
| 75 | + refetchOnWindowFocus: false, |
| 76 | + onSuccess: async (responses) => { |
| 77 | + responses.map((data: { data: LogsResponseWithHeaders; status: StatusCodes }) => { |
| 78 | + const logs = data.data; |
| 79 | + const isInvalidResponse = _.isEmpty(logs) || _.isNil(logs) || data.status !== StatusCodes.OK; |
| 80 | + if (isInvalidResponse) return setError('Failed to query logs'); |
| 81 | + |
| 82 | + const { records, fields } = logs; |
| 83 | + if (fields.length > 0 && !correlationCondition) { |
| 84 | + return setCorrelationStore((store) => setStreamData(store, currentStream || '', records)); |
| 85 | + } else if (fields.length > 0 && correlationCondition) { |
| 86 | + return setCorrelationStore((store) => setStreamData(store, 'correlatedStream', records)); |
| 87 | + } else { |
| 88 | + notifyError({ message: `${currentStream} doesn't have any fields` }); |
| 89 | + } |
| 90 | + }); |
| 91 | + }, |
| 92 | + onError: (data: AxiosError) => { |
| 93 | + const errorMessage = data.response?.data as string; |
| 94 | + setError(_.isString(errorMessage) && !_.isEmpty(errorMessage) ? errorMessage : 'Failed to query logs'); |
| 95 | + }, |
| 96 | + }, |
| 97 | + ); |
| 98 | + |
| 99 | + return { |
| 100 | + error, |
| 101 | + loading: logsLoading || logsRefetching, |
| 102 | + getFetchStreamData, |
| 103 | + }; |
| 104 | +}; |
0 commit comments