package com.illumon.iris.importers;

import com.fishlib.base.log.LogOutput;
import com.fishlib.configuration.Configuration;
import com.fishlib.io.logger.Log4jAdapter;
import com.fishlib.io.logger.Logger;
import com.fishlib.io.logger.ProcessStreamLoggerImpl;
import com.fishlib.util.PidFileUtil;
import com.fishlib.util.process.ProcessEnvironment;
import com.illumon.dataobjects.ColumnDefinition;
import com.illumon.iris.binarystore.*;
import com.illumon.iris.db.tables.TableDefinition;
import com.illumon.iris.db.tables.dataimport.DbNamespace;
import com.illumon.iris.db.tables.utils.DBDateTime;
import com.illumon.iris.utils.SystemLoggerTimeSource;
import org.apache.commons.cli.*;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.*;

/**
 * <p> This file is an example importer that illustrates the extensions to
 * {@link BaseImporter} needed to create a custom importer.
 *
 * <p> There are two examples. Both manufacture data based on a row index.
 *
 * <p> {@link #processDataSimple()} will write data to a table defined by the following schema:
 * <pre>{@code
 * <Table name="DemoTable" namespace="ExampleNamespace" rethrowLoggerExceptionsAsIOExceptions="false" storageType="NestedPartitionedOnDisk">
 *      <Partitions keyFormula="${autobalance_by_first_grouping_column}" />
 *
 *      <Column name="StringCol" dataType="String" />
 *      <Column name="DateTimeCol" dataType="DateTime" />
 *      <Column name="IntegerCol" dataType="Integer" />
 *      <Column name="LongCol" dataType="Long" />
 *      <Column name="DoubleCol" dataType="Double" />
 *      <Column name="FloatCol" dataType="Float" />
 *      <Column name="BooleanCol" dataType="Boolean" />
 *      <Column name="CharCol" dataType="Char" />
 *      <Column name="ByteCol" dataType="Byte" />
 *      <Column name="ShortCol" dataType="Short" />
 *      <Column name="Date" dataType="String" columnType="Partitioning" />
 * </Table>
 * }</pre>
 *
 * <p> The getters and setters are hard-coded and stored in variables. This method
 * is simple, but tedious.
 *
 * <p> A valid invocation will include options similar to the following
 * (note that the namespace and table name parameters match the example schema above):
 * <p><code>     -dp localhost/2018-02-28 -ns ExampleNamespace -tn DemoTable -om REPLACE -nd 0 -st 100 -nr 123  </code>
 *
 * <p> {@link #processDataDynamic()} will write data to any table with an existing
 * schema.
 * The code to get the data is a calculation based on the index and data
 * type. There are several layers of code to move calculations out of the row
 * processing loop, and to avoid unnecessary inefficiency (e.g. boxing of
 * primitives).
 *
 * <p> A valid invocation will include options similar to this:
 * <p><code>     -dp localhost/2018-02-28 -ns AnyNamespace -tn AnyTable -om REPLACE -nd 0 -st 100 -nr 123 </code>
 */
public class ExampleImporter extends BaseImporter<ExampleImporter.Arguments> {
    /**
     * Importer arguments for the example: everything {@link StandardImporterArguments} provides,
     * plus the example-specific options declared in {@link #addOptions}.
     * Use an inner class like this one whenever a custom importer needs its own command line
     * options; declare them in {@link #addOptions} and read them in the constructor.
     */
    static class Arguments extends StandardImporterArguments {

        // Short names of the custom command line options.
        private static final String WELCOME_MESSAGE_OPTION = "wm";
        private static final String START_OPTION = "st";
        private static final String NUM_ROWS_OPTION = "nr";
        private static final String SIMPLE_OPTION = "simple";

        // Values captured from the command line (or from the programmatic constructor).
        private final int startIndex;
        private final int numRows;
        private final String message;
        private final boolean simpleExample;

        /**
         * Build the arguments from a parsed command line.
         *
         * @param commandLine the command line, as passed from {@link ExampleImporter#main}.
         * @throws ParseException if the command line or parameters are not valid, including when a
         *                        numeric option value cannot be parsed into its declared type
         */
        Arguments(@NotNull CommandLine commandLine) throws ParseException {
            super(commandLine);

            // Optional String argument: null when the option was not given.
            message = commandLine.hasOption(WELCOME_MESSAGE_OPTION)
                    ? commandLine.getOptionValue(WELCOME_MESSAGE_OPTION)
                    : null;

            // Optional Number argument with a default of 0.
            // getParsedOptionValue throws ParseException when the value does not match the declared
            // type; it is simply allowed to propagate to the caller.
            startIndex = commandLine.hasOption(START_OPTION)
                    ? ((Number) commandLine.getParsedOptionValue(START_OPTION)).intValue()
                    : 0;

            // Required Number argument.
            numRows = ((Number) commandLine.getParsedOptionValue(NUM_ROWS_OPTION)).intValue();

            // Flag without a value: present means "use the simple example".
            simpleExample = commandLine.hasOption(SIMPLE_OPTION);
        }

        /**
         * Build the arguments programmatically, for callers that do not go through main and
         * therefore have no command line. The welcome message is left unset and the dynamic
         * (non-simple) example is selected.
         *
         * @param namespace namespace of the destination table
         * @param tableName name of the destination table
         * @param partitionString internal and column partition values ("internalpartition/columnpartition")
         * @param outputMode a valid {@link ImportOutputMode} value
         * @param startIndex see the -st command line option: "Generate rows starting at startIndex (default is 0)"
         * @param numRows see the -nr command line option: "Generate this many rows"
         */
        Arguments(// StandardImporterArguments values
                  @NotNull final String namespace,
                  @NotNull final String tableName,
                  @NotNull final String partitionString,
                  @NotNull final String outputMode,
                  // ExampleImporter.Arguments values
                  final int startIndex,
                  final int numRows) {
            super(DbNamespace.valueOf(namespace), tableName, partitionString, null, null, ImportOutputMode.valueOf(outputMode));
            this.message = null;
            this.simpleExample = false;
            this.startIndex = startIndex;
            this.numRows = numRows;
        }

        /**
         * Register the options specific to this importer on top of the standard ones.
         * See {@link Options}, {@link Option} and {@link Option.Builder}.
         *
         * @param options the standard options; the custom options are added to this object
         * @return the Options with the custom options added
         */
        private static Options addOptions(@NotNull final Options options) {
            // An optional String option. It produces this usage string:
            //   -wm,--welcomeMessage <message>                                  Example string argument
            final Option welcomeMessageOption = Option.builder(WELCOME_MESSAGE_OPTION) // short name, e.g. -wm
                    .longOpt("welcomeMessage")       // long name, e.g. --welcomeMessage
                    .desc("Example string argument") // description shown in the usage message
                    .hasArg()                        // the option takes a value
                    .argName("message")              // name of the value in the usage message
                    .type(String.class)              // type used by getParsedOptionValue
                    .required(false)                 // the option may be omitted
                    .build();

            // Two mutually exclusive options; exactly one of them must be given because the
            // group is marked required.
            final Option numDogsOption = Option.builder("nd")
                    .longOpt("numDogs")
                    .desc("Number of dogs (excludes cats)")
                    .hasArg()
                    .argName("number of dogs")
                    .type(Number.class)
                    .build();
            final Option numCatsOption = Option.builder("nc")
                    .longOpt("numCats")
                    .desc("Number of cats (excludes dogs)")
                    .hasArg()
                    .argName("number of cats")
                    .type(Number.class)
                    .build();
            final OptionGroup dogsOrCats = new OptionGroup();
            dogsOrCats.setRequired(true);
            dogsOrCats.addOption(numDogsOption);
            dogsOrCats.addOption(numCatsOption);

            // The options that drive the example data sources.
            final Option simpleOption = Option.builder(SIMPLE_OPTION)
                    .required(false)
                    .longOpt("simple")
                    .desc("Use SimpleExampleDataSource")
                    .build();
            final Option numRowsOption = Option.builder(NUM_ROWS_OPTION)
                    .required()
                    .longOpt("numRows")
                    .desc("Generate this many rows")
                    .hasArg()
                    .argName("numRows")
                    .type(Number.class)
                    .build();
            final Option startIndexOption = Option.builder(START_OPTION)
                    .required(false)
                    .longOpt("startIndex")
                    .desc("Generate rows starting at startIndex (default is 0)")
                    .hasArg()
                    .argName("startIndex")
                    .type(Number.class)
                    .build();

            // Options and OptionGroups are registered the same way.
            options.addOption(welcomeMessageOption);
            options.addOptionGroup(dogsOrCats);
            options.addOption(simpleOption);
            options.addOption(numRowsOption);
            return options.addOption(startIndexOption);
        }

        /**
         * Append the argument names and values to a log line.
         * Sub-classes should define their own version, which should almost always invoke the
         * superclass implementation first. This example only delegates to the superclass and
         * appends nothing of its own.
         *
         * @param logOutput Append argument names and values to this LogOutput
         * @return The logOutput for call chaining
         */
        public LogOutput appendArguments(@NotNull final LogOutput logOutput) {
            super.appendArguments(logOutput);
            // Custom values could be appended here before returning, for example:
            //     logOutput.append(",destinationDirectory=").append(destinationDirectory.toString())
            //              .append(",outputMode=").append(outputMode);
            return logOutput;
        }

        /**
         * Accessor for the custom startIndex option.
         *
         * @return The specified or default start index.
         */
        int getStartIndexArgument() {
            return this.startIndex;
        }

        /**
         * Accessor for the custom numRows option.
         *
         * @return The specified number of rows.
         */
        int getNumRowsArgument() {
            return this.numRows;
        }

        /**
         * Accessor for the custom welcome message option.
         *
         * @return The welcome message, or null if none was given.
         */
        String getMessage() {
            return this.message;
        }

        /**
         * Report whether the optional welcome message was supplied.
         *
         * @return true if a message was given on the command line
         */
        boolean hasMessageArgument() {
            return getMessage() != null;
        }
    }

    /**
     * <p>Example data source. This may implement {@link TableReader}, but does not have to.
     * This class needs to read your data source and supply the rows to ExampleImporter.processData.
     * You will likely need to add custom arguments to identify the data source.
     *
     * <p>This implementation produces manufactured data based on an index in a
     * range given in the constructor.
     */
    private static class SimpleExampleDataSource {

        /** Index of the first manufactured row. */
        private final int start;
        /** Number of rows to manufacture; never negative. */
        private final int numRows;
        /** Number of rows handed out by {@link #readRow()} so far. */
        private int rowsRead;
        /** Index of the current row; {@code start - 1} until the first row has been read. */
        private int index;

        /**
         * @param start index of the first row to manufacture
         * @param numRows number of rows to manufacture; negative values are treated as 0
         */
        SimpleExampleDataSource(int start, int numRows) {
            this.start = start;
            this.numRows = Math.max(numRows, 0);
            index = start - 1;
        }

        /**
         * Read or otherwise prepare the next row of data.
         * This implementation manufactures the data.
         *
         * <p>Rows are counted instead of comparing the index against {@code start + numRows - 1}:
         * that sum (and {@code start - 1}) can overflow int for a start near the int limits, which
         * would end the data before any row was produced.
         *
         * @return true if another row is available, false if there is no more data.
         */
        boolean readRow() {
            if (rowsRead >= numRows) {
                return false;
            }
            // int addition wraps silently if the requested range runs past Integer.MAX_VALUE
            index = start + rowsRead;
            rowsRead++;
            return true;
        }

        // This simple implementation hard-codes accessors for the known columns.
        // Every value is derived from the index of the current row.
        String getStringColValue() {
            return "string:" + index;
        }
        DBDateTime getDateTimeColValue() {
            return DBDateTime.now();
        }
        long getLongColValue() {
            return index;
        }
        int getIntegerColValue() {
            return index;
        }
        double getDoubleColValue() {
            return 0.123d + index;
        }
        float getFloatColValue() {
            return 0.123f + index;
        }
        boolean getBooleanColValue() {
            return index % 3 == 0;
        }
        char getCharColValue() {
            return (char)index;
        }
        byte getByteColValue() {
            return (byte)(index%128);
        }
        short getShortColValue() {
            return (short)index;
        }
    }

    /**
     * <p> Example data source. This may implement {@link TableReader}, but does not have to.
     * This class needs to read your data source and supply the rows to ExampleImporter.processData.
     * You will likely need to add custom arguments to identify the data source.
     *
     * <p> This implementation dynamically discovers the columns in the table
     * definition, and produces manufactured data based on an index in a
     * range given in the constructor.
     */
    private static class DynamicExampleDataSource {

        /** Index of the first manufactured row. */
        private final int start;
        /** Number of rows to manufacture; never negative. */
        private final int numRows;
        /** Number of rows handed out by {@link #readRow()} so far. */
        private int rowsRead;
        /** Index of the current row; {@code start - 1} until the first row has been read. */
        private int index;
        /** Compute the getter methods once and store them. */
        final Map<String, RowGetter> rowGettersByName = new HashMap<>();

        /**
         * @param start index of the first row to manufacture
         * @param numRows number of rows to manufacture; negative values are treated as 0
         * @param tableDefinition the table definition whose columns get a getter each
         */
        DynamicExampleDataSource(int start, int numRows, TableDefinition tableDefinition) {
            this.start = start;
            this.numRows = Math.max(numRows, 0);
            index = start - 1;
            init(tableDefinition);
        }

        /**
         * Read or otherwise prepare the next row of data.
         * This implementation manufactures the data.
         *
         * <p>Rows are counted instead of comparing the index against {@code start + numRows - 1}:
         * that sum (and {@code start - 1}) can overflow int for a start near the int limits, which
         * would end the data before any row was produced.
         *
         * @return true if another row is available, false if there is no more data.
         */
        boolean readRow() {
            if (rowsRead >= numRows) {
                return false;
            }
            // int addition wraps silently if the requested range runs past Integer.MAX_VALUE
            index = start + rowsRead;
            rowsRead++;
            return true;
        }

        /**
         * Get the getter for a named column.
         * The getter is a pre-calculated implementation of RowGetter.
         * This variant cannot return a typed RowGetter.
         *
         * @param name the column to get the getter for.
         * @return the getter
         * @throws IllegalArgumentException when the getter does not exist
         */
        RowGetter getGetter(String name) {
            RowGetter rowGetter = rowGettersByName.get(name);
            if (rowGetter == null) {
                throw new IllegalArgumentException("No getter for column " + name);
            }
            return rowGetter;
        }

        /**
         * Get the typed getter for a named column.
         * The getter is a pre-calculated implementation of RowGetter.
         * The request is accepted when the requested class is assignable from the getter's type,
         * or from the boxed form of the getter's type.
         *
         * @param name the column to get the getter for.
         * @param tClass the value type the caller wants to read
         * @return the getter
         * @throws IllegalArgumentException when the getter does not exist
         * @throws ClassCastException when the type is inappropriate
         */
        public <T> RowGetter<T> getGetter(String name, Class<T> tClass) {
            RowGetter getter = getGetter(name);
            if (tClass.isAssignableFrom(getter.getType())
                    || tClass.isAssignableFrom(com.illumon.util.type.TypeUtils.getBoxedType(getter.getType()))) {
                //noinspection unchecked
                return (RowGetter<T>) getter;
            }
            throw new ClassCastException("Getter for column " + name + " is not assignable to " + tClass + ", type is " + getter.getType());
        }

        /**
         * Pre-compute the getters for all the columns. This implementation
         * manufactures the data based on an index and the column type.
         * Each primitive-typed getter overrides the typed accessor, and its generic get()
         * delegates to that accessor.
         *
         * @param tableDefinition the table definition to inspect
         */
        private void init(TableDefinition tableDefinition) {

            for (ColumnDefinition colDef : tableDefinition.getColumns()) {

                String colName = colDef.getName();

                // DBDateTime columns are handled before the SupportedType lookup.
                if (DBDateTime.class.equals(colDef.getDataType())) {
                    rowGettersByName.put(colName,
                            new AbstractRowGetter<DBDateTime>(DBDateTime.class) {
                                @Override
                                public DBDateTime get() {
                                    return DBDateTime.now();
                                }
                            });
                    continue;
                }

                try {
                    switch (SupportedType.getType(colDef.getDataType())) {
                        case Boolean: {
                            rowGettersByName.put(colName,
                                    new AbstractRowGetter<Boolean>(Boolean.class) {
                                        @Override
                                        public Boolean get() {
                                            return getBoolean();
                                        }

                                        @Override
                                        public Boolean getBoolean() {
                                            return index % 3 == 0;
                                        }
                                    });
                            break;
                        }
                        case Byte: {
                            rowGettersByName.put(colName,
                                    new AbstractRowGetter<Byte>(Byte.class) {
                                        @Override
                                        public Byte get() {
                                            return getByte();
                                        }

                                        @Override
                                        public byte getByte() {
                                            return (byte)(index % 128);
                                        }
                                    });
                            break;
                        }
                        case Char: {
                            rowGettersByName.put(colName,
                                    new AbstractRowGetter<Character>(Character.class) {
                                        @Override
                                        public Character get() {
                                            return getChar();
                                        }

                                        @Override
                                        public char getChar() {
                                            return (char)(index % 128);
                                        }
                                    });
                            break;
                        }
                        case Double: {
                            rowGettersByName.put(colName,
                                    new AbstractRowGetter<Double>(Double.class) {
                                        @Override
                                        public Double get() {
                                            return getDouble();
                                        }

                                        @Override
                                        public double getDouble() {
                                            return index + 0.123d;
                                        }
                                    });
                            break;
                        }
                        case Float: {
                            rowGettersByName.put(colName,
                                    new AbstractRowGetter<Float>(Float.class) {
                                        @Override
                                        public Float get() {
                                            return getFloat();
                                        }

                                        @Override
                                        public float getFloat() {
                                            return index + 0.456f;
                                        }
                                    });
                            break;
                        }
                        case Int: {
                            rowGettersByName.put(colName,
                                    new AbstractRowGetter<Integer>(Integer.class) {
                                        @Override
                                        public Integer get() {
                                            return getInt();
                                        }

                                        @Override
                                        public int getInt() {
                                            return index;
                                        }
                                    });
                            break;
                        }
                        case Long: {
                            rowGettersByName.put(colName,
                                    new AbstractRowGetter<Long>(Long.class) {
                                        @Override
                                        public Long get() {
                                            return getLong();
                                        }

                                        @Override
                                        public long getLong() {
                                            return index;
                                        }
                                    });
                            break;
                        }
                        case Short: {
                            rowGettersByName.put(colName,
                                    new AbstractRowGetter<Short>(Short.class) {
                                        @Override
                                        public Short get() {
                                            return getShort();
                                        }

                                        @Override
                                        public short getShort() {
                                            return (short)index;
                                        }
                                    });
                            break;
                        }
                        case String:
                        case EnhancedString:
                        case Enum: {
                            rowGettersByName.put(colName,
                                    new AbstractRowGetter<String>(String.class) {
                                        @Override
                                        public String get() {
                                            return "String: " + index;
                                        }
                                    });
                            break;
                        }
                        case Blob:
                        case UnknownType:
                        default:
                        {
                            // no data is manufactured for these types
                            rowGettersByName.put(colName, nullGetter());
                            break;
                        }
                    }
                } catch (UnsupportedOperationException e) {
                    // for the types not convertable in SupportedType
                    rowGettersByName.put(colName, nullGetter());
                }
            }
        }

        /**
         * Create a getter for a column this example cannot manufacture data for.
         *
         * @return a new Object-typed getter whose get() always returns null
         */
        private static AbstractRowGetter<Object> nullGetter() {
            return new AbstractRowGetter<Object>(Object.class) {
                @Override
                public Object get() {
                    return null;
                }
            };
        }
    }

    /**
     * Construct the ExampleImporter from a logger and a prepared set of arguments.
     * <p>This constructor is public. If the importer were only ever constructed from this class's
     * own main(), it could be made private; when queries are used to set up and run this importer,
     * other access specifiers or constructors might be needed.
     *
     * @param log use this Logger for feedback
     * @param arguments importer arguments defining the import run
     */
    public ExampleImporter(@NotNull final Logger log, @NotNull final ExampleImporter.Arguments arguments) {
        // all state is handed to the BaseImporter superclass
        super(log, arguments);
    }

    /**
     * Alternate constructor for usage without a command line.
     * The values are wrapped in an {@link Arguments} instance and passed to the
     * (Logger, Arguments) constructor.
     *
     * @param log Logger for monitoring and debugging messages
     * @param namespace namespace of the destination table
     * @param tableName name of the destination table
     * @param partitionString internal and column partition values ("internalpartition/columnpartition")
     * @param outputMode a valid {@link ImportOutputMode} value
     * @param startIndex see the -st command line option: "Generate rows starting at startIndex (default is 0)"
     * @param numRows see the -nr command line option: "Generate this many rows"
     */
    public ExampleImporter(@NotNull final Logger log,
                           @NotNull final String namespace,
                           @NotNull final String tableName,
                           @NotNull final String partitionString,
                           @NotNull final String outputMode,
                           final int startIndex,
                           final int numRows) {
        // the delegation has to be the first statement, so the Arguments are built inline
        this(log, new Arguments(namespace, tableName, partitionString, outputMode, startIndex, numRows));
    }

    /**
     * Entry point invoked by the base importer superclass to perform the import.
     *
     * <p>Override this method to open and process your data source. For each row of the input,
     * call tableWriter.getSetter(...).set(value) for each column, and then tableWriter.writeRow().
     *
     * <p>This example dispatches on the "simple" argument: the hard-coded example runs when it
     * is set, the dynamic example otherwise.
     */
    @Override
    protected void processData() {
        if (!getImporterArguments().simpleExample) {
            processDataDynamic();
            return;
        }
        processDataSimple();
    }

    /**
     * Simple example: manufacture rows for a known, static schema as if they had been read
     * from a data source, and write them to the table.
     *
     * @throws UncheckedIOException if a row cannot be written
     */
    private void processDataSimple() {
        final int firstIndex = getImporterArguments().getStartIndexArgument();
        final int rowCount = getImporterArguments().getNumRowsArgument();
        final SimpleExampleDataSource source = new SimpleExampleDataSource(firstIndex, rowCount);

        final TableWriter writer = getTableWriter();

        // (Optional) Look the setters up once, outside the row loop, for efficiency.
        // They could equally be kept in an array or another structure.
        // Date is the partitioning column, so it has no setter here.
        final RowSetter<String> stringCol = writer.getSetter("StringCol", String.class);
        final RowSetter<DBDateTime> dateTimeCol = writer.getSetter("DateTimeCol", DBDateTime.class);
        final RowSetter<Integer> integerCol = writer.getSetter("IntegerCol", Integer.class);
        final RowSetter<Long> longCol = writer.getSetter("LongCol", Long.class);
        final RowSetter<Double> doubleCol = writer.getSetter("DoubleCol", Double.class);
        final RowSetter<Float> floatCol = writer.getSetter("FloatCol", Float.class);
        final RowSetter<Boolean> booleanCol = writer.getSetter("BooleanCol", Boolean.class);
        final RowSetter<Character> charCol = writer.getSetter("CharCol", Character.class);
        final RowSetter<Byte> byteCol = writer.getSetter("ByteCol", Byte.class);
        final RowSetter<Short> shortCol = writer.getSetter("ShortCol", Short.class);

        try {
            while (source.readRow()) {
                // Copy every column of the current source row into the writer...
                stringCol.set(source.getStringColValue());
                dateTimeCol.set(source.getDateTimeColValue());
                integerCol.setInt(source.getIntegerColValue());
                longCol.setLong(source.getLongColValue());
                doubleCol.setDouble(source.getDoubleColValue());
                floatCol.setFloat(source.getFloatColValue());
                booleanCol.setBoolean(source.getBooleanColValue());
                charCol.setChar(source.getCharColValue());
                byteCol.setByte(source.getByteColValue());
                shortCol.setShort(source.getShortColValue());

                // ...and then write the populated row.
                writer.writeRow();
            }
        } catch (IOException ioe) {
            throw new UncheckedIOException("Failed to write a row!", ioe);
        }
    }

    /**
     * Dynamic example: manufacture rows for whatever columns are discovered in the writable
     * table definition, as if they had been read from a data source, and write them to the table.
     *
     * @throws UncheckedIOException if a row cannot be written
     */
    private void processDataDynamic() {
        final int firstIndex = getImporterArguments().getStartIndexArgument();
        final int rowCount = getImporterArguments().getNumRowsArgument();

        final TableDefinition writableDefinition = getTableDefinition().getWritable();

        // This example data source knows how to manufacture data for the columns it discovers.
        final DynamicExampleDataSource source = new DynamicExampleDataSource(firstIndex, rowCount, writableDefinition);

        final TableWriter writer = getTableWriter();

        // Build one type-sensitive "get, then set" action per column, outside the row loop.
        // Calling writer.getSetter(name).set(source.getGetter(name)) would be simpler, but the
        // generic get() causes primitive types to be boxed.
        final List<Runnable> columnCopiers = new ArrayList<>();
        for (ColumnDefinition column : writableDefinition.getColumns()) {
            final String name = column.getName();
            final RowSetter setter = writer.getSetter(name);
            final RowGetter getter = source.getGetter(name);
            columnCopiers.add(getTypeSpecificGetSet(setter, getter, column.getDataType()));
        }

        try {
            while (source.readRow()) {
                // Copy every column of the current source row into the writer...
                for (Runnable copier : columnCopiers) {
                    copier.run();
                }

                // ...and then write the populated row.
                writer.writeRow();
            }
        } catch (IOException ioe) {
            throw new UncheckedIOException("Failed to write a row!", ioe);
        }
    }

    /**
     * Build the action that copies one column value from the source to the writer.
     * For char, byte, double, float, int, long and short columns (primitive or boxed class)
     * the action uses the typed get and set methods, so the value is not boxed. Every other
     * type goes through the generic get() and set().
     *
     * @param setter a RowSetter for a column
     * @param getter a RowGetter for the same column
     * @param type the class of the column data
     * @return a Runnable that gets and sets a column value
     */
    private static Runnable getTypeSpecificGetSet(final RowSetter setter, final RowGetter getter, final Class<?> type) {
        if (char.class == type || Character.class == type) {
            return () -> setter.setChar(getter.getChar());
        }
        if (byte.class == type || Byte.class == type) {
            return () -> setter.setByte(getter.getByte());
        }
        if (double.class == type || Double.class == type) {
            return () -> setter.setDouble(getter.getDouble());
        }
        if (float.class == type || Float.class == type) {
            return () -> setter.setFloat(getter.getFloat());
        }
        if (int.class == type || Integer.class == type) {
            return () -> setter.setInt(getter.getInt());
        }
        if (long.class == type || Long.class == type) {
            return () -> setter.setLong(getter.getLong());
        }
        if (short.class == type || Short.class == type) {
            return () -> setter.setShort(getter.getShort());
        }
        // no typed branch matched: fall back to the generic accessors
        //noinspection unchecked
        return () -> setter.set(getter.get());
    }

    /**
     * Convenience entry point that constructs an {@link ExampleImporter} from the given values and
     * runs the import in a single call, so that callers (for example queries) do not need to build
     * the importer themselves.
     *
     * @param log Logger for monitoring and debugging messages
     * @param namespace namespace of the destination table
     * @param tableName name of the destination table
     * @param partitionString internal and column partition values ("internalpartition/columnpartition")
     * @param outputMode a valid {@link ImportOutputMode} value
     * @param startIndex see the -st command line option: "Generate rows starting at startIndex (default is 0)"
     * @param numRows see the -nr command line option: "Generate this many rows"
     */
    @SuppressWarnings("unused")
    public static void importData(@NotNull final Logger log,
                                  @NotNull final String namespace,
                                  @NotNull final String tableName,
                                  @NotNull final String partitionString,
                                  @NotNull final String outputMode,
                                  final int startIndex,
                                  final int numRows) {
        // Build the importer and immediately run it; nothing else needs the instance.
        new ExampleImporter(log, namespace, tableName, partitionString, outputMode, startIndex, numRows)
                .doImport();
    }

    /**
     * Command line entry point for the example importer.
     *
     * <p> Creates the pid file and the process logger, initializes the process environment, parses
     * the command line into importer arguments, and runs the import.
     *
     * <p> The process exits with status -1 when no arguments are given (after logging the usage
     * text), when initialization or argument parsing throws (after logging the error and the usage
     * text), or when the import itself throws a RuntimeException (after logging the error).
     *
     * @param args command line arguments to the process
     */
    public static void main(final String... args) {
        // get the Configuration singleton instance and save it for convenience
        final Configuration configuration = Configuration.getInstance();

        // Make sure only one instance of this process runs at any time.
        // The process name is taken from the "process.name" property set in a property file or
        // via jvm args such as -Dprocess.name=ExampleImporter_1
        PidFileUtil.checkAndCreatePidFileForThisProcess(configuration);

        // Create an Iris logger for this process
        final Logger log = ProcessStreamLoggerImpl.makeLogger(new SystemLoggerTimeSource(), TimeZone.getDefault());
        // Redirect any log4j logging to the Iris logger
        Log4jAdapter.sendLog4jToLogger(log);


        // if no arguments are given, print a friendly usage message (to the log file)
        if (args.length == 0) {
            log.info().append(StandardImporterArguments.getHelpString(ExampleImporter.class.getSimpleName(), ExampleImporter.Arguments::addOptions)).endl();
            System.exit(-1);
        }

        try {
            // All Iris processes expect a ProcessEnvironment.
            ProcessEnvironment.basicServerInitialization(configuration, ExampleImporter.class.getSimpleName(), log);

            // parse the arguments
            ExampleImporter.Arguments importerArgs = StandardImporterArguments.parseCommandLine(ExampleImporter.class.getSimpleName(), ExampleImporter.Arguments::new, args, ExampleImporter.Arguments::addOptions);
            try {
                // Create an importer with the specified arguments, and import the data.
                ExampleImporter importer = new ExampleImporter(log, importerArgs);
                importer.doImport();
            } catch (RuntimeException e){
                log.fatal().append("Import failed with exception: ").append(e).endl();
                // A failed import must not look like a success to whatever launched this process:
                // exit with the same non-zero status used by the other failure paths.
                System.exit(-1);
            }
        } catch (Exception e) {
            // NOTE: this handler also covers exceptions thrown by the process environment
            // initialization above, not only by the command line parsing.
            log.fatal().append("Failed to parse command line arguments: ").append(e).endl();
            // add the usage statement
            log.info().append(StandardImporterArguments.getHelpString(ExampleImporter.class.getSimpleName(), ExampleImporter.Arguments::addOptions)).endl();
            System.exit(-1);
        }
    }
}
